In [2]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
import os, glob
from datetime import datetime, timedelta

In [3]:
def read_csv_to_df(bucket, key, encoding='utf-8', sep=','):
    """Download a CSV object from a bucket and parse it into a DataFrame.

    Parameters
    ----------
    bucket : object exposing ``Object(key=...)`` whose ``get()`` result has a
        readable ``'Body'`` entry (e.g. a boto3 S3 ``Bucket`` resource).
    key : key of the CSV object inside the bucket.
    encoding : text encoding used to decode the downloaded bytes.
    sep : column delimiter passed to ``pandas.read_csv``.

    Returns
    -------
    pandas.DataFrame with the parsed CSV content.
    """
    raw_bytes = bucket.Object(key=key).get().get('Body').read()
    # pandas gets a text buffer, so decoding happens here rather than in read_csv
    text_buffer = StringIO(raw_bytes.decode(encoding))
    return pd.read_csv(text_buffer, delimiter=sep)

# Key of the meta file and name of the target bucket it is read from
meta_key = 'meta_file.csv'
bucket_name_trg = 'xetra-1234'
# boto3 S3 resource and a handle on the target bucket
s3 = boto3.resource('s3')
bucket_trg = s3.Bucket(bucket_name_trg)

# Load the meta file into a DataFrame for the date checks in the following cells
df_meta = read_csv_to_df(bucket_trg, meta_key)

In [4]:
df_meta

Unnamed: 0,source_date,datetime_of_processing
0,2021-04-23,2021-04-23 12:33:23
1,2021-04-21,2021-04-21 12:30:21


In [59]:
# Fixed demo inputs: the requested first date and a stand-in value for "today"
arg_date = '2021-04-22'
today_str = '2021-04-25'

today = datetime.strptime(today_str, '%Y-%m-%d').date()
# The range starts one day before arg_date
start = datetime.strptime(arg_date, '%Y-%m-%d').date() - timedelta(days=1)

# Every calendar day from start through today, both ends included
dates = [start + timedelta(days=offset) for offset in range((today - start).days + 1)]

In [60]:
# Set of datetime.date values parsed from the meta file's source_date column
src_dates = set(pd.to_datetime(df_meta['source_date']).dt.date)

In [61]:
src_dates

{datetime.date(2021, 4, 21), datetime.date(2021, 4, 23)}

In [62]:
# Dates in the requested range that have no entry in the meta file.
# dates[0] is the day before arg_date, so it is left out of the check.
dates_missing = set(dates[1:]) - src_dates
if dates_missing:
    # Begin one day before the earliest missing date
    min_date = min(dates_missing) - timedelta(days=1)
    return_dates = [date.strftime('%Y-%m-%d') for date in dates if date >= min_date]
    # Earliest missing date itself, formatted as a string
    return_min_date = (min_date + timedelta(days=1)).strftime('%Y-%m-%d')
else:
    # Nothing is missing: no dates to load.
    return_dates = []
    # Far-future sentinel, formatted as a string so return_min_date has the same
    # type in both branches (it was a datetime.date here before).
    # NOTE(review): presumably chosen so that a later `Date >= return_min_date`
    # filter selects no rows - confirm against the consumer.
    return_min_date = datetime(2200, 1, 1).date().strftime('%Y-%m-%d')

In [63]:
return_dates

['2021-04-21', '2021-04-22', '2021-04-23', '2021-04-24', '2021-04-25']

In [64]:
return_min_date

'2021-04-22'

In [35]:
# Adapter Layer

