In [29]:
# import all libraries
import boto3 as bt
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [30]:
# Set arg_date to get records from arg_date to today's date, Format = YYYY-MM-DD
arg_date = '2022-01-10'
arg_date_dt = datetime.strptime(arg_date, '%Y-%m-%d').date() - timedelta(days=1)
# Initialize source and target values
src_date_format = '%Y-%m-%d'
src_bucket_name = 'deutsche-boerse-xetra-pds'
trg_bucket_name = 'dev-xetra-1'

In [31]:
# Connect to AWS s3 bucket
aws_session = bt.Session(
      region_name = 'us-east-1', 
      aws_access_key_id= 'AKIAT464GZVTJY3VEDBN', 
      aws_secret_access_key= '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')
s3 = aws_session.resource('s3')
bucket = s3.Bucket(src_bucket_name)

In [32]:
# Test to check the access data
# bucket_obj1 = bucket.objects.filter(Prefix='2021-03-15')
# bucket_obj2 = bucket.objects.filter(Prefix='2021-03-16')
# objects = [obj for obj in bucket_obj1] + [obj for obj in bucket_obj2]
objects = [obj for obj in bucket.objects.all() if datetime.strptime(obj.key.split('/')[0], src_date_format).date() >= arg_date_dt]

In [33]:
# Get schema of incoming data
csv_obj_init = bucket.Object(key=objects[0].key).get().get("Body").read().decode('utf-8')
data_init = StringIO(csv_obj_init)
df_init = pd.read_csv(data_init, delimiter=',')
df_init.columns

Index(['ISIN', 'Mnemonic', 'SecurityDesc', 'SecurityType', 'Currency',
       'SecurityID', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice',
       'EndPrice', 'TradedVolume', 'NumberOfTrades'],
      dtype='object')

In [34]:
# Function to convert CSV to Parquet file
def csv_to_df(filename):
    csv_obj = bucket.Object(key=filename).get().get("Body").read().decode('utf-8')
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=',')
    return df

# Set schema to the dataframe
df_all = pd.DataFrame(columns=df_init.columns)
# 
df_all = pd.concat([csv_to_df(obj.key) for obj in objects], ignore_index=True)

In [35]:
# Filter selected columns, drop null rows 
columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
df_all = df_all.loc[:,columns]
df_all.dropna(inplace=True)
df_all.shape

(573268, 8)

In [36]:
# Create a column named opening_price which tells the starting price of a stock(ISIN) each day
df_all['opening_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('first')

In [37]:
# Create a column named closing_price which tells the closing price of a stock(ISIN) each day
df_all['closing_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('last')

In [38]:
# Aggegations: Get min and max, total values for a stock each day
df_agg = (df_all.groupby(['ISIN', 'Date'], as_index=False)
                .agg(opening_price_eur = ('opening_price','min'), closing_price_eur = ('closing_price','min'),
                    minimum_price_eur = ('MinPrice','min'), maximum_price_eur = ('MaxPrice','max'),
                    daily_traded_volume = ('TradedVolume', 'sum')))

In [39]:
# Percent change between closing price of 2 consecutive days
df_agg['prev_closing_price'] = df_agg.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)
df_agg['change_prev_closing_%'] = (df_agg['closing_price_eur'] - df_agg['prev_closing_price']) / df_agg['prev_closing_price'] * 100

In [40]:
# Save report to S3
key = 'dev_xetra_report_' + datetime.today().strftime("%Y%m%d_%H%M%S") + '.parquet'
bucket_target = s3.Bucket(trg_bucket_name)
out_buffer = BytesIO()
df_agg.to_parquet(out_buffer, index=False)
bucket_target.put_object(Body=out_buffer.getvalue(), Key=key)

s3.Object(bucket_name='dev-xetra-1', key='dev_xetra_report_20220116_113957.parquet')

In [41]:
# Reading uploaded file in s3 bucket
for obj in bucket_target.objects.all():
    print(obj.key)

parquet_obj = bucket_target.Object(key='dev_xetra_report_20220116_104126.parquet').get().get("Body").read()
tar_data = BytesIO(parquet_obj)
df_report = pd.read_parquet(tar_data)

dev_xetra_report_20220116_104126.parquet
dev_xetra_report_20220116_113957.parquet


In [42]:
df_report

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,prev_closing_price,change_prev_closing_%
0,AT000000STR1,2022-01-10,37.400,36.600,36.500,37.400,64,,
1,AT000000STR1,2022-01-11,37.000,37.300,36.600,37.300,140,36.600,1.912568
2,AT000000STR1,2022-01-12,37.000,37.500,37.000,37.500,30,37.300,0.536193
3,AT000000STR1,2022-01-13,37.700,37.700,37.700,37.700,0,37.500,0.533333
4,AT000000STR1,2022-01-14,37.550,37.600,37.550,37.600,175,37.700,-0.265252
...,...,...,...,...,...,...,...,...,...
15990,XS2376095068,2022-01-10,37.072,36.768,36.436,37.072,2564,,
15991,XS2376095068,2022-01-11,37.254,37.448,36.908,37.448,0,36.768,1.849434
15992,XS2376095068,2022-01-12,37.456,38.128,37.456,38.510,5216,37.448,1.815851
15993,XS2376095068,2022-01-13,38.068,37.740,37.740,38.166,0,38.128,-1.017625
