In [1]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [2]:
arg_date = '2021-05-07'
src_format = '%Y-%m-%d'
src_bucket = 'deutsche-boerse-xetra-pds'
trg_bucket = 'parsed-xetra-1'
columns = ['ISIN','Date','Time','StartPrice','MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
key = 'xetra_daily_report_' + datetime.today().strftime("%Y%m%d_%H%M%S") + '.parquet'

In [None]:
arg_date_dt= datetime.strptime(arg_date, src_format).date() - timedelta(days = 1)

In [None]:
arg_date_dt

In [4]:
s3 = boto3.resource('s3')
bucket = s3.Bucket(src_bucket)

In [None]:
objects = [obj for obj in bucket.objects.all() if datetime.strptime(obj.key.split('/')[0], src_format).date() >= arg_date_dt]

In [None]:
objects

In [None]:
csv_object_init = bucket.Object(key=objects[0].key).get().get('Body').read().decode('utf-8')
data = StringIO(csv_object_init)
df_init = pd.read_csv(data, delimiter=',')

In [None]:
df_init.columns

In [None]:
def csv_to_df(filename):
    csv_object = bucket.Object(key=filename).get().get('Body').read().decode('utf-8')
    data = StringIO(csv_object)
    df = pd.read_csv(data, delimiter=',')
    return df

df_all = df.concat([csv_to_df(obj.key) for obj in objects], ignore_index = True)

In [None]:
df_all

In [None]:
df_all = df_all.loc[:,columns]

In [None]:
df_all.dropna(inplace = True)

In [None]:
df_all.shape

# Get opening price and closing price per ISIN and day

In [None]:
df_all['Opening_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('first')


In [None]:
df_all['Closing_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN','Date'])['EndPrice'].transform('first')

In [None]:
df_all

# Aggregations

In [None]:
df_all = df_all.groupby(['ISIN','Date'], as_index=False).agg(opening_price_eur=('Opening_price','min'),closing_price_eur=('Closing_price','min'), minimum_price_eur=('MinPrice','min'),maximum_price_eur=('MaxPrice','max'), daily_traded_volume = ('TradedVolume','sum'))

In [None]:
df_all

# Percent Change Prev Closing

In [None]:
df_all['prev_closing_price'] = df_all.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)

In [None]:
df_all

In [None]:
df_all['change_prev_closing_%'] = (df_all['closing_price_eur'] - df_all['prev_closing_price']) / df_all['prev_closing_price'] *100 

In [None]:
df_all

In [None]:
df_all.drop(columns = ['prev_closing_price'], inplace= True)

In [None]:
df_all = df_all.round(decimals=2)

In [None]:
df_all = df_all[df_all.Date >= arg_date]

In [None]:
df_all

# Write to S3


In [None]:
out_buffer = BytesIO()
df_all.to_parquet(out_buffer, index=False)
bucket_target = s3.Bucket(trg_bucket)
bucket_target.put_object(Body=out_buffer.getvalue(), Key=key)

# Reading the uploaded file

In [5]:
bucket = s3.Bucket(src_bucket)
bucket_target = s3.Bucket(trg_bucket)

In [6]:
for obj in bucket_target.objects.all():
    print(obj.key)

xetra_daily_report_20210803_220637.parquet


In [7]:
prq_obj = bucket_target.Object(key='xetra_daily_report_20210803_220637.parquet').get().get('Body').read()
data = BytesIO(prq_obj)
df_report = pd.read_parquet(data)

In [8]:
df_report

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
0,AT00000FACC2,2021-05-07,8.86,8.86,8.84,9.00,547,-0.23
1,AT00000FACC2,2021-05-10,9.10,9.10,8.94,9.10,145,2.71
2,AT00000FACC2,2021-05-11,8.85,8.85,8.82,8.85,304,-2.75
3,AT00000FACC2,2021-05-12,8.82,8.82,8.82,8.86,12,-0.34
4,AT00000FACC2,2021-05-13,8.84,8.84,8.79,8.95,1885,0.23
...,...,...,...,...,...,...,...,...
183570,XS2284324667,2021-07-28,29.88,29.88,29.86,30.10,4364,-0.07
183571,XS2284324667,2021-07-29,30.13,30.13,30.04,30.40,3266,0.83
183572,XS2284324667,2021-07-30,30.34,30.34,30.34,30.41,5659,0.70
183573,XS2284324667,2021-08-02,30.40,30.40,29.89,30.42,1221,0.20