# NOTE(review): read_csv_to_df is also defined in an earlier cell of this
# notebook with the same behaviour; this later definition is the one in effect
# once this cell has run.
def read_csv_to_df(bucket, key, encoding='utf-8', sep=','):
    """Read a CSV object from a bucket into a DataFrame.

    Parameters
    ----------
    bucket : object exposing ``Object(key=...)`` whose ``get()`` result has a
        readable ``'Body'`` entry (e.g. a boto3 S3 ``Bucket`` resource).
    key : key of the CSV object inside the bucket.
    encoding : text encoding used to decode the downloaded bytes.
    sep : column delimiter passed to ``pandas.read_csv``.

    Returns
    -------
    pandas.DataFrame with the parsed CSV content.
    """
    body = bucket.Object(key=key).get().get('Body')
    csv_text = body.read().decode(encoding)
    return pd.read_csv(StringIO(csv_text), delimiter=sep)

def write_df_to_s3(bucket, df, key):
    """Serialize a DataFrame to Parquet in memory and upload it to the bucket.

    Parameters
    ----------
    bucket : object exposing ``put_object(Body=..., Key=...)``
        (e.g. a boto3 S3 ``Bucket`` resource).
    df : DataFrame to write; its index is not written.
    key : key under which the Parquet bytes are stored.

    Returns
    -------
    None
    """
    parquet_buffer = BytesIO()
    df.to_parquet(parquet_buffer, index=False)
    payload = parquet_buffer.getvalue()
    bucket.put_object(Body=payload, Key=key)
    
def list_files_in_prefix(bucket, prefix):
    """Return the keys of all objects in the bucket that match a key prefix.

    Parameters
    ----------
    bucket : object whose ``objects.filter(Prefix=...)`` yields items with a
        ``key`` attribute (e.g. a boto3 S3 ``Bucket`` resource).
    prefix : key prefix to filter on.

    Returns
    -------
    list of object keys, in the order the bucket yields them.
    """
    keys = []
    for s3_object in bucket.objects.filter(Prefix=prefix):
        keys.append(s3_object.key)
    return keys

In [36]:
# Application layer
# Core

def extract(bucket, date_list):
    """Read every CSV object found under the given date prefixes into one DataFrame.

    Parameters
    ----------
    bucket : source bucket, passed through to ``list_files_in_prefix`` and
        ``read_csv_to_df``.
    date_list : iterable of key prefixes (date strings); each one is listed
        separately.

    Returns
    -------
    pandas.DataFrame with the rows of all files concatenated and a fresh
    integer index. When no object matches any prefix, an empty DataFrame is
    returned.
    """
    files = [key for date in date_list for key in list_files_in_prefix(bucket, date)]
    if not files:
        # pd.concat raises ValueError on an empty sequence, so return early
        return pd.DataFrame()

    def csv_to_df_nb(key):
        # Print each key as it is read, to show progress
        print(key)
        return read_csv_to_df(bucket, key)

    return pd.concat(map(csv_to_df_nb, files), ignore_index=True)

