In [2]:
import boto3 #  Python SDK for AWS
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [3]:
# Creating variables for the flexibility of the code
arg_date = '2021-05-07'
src_format = '%Y-%m-%d'
src_bucket = 'deutsche-boerse-xetra-pds'
trg_bucket = 'xetra-bucket-etl'
columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
key = 'xetra_daily_report_' + datetime.today().strftime("%Y%m%d_%H%M%S") + '.parquet'

In [4]:
arg_date_dt = datetime.strptime(arg_date, '%Y-%m-%d').date() - timedelta(days=1) # Parses a string representing a time according to a format | timedelta =  It is one of the easiest ways to perform date manipulations.

In [5]:
arg_date_dt

datetime.date(2021, 5, 6)

In [None]:
s3 = boto3.resource('s3') # high-level services class wrap around boto3.client (Resource definition for S3)
bucket = s3.Bucket(src_bucket)
objects = [obj for obj in bucket.objects.all() if datetime.strptime(obj.key.split('/')[0], '%Y-%m-%d').date() >= arg_date_dt] # Filters the objects

In [12]:
def csv_to_df(filename):
    csv_obj = bucket.Object(key=filename).get().get('Body').read().decode('utf-8') # Create a csv object
    data = StringIO(csv_obj) #StringIO is an in-memory file-like object that does do alterations to newlines
    df = pd.read_csv(data, delimiter=',')
    return df
df_all = pd.concat([csv_to_df(obj.key) for obj in objects], ignore_index=True)

In [13]:
objects # All objects who got choosed

