In [157]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [158]:
# Adapter Layer
def read_csv_to_df(bucket, key, decoding='utf-8', sep=','):
#     print("key: " + key)
    csv_obj = bucket.Object(key=key).get().get('Body').read().decode(decoding)
    data = StringIO(csv_obj)
    df=pd.read_csv(data, delimiter=sep)
    return df 

def write_df_to_s3_parquet(bucket, df, key): 
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

def write_df_to_s3_csv(bucket, df, key): 
    out_buffer = StringIO()
    df.to_csv(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

def list_files_in_prefix(bucket, prefix):
    files = [obj.key for obj in bucket.objects.filter(Prefix=prefix)]
    return files

In [159]:
# Application layer
def return_date_list(bucket, arg_date, src_format, meta_key):
    min_date = datetime.strptime(arg_date, src_format).date() - timedelta(days=1)
    today = datetime.strptime(today_str, src_format).date()
    try:
        df_meta = read_csv_to_df(bucket_trg, meta_key)
        return_date_list = [(min_date+timedelta(days=x)) for x in range(0, (today-min_date).days +1)]
        # src_dates = df_meta['source_date'].tolist()
        # s_dates=[datetime.strptime(x, src_format).date() for x in src_dates]
        # ss_dates = set(s_dates)
        src_dates = set(pd.to_datetime(df_meta['source_date']).dt.date)
        process_dates = set(return_date_list[1:])
        if process_dates-sr_dates:
            mindate=min(process_dates-sr_dates)-timedelta(days=1)
            return_dates = [date.strftime(src_format) for date in return_date_list if date>=min_date]
            return_min_date=arg_date
        else:
            return_dates = []
            mindate=datetime(3300, 1, 1).date()
    except bucket.session.client('s3').execptions.NoSuchKey:
        return_dates = [(min_date + timedelta(day=x)).strftime(src_format) for x in range(0, (today-min_date).days + 1)]
        return_min_date = arg_date
    return return_min_date, return_dates
        
def update_meta_file(bucket, meta_key, extract_date_list, src_format):
    df_new = pd.DataFrame(columns=['source_date', 'datetime_of_processing'])
    df_new['source_date'] = extract_date_list
    df_new['datetime_of_processing']=datetime.today().strftime('%Y-%m-%d %H:%M:%S')
    df_old = read_csv_to_df(bucket, meta_key)
    df_all = pd.concat([df_old, df_new])
    write_df_to_s3_csv(bucket, df_all, meta_key)
    
def extract(bucket, date_list):
    files = [key for date in date_list for key in list_files_in_prefix(bucket, date)]
    df=pd.concat([read_csv_to_df(bucket, file) for file in files], ignore_index=True)
    return df

def transform_report1(df, columns, arg_date):
    df=df.loc[:, columns]
    df.dropna(inplace=True)
    df['opening_price'] = df.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('first')
    df['closing_price'] = df.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('last')
    df = df.groupby(['ISIN', 'Date'], as_index=False).agg(opening_price_eur=('opening_price', 'min'), closing_price_eur=('closing_price', 'min'), minimum_price_eur=('MinPrice', 'min'), maximum_price_eur=('MaxPrice', 'max'), daily_traded_volume=('TradedVolume', 'sum'))
    df['prev_closing_price'] = df.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)
    df['change_prev_closing_%']=(df['closing_price_eur']-df['prev_closing_price'])/df['prev_closing_price']*100    
    df.drop(columns=['prev_closing_price'], inplace=True)
    df=df.round(decimals=2)
    df=df[df.Date >= arg_date]
    return df

def load(bucket, df, trg_key, trg_format, meta_key, extract_date_list):
    key = trg_key + datetime.today().strftime("%Y%m%d_%H%M%S") + trg_format
    write_df_to_s3_parquet(bucket, df, key)
    update_meta_file(bucket, meta_key, extract_date_list, src_format)
    return True

def return_date_list(bucket, arg_date, src_format):
    min_date = datetime.strptime(arg_date, src_format).date()-timedelta(days=1)
    today=datetime.today().date()
    return_date_list=[(min_date + timedelta(days=x)).strftime(src_format) for x in range(0, (today-min_date).days + 1)]
#     print(return_date_list)
    return return_date_list