def transform_report1(df, columns, arg_date):
    """Aggregate raw trade rows into one report row per ISIN and Date.

    Parameters
    ----------
    df : DataFrame of raw rows. After selecting ``columns`` it must contain
        'ISIN', 'Date', 'Time', 'StartPrice', 'MinPrice', 'MaxPrice' and
        'TradedVolume'. It is not modified.
    columns : list of column names to keep before processing; rows with a
        missing value in any of them are dropped.
    arg_date : lower bound for 'Date' in the result, compared with ``>=``
        against the 'Date' column.

    Returns
    -------
    pandas.DataFrame with columns ISIN, Date, opening_price_eur,
    closing_price_eur, minimum_price_eur, maximum_price_eur,
    daily_traded_volume and change_prev_closing_%, rounded to two decimals,
    restricted to ``Date >= arg_date`` and with a fresh integer index. An
    empty input yields an empty frame with these columns.
    """
    if df.empty:
        # Selecting `columns` from a frame without them would raise KeyError;
        # return an empty frame with the report's columns instead.
        return pd.DataFrame(columns=[
            'ISIN', 'Date', 'opening_price_eur', 'closing_price_eur',
            'minimum_price_eur', 'maximum_price_eur', 'daily_traded_volume',
            'change_prev_closing_%',
        ])

    trades = df.loc[:, columns].dropna()

    # First/last StartPrice of each (ISIN, Date) group in Time order; the
    # results are aligned back to the rows by index.
    # NOTE(review): the closing price is the last *StartPrice* of the day, not
    # an EndPrice - confirm this is intended.
    start_by_day = trades.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice']
    trades = trades.assign(
        opening_price=start_by_day.transform('first'),
        closing_price=start_by_day.transform('last'),
    )

    report = trades.groupby(['ISIN', 'Date'], as_index=False).agg(
        opening_price_eur=('opening_price', 'min'),
        closing_price_eur=('closing_price', 'min'),
        minimum_price_eur=('MinPrice', 'min'),
        maximum_price_eur=('MaxPrice', 'max'),
        daily_traded_volume=('TradedVolume', 'sum'),
    )

    # Closing price of the preceding Date for the same ISIN (NaN for the first one)
    report['prev_closing_price'] = (
        report.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)
    )
    report['change_prev_closing_%'] = (
        (report['closing_price_eur'] - report['prev_closing_price'])
        / report['prev_closing_price'] * 100
    )

    report = report.drop(columns=['prev_closing_price']).round(decimals=2)
    # Filter last so earlier dates are still available for the previous-close lookup
    report = report[report.Date >= arg_date].reset_index(drop=True)
    return report

def load(bucket, df, trg_key, trg_format):
    """Write the DataFrame to the bucket under a timestamped key.

    The key is ``trg_key`` + the current local time formatted as
    ``%Y%m%d_%H%M%S`` + ``trg_format``.

    Parameters
    ----------
    bucket : target bucket, passed through to ``write_df_to_s3``.
    df : DataFrame to store.
    trg_key : key prefix of the written object.
    trg_format : key suffix of the written object (e.g. a file extension).

    Returns
    -------
    True once the write call has returned.
    """
    run_stamp = datetime.today().strftime("%Y%m%d_%H%M%S")
    write_df_to_s3(bucket, df, trg_key + run_stamp + trg_format)
    return True

def etl_report1(src_bucket, trg_bucket, date_list, trg_key, trg_format, columns, arg_date):
    """Run the report pipeline: extract, transform, load.

    Parameters
    ----------
    src_bucket : bucket the raw CSV files are read from.
    trg_bucket : bucket the report is written to.
    date_list : date prefixes handed to ``extract``.
    trg_key, trg_format : key prefix and suffix handed to ``load``.
    columns, arg_date : column selection and first report date handed to
        ``transform_report1``.

    Returns
    -------
    None
    """
    raw_trades = extract(src_bucket, date_list)
    report = transform_report1(raw_trades, columns, arg_date)
    load(trg_bucket, report, trg_key, trg_format)

In [37]:
# Application Layer - not core

def return_date_list(bucket, arg_date):
    """List every date from the day before ``arg_date`` through today.

    Parameters
    ----------
    bucket : not used by this function.
    arg_date : first requested date as a ``'%Y-%m-%d'`` string.

    Returns
    -------
    list of ``'%Y-%m-%d'`` strings, one per calendar day, starting one day
    before ``arg_date`` and ending with the current local date. The list is
    empty when that start day lies after today.
    """
    first_day = datetime.strptime(arg_date, '%Y-%m-%d').date() - timedelta(days=1)
    last_day = datetime.today().date()
    day_count = (last_day - first_day).days + 1

    date_strings = []
    for offset in range(day_count):
        current_day = first_day + timedelta(days=offset)
        date_strings.append(current_day.strftime('%Y-%m-%d'))
    return date_strings

In [38]:
# main function entrypoint