[s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR00.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR01.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR02.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR03.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR04.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR05.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR06.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR07.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR08.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pd

In [15]:
df_all

Unnamed: 0,ISIN,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume
0,AT0000A0E9W5,2021-05-06,07:00,21.52,21.90,21.52,21.82,2648
1,DE000A0DJ6J9,2021-05-06,07:00,46.02,46.02,45.82,45.84,1766
2,DE000A0D6554,2021-05-06,07:00,21.56,21.78,21.56,21.78,14935
3,DE000A0D9PT0,2021-05-06,07:00,203.60,203.60,202.50,202.50,1246
4,DE000A0HN5C6,2021-05-06,07:00,44.18,44.21,44.04,44.08,12002
...,...,...,...,...,...,...,...,...
10470629,DE0005565204,2021-10-08,15:42,36.56,36.56,36.56,36.56,973
10470630,DE0007164600,2021-10-08,15:42,117.44,117.44,117.44,117.44,300
10470631,DE0005408116,2021-10-08,15:44,28.30,28.30,28.30,28.30,500
10470632,NL0012169213,2021-10-08,15:44,44.05,44.05,44.05,44.05,40


In [16]:
df_all = df_all.loc[:, columns]

In [17]:
df_all

Unnamed: 0,ISIN,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume
0,AT0000A0E9W5,2021-05-06,07:00,21.52,21.90,21.52,21.82,2648
1,DE000A0DJ6J9,2021-05-06,07:00,46.02,46.02,45.82,45.84,1766
2,DE000A0D6554,2021-05-06,07:00,21.56,21.78,21.56,21.78,14935
3,DE000A0D9PT0,2021-05-06,07:00,203.60,203.60,202.50,202.50,1246
4,DE000A0HN5C6,2021-05-06,07:00,44.18,44.21,44.04,44.08,12002
...,...,...,...,...,...,...,...,...
10470629,DE0005565204,2021-10-08,15:42,36.56,36.56,36.56,36.56,973
10470630,DE0007164600,2021-10-08,15:42,117.44,117.44,117.44,117.44,300
10470631,DE0005408116,2021-10-08,15:44,28.30,28.30,28.30,28.30,500
10470632,NL0012169213,2021-10-08,15:44,44.05,44.05,44.05,44.05,40


In [18]:
df_all.dropna(inplace=True) # dropna() function is used to remove rows and columns with Null/NaN values.

In [19]:
df_all.shape

(10470634, 8)

## Get opening price per ISIN and day

In [20]:
df_all['opening_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('first') # Grouping by ISIN and Date

In [21]:
df_all[df_all['ISIN']=='AT0000A0E9W5'] # Only allows ISIN = AT0000A0E9W5

Unnamed: 0,ISIN,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume,opening_price
0,AT0000A0E9W5,2021-05-06,07:00,21.52,21.90,21.52,21.82,2648,21.52
151,AT0000A0E9W5,2021-05-06,07:01,21.90,21.90,21.78,21.78,373,21.52
273,AT0000A0E9W5,2021-05-06,07:02,21.76,22.04,21.76,22.04,9534,21.52
673,AT0000A0E9W5,2021-05-06,07:04,22.00,22.04,22.00,22.00,8376,21.52
2845,AT0000A0E9W5,2021-05-06,07:05,22.00,22.06,22.00,22.04,2002,21.52
...,...,...,...,...,...,...,...,...,...
10466407,AT0000A0E9W5,2021-10-08,15:26,21.20,21.20,21.20,21.20,100,20.78
10466634,AT0000A0E9W5,2021-10-08,15:27,21.20,21.20,21.20,21.20,100,20.78
10466865,AT0000A0E9W5,2021-10-08,15:28,21.22,21.22,21.22,21.22,144,20.78
10467113,AT0000A0E9W5,2021-10-08,15:29,21.22,21.22,21.22,21.22,5,20.78


## Get closing price per ISIN and day

In [22]:
df_all['closing_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('last')

In [23]:
df_all[df_all['ISIN']=='AT0000A0E9W5']

Unnamed: 0,ISIN,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume,opening_price,closing_price
0,AT0000A0E9W5,2021-05-06,07:00,21.52,21.90,21.52,21.82,2648,21.52,21.50
151,AT0000A0E9W5,2021-05-06,07:01,21.90,21.90,21.78,21.78,373,21.52,21.50
273,AT0000A0E9W5,2021-05-06,07:02,21.76,22.04,21.76,22.04,9534,21.52,21.50
673,AT0000A0E9W5,2021-05-06,07:04,22.00,22.04,22.00,22.00,8376,21.52,21.50
2845,AT0000A0E9W5,2021-05-06,07:05,22.00,22.06,22.00,22.04,2002,21.52,21.50
...,...,...,...,...,...,...,...,...,...,...
10466407,AT0000A0E9W5,2021-10-08,15:26,21.20,21.20,21.20,21.20,100,20.78,21.16
10466634,AT0000A0E9W5,2021-10-08,15:27,21.20,21.20,21.20,21.20,100,20.78,21.16
10466865,AT0000A0E9W5,2021-10-08,15:28,21.22,21.22,21.22,21.22,144,20.78,21.16
10467113,AT0000A0E9W5,2021-10-08,15:29,21.22,21.22,21.22,21.22,5,20.78,21.16


## Aggregations

In [24]:
df_all = df_all.groupby(['ISIN', 'Date'], as_index=False).agg(opening_price_eur=('opening_price', 'min'), closing_price_eur=('closing_price', 'min'), minimum_price_eur=('MinPrice', 'min'), maximum_price_eur=('MaxPrice', 'max'), daily_traded_volume=('TradedVolume', 'sum'))

In [25]:
df_all

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume
0,AT00000FACC2,2021-05-06,8.880,8.730,8.730,8.880,943
1,AT00000FACC2,2021-05-07,8.860,9.000,8.840,9.000,547
2,AT00000FACC2,2021-05-10,9.100,9.020,8.940,9.100,145
3,AT00000FACC2,2021-05-11,8.850,8.820,8.820,8.850,304
4,AT00000FACC2,2021-05-12,8.820,8.860,8.820,8.860,12
...,...,...,...,...,...,...,...
333180,XS2314660700,2021-10-04,15.469,15.303,15.303,15.713,12
333181,XS2314660700,2021-10-05,15.286,15.510,15.286,15.510,0
333182,XS2314660700,2021-10-06,15.503,15.547,15.503,15.780,0
333183,XS2314660700,2021-10-07,15.510,15.716,15.510,15.783,0


## Percent Change Prev Closing

In [26]:
df_all['prev_closing_price'] = df_all.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1) # Shift index by desired number of periods with an optional time freq. 

In [27]:
df_all

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,prev_closing_price
0,AT00000FACC2,2021-05-06,8.880,8.730,8.730,8.880,943,
1,AT00000FACC2,2021-05-07,8.860,9.000,8.840,9.000,547,8.730
2,AT00000FACC2,2021-05-10,9.100,9.020,8.940,9.100,145,9.000
3,AT00000FACC2,2021-05-11,8.850,8.820,8.820,8.850,304,9.020
4,AT00000FACC2,2021-05-12,8.820,8.860,8.820,8.860,12,8.820
...,...,...,...,...,...,...,...,...
333180,XS2314660700,2021-10-04,15.469,15.303,15.303,15.713,12,15.355
333181,XS2314660700,2021-10-05,15.286,15.510,15.286,15.510,0,15.303
333182,XS2314660700,2021-10-06,15.503,15.547,15.503,15.780,0,15.510
333183,XS2314660700,2021-10-07,15.510,15.716,15.510,15.783,0,15.547


In [28]:
df_all['change_prev_closing_%'] = (df_all['closing_price_eur'] - df_all['prev_closing_price']) / df_all['prev_closing_price'] * 100

In [29]:
df_all.drop(columns=['prev_closing_price'], inplace=True)

In [30]:
df_all = df_all.round(decimals=2)

In [31]:
df_all.reset_index(inplace=True)

In [32]:
df_all = df_all[df_all.Date >= arg_date]

In [33]:
df_all

Unnamed: 0,index,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
1,1,AT00000FACC2,2021-05-07,8.86,9.00,8.84,9.00,547,3.09
2,2,AT00000FACC2,2021-05-10,9.10,9.02,8.94,9.10,145,0.22
3,3,AT00000FACC2,2021-05-11,8.85,8.82,8.82,8.85,304,-2.22
4,4,AT00000FACC2,2021-05-12,8.82,8.86,8.82,8.86,12,0.45
5,5,AT00000FACC2,2021-05-13,8.84,8.79,8.79,8.95,1885,-0.79
...,...,...,...,...,...,...,...,...,...
333180,333180,XS2314660700,2021-10-04,15.47,15.30,15.30,15.71,12,-0.34
333181,333181,XS2314660700,2021-10-05,15.29,15.51,15.29,15.51,0,1.35
333182,333182,XS2314660700,2021-10-06,15.50,15.55,15.50,15.78,0,0.24
333183,333183,XS2314660700,2021-10-07,15.51,15.72,15.51,15.78,0,1.09


## Write to S3

In [34]:
out_buffer = BytesIO() # BytesIO  can be useful when you need to pass data to or from an API that expect to be given a file object, but where you'd prefer to pass the data directly. 
df_all.to_parquet(out_buffer, index=False) # The to_parquet() function is used to write a DataFrame to the binary parquet format.
bucket_target = s3.Bucket(trg_bucket)
bucket_target.put_object(Body=out_buffer.getvalue(), Key=key)# Adds an object to a bucket.

s3.Object(bucket_name='xetra-bucket-etl', key='xetra_daily_report_20211009_175017.parquet')

## Read uploaded file

In [35]:
for obj in bucket_target.objects.all():
    print(obj.key)

xetra_daily_report_20211007_170456.parquet
xetra_daily_report_20211007_181631.parquet
xetra_daily_report_20211008_025459.parquet
xetra_daily_report_20211009_175017.parquet


In [36]:
prq_obj = bucket_target.Object(key='xetra_daily_report_20211007_170456.parquet').get().get('Body').read()
data = BytesIO(prq_obj)
df_report = pd.read_parquet(data)

In [37]:
df_report

Unnamed: 0,index,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
0,1,AT00000FACC2,2021-05-07,8.86,9.00,8.84,9.00,547,3.09
1,2,AT00000FACC2,2021-05-10,9.10,9.02,8.94,9.10,145,0.22
2,3,AT00000FACC2,2021-05-11,8.85,8.82,8.82,8.85,304,-2.22
3,4,AT00000FACC2,2021-05-12,8.82,8.86,8.82,8.86,12,0.45
4,5,AT00000FACC2,2021-05-13,8.84,8.79,8.79,8.95,1885,-0.79
...,...,...,...,...,...,...,...,...,...
326872,329851,XS2314660700,2021-10-01,15.42,15.36,15.36,15.42,0,-0.56
326873,329852,XS2314660700,2021-10-04,15.47,15.30,15.30,15.71,12,-0.34
326874,329853,XS2314660700,2021-10-05,15.29,15.51,15.29,15.51,0,1.35
326875,329854,XS2314660700,2021-10-06,15.50,15.55,15.50,15.78,0,0.24