def etl_report1(bucket_src, bucket_trg, date_list, columns, arg_date, trg_key, trg_format):
    df=extract(bucket_src, date_list)
    df=transform_report1(df, columns, arg_date)
    extract_date_list = [date for date in date_list if date > arg_date]
    load(bucket_trg, df, trg_key, trg_format, meta_key, extract_date_list)
    return True

In [160]:
# main function entrypoint
def main():
    # Parameters/Configurations
    # Later read config
    arg_date ='2021-12-31'
    today_str='2021-12-31'
    src_format = '%Y-%m-%d'
    src_bucket_name = 'deutsche-boerse-xetra-pds'
    trg_bucket_name = 'xetra-edp'
    columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
    trg_key = 'xetra_daily_report_'
    trg_format = '.parquet'
    meta_key='meta_file.csv'
    
    # Init
    s3 = boto3.resource('s3')
    bucket_src = s3.Bucket(src_bucket_name)
    bucket_trg = s3.Bucket(trg_bucket_name)
    
    # run application
    date_list = return_date_list(bucket_src, arg_date, src_format)
    etl_report1(bucket_src, bucket_trg, date_list, columns, arg_date, trg_key, trg_format)

In [161]:
main()

['2021-12-30', '2021-12-31', '2022-01-01']
key: 2021-12-30/2021-12-30_BINS_XETR00.csv
key: 2021-12-30/2021-12-30_BINS_XETR01.csv
key: 2021-12-30/2021-12-30_BINS_XETR02.csv
key: 2021-12-30/2021-12-30_BINS_XETR03.csv
key: 2021-12-30/2021-12-30_BINS_XETR04.csv
key: 2021-12-30/2021-12-30_BINS_XETR05.csv
key: 2021-12-30/2021-12-30_BINS_XETR06.csv
key: 2021-12-30/2021-12-30_BINS_XETR07.csv
key: 2021-12-30/2021-12-30_BINS_XETR08.csv
key: 2021-12-30/2021-12-30_BINS_XETR09.csv
key: 2021-12-30/2021-12-30_BINS_XETR10.csv
key: 2021-12-30/2021-12-30_BINS_XETR11.csv
key: 2021-12-30/2021-12-30_BINS_XETR12.csv
key: 2021-12-30/2021-12-30_BINS_XETR13.csv
key: 2021-12-30/2021-12-30_BINS_XETR14.csv
key: 2021-12-30/2021-12-30_BINS_XETR15.csv
key: 2021-12-30/2021-12-30_BINS_XETR16.csv
key: 2021-12-30/2021-12-30_BINS_XETR17.csv
key: 2021-12-30/2021-12-30_BINS_XETR18.csv
key: 2021-12-30/2021-12-30_BINS_XETR19.csv
key: 2021-12-30/2021-12-30_BINS_XETR20.csv
key: 2021-12-30/2021-12-30_BINS_XETR21.csv
key: 2021-1

## Write to S3

## Reading the uploaded file

In [163]:
for obj in bucket_trg.objects.all():
    print(obj.key)

meta_file.csv
xetra_daily_report_20211230_083546.parquet
xetra_daily_report_20211231_100259.parquet
xetra_daily_report_20211231_135742.parquet
xetra_daily_report_20220101_112413.parquet


In [165]:
prq_obj = bucket_trg.Object(key='xetra_daily_report_20211230_083546.parquet').get().get('Body').read()
data = BytesIO(prq_obj)
df_report=pd.read_parquet(data)

In [166]:
df_report

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
0,AT000000STR1,2021-12-29,36.20,36.60,36.20,36.70,592,0.00
1,AT000000STR1,2021-12-30,36.85,36.75,36.40,36.85,135,0.41
2,AT00000FACC2,2021-12-29,7.35,7.28,7.21,7.36,1479,-1.75
3,AT0000606306,2021-12-29,25.56,25.90,25.56,25.90,926,2.13
4,AT0000606306,2021-12-30,26.00,25.70,25.70,26.00,188,-0.77
...,...,...,...,...,...,...,...,...
6133,XS2314659447,2021-12-30,8.53,8.53,8.53,8.53,0,0.79
6134,XS2314660700,2021-12-29,17.81,17.77,17.73,17.81,69,-0.74
6135,XS2314660700,2021-12-30,18.04,17.99,17.99,18.04,0,1.27
6136,XS2376095068,2021-12-29,42.16,42.00,41.62,42.16,0,-1.74