def main():
    """Entry point: set up the configuration and buckets, then run the report ETL."""
    # Parameters/Configurations
    # later read config
    bucket_name_src = 'deutsche-boerse-xetra-pds'
    bucket_name_trg = 'xetra-1234'
    trg_key = 'xetra_daily_report_'
    trg_format = '.parquet'
    arg_date = '2021-04-23'
    columns = [
        'ISIN', 'Mnemonic', 'Date', 'Time', 'StartPrice', 'EndPrice',
        'MinPrice', 'MaxPrice', 'TradedVolume',
    ]

    # Init: one S3 resource, a source and a target bucket handle
    s3 = boto3.resource('s3')
    bucket_src = s3.Bucket(bucket_name_src)
    bucket_trg = s3.Bucket(bucket_name_trg)

    # run application
    etl_report1(
        bucket_src,
        bucket_trg,
        return_date_list(bucket_src, arg_date),
        trg_key,
        trg_format,
        columns,
        arg_date,
    )

In [39]:
# run

main()

2021-04-22/2021-04-22_BINS_XETR00.csv
2021-04-22/2021-04-22_BINS_XETR01.csv
2021-04-22/2021-04-22_BINS_XETR02.csv
2021-04-22/2021-04-22_BINS_XETR03.csv
2021-04-22/2021-04-22_BINS_XETR04.csv
2021-04-22/2021-04-22_BINS_XETR05.csv
2021-04-22/2021-04-22_BINS_XETR06.csv
2021-04-22/2021-04-22_BINS_XETR07.csv
2021-04-22/2021-04-22_BINS_XETR08.csv
2021-04-22/2021-04-22_BINS_XETR09.csv
2021-04-22/2021-04-22_BINS_XETR10.csv
2021-04-22/2021-04-22_BINS_XETR11.csv
2021-04-22/2021-04-22_BINS_XETR12.csv
2021-04-22/2021-04-22_BINS_XETR13.csv
2021-04-22/2021-04-22_BINS_XETR14.csv
2021-04-22/2021-04-22_BINS_XETR15.csv
2021-04-22/2021-04-22_BINS_XETR16.csv
2021-04-22/2021-04-22_BINS_XETR17.csv
2021-04-22/2021-04-22_BINS_XETR18.csv
2021-04-22/2021-04-22_BINS_XETR19.csv
2021-04-22/2021-04-22_BINS_XETR20.csv
2021-04-22/2021-04-22_BINS_XETR21.csv
2021-04-22/2021-04-22_BINS_XETR22.csv
2021-04-22/2021-04-22_BINS_XETR23.csv
2021-04-23/2021-04-23_BINS_XETR0000.csv
2021-04-23/2021-04-23_BINS_XETR0001.csv
2021-04-

2021-04-23/2021-04-23_BINS_XETR0305.csv
2021-04-23/2021-04-23_BINS_XETR0306.csv
2021-04-23/2021-04-23_BINS_XETR0307.csv
2021-04-23/2021-04-23_BINS_XETR0308.csv
2021-04-23/2021-04-23_BINS_XETR0309.csv
2021-04-23/2021-04-23_BINS_XETR0310.csv
2021-04-23/2021-04-23_BINS_XETR0311.csv
2021-04-23/2021-04-23_BINS_XETR0312.csv
2021-04-23/2021-04-23_BINS_XETR0313.csv
2021-04-23/2021-04-23_BINS_XETR0314.csv
2021-04-23/2021-04-23_BINS_XETR0315.csv
2021-04-23/2021-04-23_BINS_XETR0316.csv
2021-04-23/2021-04-23_BINS_XETR0317.csv
2021-04-23/2021-04-23_BINS_XETR0318.csv
2021-04-23/2021-04-23_BINS_XETR0319.csv
2021-04-23/2021-04-23_BINS_XETR0320.csv
2021-04-23/2021-04-23_BINS_XETR0321.csv
2021-04-23/2021-04-23_BINS_XETR0322.csv
2021-04-23/2021-04-23_BINS_XETR0323.csv
2021-04-23/2021-04-23_BINS_XETR0324.csv
2021-04-23/2021-04-23_BINS_XETR0325.csv
2021-04-23/2021-04-23_BINS_XETR0326.csv
2021-04-23/2021-04-23_BINS_XETR0327.csv
2021-04-23/2021-04-23_BINS_XETR0328.csv
2021-04-23/2021-04-23_BINS_XETR0329.csv


2021-04-23/2021-04-23_BINS_XETR0632.csv
2021-04-23/2021-04-23_BINS_XETR0633.csv
2021-04-23/2021-04-23_BINS_XETR0634.csv
2021-04-23/2021-04-23_BINS_XETR0635.csv
2021-04-23/2021-04-23_BINS_XETR0636.csv
2021-04-23/2021-04-23_BINS_XETR0637.csv
2021-04-23/2021-04-23_BINS_XETR0638.csv
2021-04-23/2021-04-23_BINS_XETR0639.csv
2021-04-23/2021-04-23_BINS_XETR0640.csv
2021-04-23/2021-04-23_BINS_XETR0641.csv
2021-04-23/2021-04-23_BINS_XETR0642.csv
2021-04-23/2021-04-23_BINS_XETR0643.csv
2021-04-23/2021-04-23_BINS_XETR0644.csv
2021-04-23/2021-04-23_BINS_XETR0645.csv
2021-04-23/2021-04-23_BINS_XETR0646.csv
2021-04-23/2021-04-23_BINS_XETR0647.csv
2021-04-23/2021-04-23_BINS_XETR0648.csv
2021-04-23/2021-04-23_BINS_XETR0649.csv
2021-04-23/2021-04-23_BINS_XETR0650.csv
2021-04-23/2021-04-23_BINS_XETR0651.csv
2021-04-23/2021-04-23_BINS_XETR0652.csv
2021-04-23/2021-04-23_BINS_XETR0653.csv
2021-04-23/2021-04-23_BINS_XETR0654.csv
2021-04-23/2021-04-23_BINS_XETR0655.csv
2021-04-23/2021-04-23_BINS_XETR0656.csv


2021-04-23/2021-04-23_BINS_XETR0959.csv
2021-04-23/2021-04-23_BINS_XETR1000.csv
2021-04-23/2021-04-23_BINS_XETR1001.csv
2021-04-23/2021-04-23_BINS_XETR1002.csv
2021-04-23/2021-04-23_BINS_XETR1003.csv
2021-04-23/2021-04-23_BINS_XETR1004.csv
2021-04-23/2021-04-23_BINS_XETR1005.csv
2021-04-23/2021-04-23_BINS_XETR1006.csv
2021-04-23/2021-04-23_BINS_XETR1007.csv
2021-04-23/2021-04-23_BINS_XETR1008.csv
2021-04-23/2021-04-23_BINS_XETR1009.csv
2021-04-23/2021-04-23_BINS_XETR1010.csv
2021-04-23/2021-04-23_BINS_XETR1011.csv
2021-04-23/2021-04-23_BINS_XETR1012.csv
2021-04-23/2021-04-23_BINS_XETR1013.csv
2021-04-23/2021-04-23_BINS_XETR1014.csv
2021-04-23/2021-04-23_BINS_XETR1015.csv
2021-04-23/2021-04-23_BINS_XETR1016.csv
2021-04-23/2021-04-23_BINS_XETR1017.csv
2021-04-23/2021-04-23_BINS_XETR1018.csv
2021-04-23/2021-04-23_BINS_XETR1019.csv
2021-04-23/2021-04-23_BINS_XETR1020.csv
2021-04-23/2021-04-23_BINS_XETR1021.csv
2021-04-23/2021-04-23_BINS_XETR1022.csv
2021-04-23/2021-04-23_BINS_XETR1023.csv


2021-04-23/2021-04-23_BINS_XETR1324.csv
2021-04-23/2021-04-23_BINS_XETR1325.csv
2021-04-23/2021-04-23_BINS_XETR1326.csv
2021-04-23/2021-04-23_BINS_XETR1327.csv
2021-04-23/2021-04-23_BINS_XETR1328.csv
2021-04-23/2021-04-23_BINS_XETR1329.csv
2021-04-23/2021-04-23_BINS_XETR1330.csv
2021-04-23/2021-04-23_BINS_XETR1331.csv
2021-04-23/2021-04-23_BINS_XETR1332.csv
2021-04-23/2021-04-23_BINS_XETR1333.csv
2021-04-23/2021-04-23_BINS_XETR1334.csv
2021-04-23/2021-04-23_BINS_XETR1335.csv
2021-04-23/2021-04-23_BINS_XETR1336.csv
2021-04-23/2021-04-23_BINS_XETR1337.csv
2021-04-23/2021-04-23_BINS_XETR1338.csv
2021-04-23/2021-04-23_BINS_XETR1339.csv
2021-04-23/2021-04-23_BINS_XETR1340.csv
2021-04-23/2021-04-23_BINS_XETR1341.csv
2021-04-23/2021-04-23_BINS_XETR1342.csv
2021-04-23/2021-04-23_BINS_XETR1343.csv
2021-04-23/2021-04-23_BINS_XETR1344.csv
2021-04-23/2021-04-23_BINS_XETR1345.csv
2021-04-23/2021-04-23_BINS_XETR1346.csv
2021-04-23/2021-04-23_BINS_XETR1347.csv
2021-04-23/2021-04-23_BINS_XETR1348.csv


## Reading the uploaded file

In [40]:
# Create an S3 resource and a handle on the target bucket for this section
bucket_name_trg = 'xetra-1234'
s3 = boto3.resource('s3')
bucket_target = s3.Bucket(bucket_name_trg)

# Print the key of every object in the target bucket
for obj in bucket_target.objects.all():
    print(obj.key)

meta/
meta/report1/
meta/report1/xetra_report1_meta_file.csv
report1/xetra_daily_report1_ 20210416_100753. parquet
report1/xetra_daily_report1_20210410_093203.parquet
report1/xetra_daily_report1_20210410_130037.parquet
report1/xetra_daily_report1_20210410_130144.parquet
report1/xetra_daily_report1_20210410_130359.parquet
report1/xetra_daily_report1_20210414_080521.parquet
report1/xetra_daily_report1_20210421_092536.parquet
test.csv
xetra_daily_report_20210423_143033.parquet
xetra_daily_report_20210423_144024.parquet
xetra_daily_report_20210423_161441.parquet


In [41]:
# Download the raw bytes of one report object from the target bucket
prq_obj_init = bucket_target.Object(key='xetra_daily_report_20210423_161441.parquet').get().get('Body').read()
# Wrap the bytes in an in-memory buffer and parse them as Parquet
data = BytesIO(prq_obj_init)
df_prq = pd.read_parquet(data)

In [42]:
df_prq

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
0,AT00000FACC2,2021-04-23,9.00,9.00,9.00,9.00,30,-1.64
1,AT0000606306,2021-04-23,17.44,17.78,17.44,17.78,1163,0.11
2,AT0000644505,2021-04-23,110.60,109.20,109.20,110.60,71,-0.73
3,AT0000652011,2021-04-23,28.20,28.20,28.20,28.21,314,-0.67
4,AT0000720008,2021-04-23,6.94,6.94,6.94,6.94,20,-0.29
...,...,...,...,...,...,...,...,...
2639,XS2265368097,2021-04-23,14.89,14.87,14.87,14.89,0,-0.11
2640,XS2265369574,2021-04-23,21.69,21.66,21.66,21.69,0,-0.69
2641,XS2265369731,2021-04-23,10.09,10.24,10.09,10.31,1550,1.54
2642,XS2265370234,2021-04-23,23.71,23.81,23.56,24.21,4200,0.82
