In [41]:
import os
from datetime import datetime, timedelta 
from io import StringIO,BytesIO # to read csv using s3 storage

import boto3
import pandas as pd
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient

In [42]:
# --- Run configuration ---
# Report date; the extraction window start (arg_date_dt) is derived from it.
arg_date = '2022-12-28'
# strptime format of arg_date. The misspelled name is kept because other
# cells reference it.
suorce_date_format = '%Y-%m-%d'
# Name of the S3 bucket the raw CSV files are read from.
duet_xetra_source = 'xetra-1234'
# Name of the S3 bucket the parquet report is written to.
target_bucket = 'xetra-destination'
# Object key for this run's report; the timestamp makes each run write a new object.
keys = 'parquet/xetra_daily_report_' + datetime.today().strftime('%y%m%d_%H%M%S') + '.parquet'
# Columns of the raw data that the report calculation uses.
cols_needed = [
    'ISIN', 'Date', 'Time',
    'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice',
    'TradedVolume', 'NumberOfTrades',
]
# The Azure connection string embeds the storage account key, so it must not
# live in the notebook. It is read from the environment instead; the value is
# None when the variable is not set.
# NOTE(review): the key that was previously committed here should be treated
# as leaked and rotated in the Azure portal.
az_des_con_str = os.environ.get('AZURE_STORAGE_CONNECTION_STRING')

In [43]:
# Parse the report date and step back one day to get the start of the
# extraction window (presumably so the previous day's close is available
# for the day-over-day change -- confirm).
report_day = datetime.strptime(arg_date, suorce_date_format).date()
arg_date_dt = report_day - timedelta(days=1)

In [44]:
# Display the computed window start date as a sanity check.
arg_date_dt

datetime.date(2022, 12, 27)

In [45]:
s3 = boto3.resource('s3')
bucket = s3.Bucket(duet_xetra_source)


def _key_date(key):
    """Return the date encoded in the first path segment of ``key``.

    Returns None when that segment does not match ``suorce_date_format``,
    so that a stray non-dated object does not abort the whole listing.
    """
    try:
        return datetime.strptime(str(key).split('/')[0], suorce_date_format).date()
    except ValueError:
        return None


# Keep only the objects whose date prefix falls on or after the window start.
objects = []
for obj in bucket.objects.all():
    key_day = _key_date(obj.key)
    if key_day is not None and key_day >= arg_date_dt:
        objects.append(obj)

## Creating the pandas DataFrame

In [46]:
def csv_to_pandas(filename):
    """Download one CSV object from the source bucket and parse it.

    Parameters
    ----------
    filename : str
        Key of the object inside the module-level ``bucket``.

    Returns
    -------
    pandas.DataFrame
        The UTF-8 decoded, comma-separated content parsed by ``pd.read_csv``.
    """
    body = bucket.Object(key=filename).get().get('Body')
    text = body.read().decode('utf-8')
    return pd.read_csv(StringIO(text), delimiter=',')
# Fail with an explicit message when nothing was listed; otherwise pd.concat
# raises a generic "No objects to concatenate" ValueError.
if not objects:
    raise ValueError(
        'No source objects found in bucket {!r} on or after {}'.format(duet_xetra_source, arg_date_dt)
    )
# Load every selected CSV and stack them into one frame with a fresh 0..n-1 index.
df_all = pd.concat([csv_to_pandas(obj.key) for obj in objects], ignore_index=True)


## Extracting the columns needed to calculate the outputs

In [47]:
# Keep only the columns the report needs (raises KeyError if one is missing).
df_all = df_all[cols_needed]

In [48]:
# Discard every row that has a missing value in any of the kept columns.
df_all = df_all.dropna()

## Get opening price for ISIN per date.

In [49]:
# Order the rows by time so that 'first' picks the earliest StartPrice of each
# ISIN/Date group; the assignment aligns the result back on the row index.
rows_by_time = df_all.sort_values(by=['Time'])
first_start_price = rows_by_time.groupby(['ISIN', 'Date'])['StartPrice'].transform('first')
df_all['opening_price'] = first_start_price

## Closing Price For ISIN per date.

In [50]:
# The closing price of a day is the EndPrice of the latest row of each
# ISIN/Date group. (The previous version took the last StartPrice, i.e. the
# price at which the final interval opened, and never used EndPrice.)
df_all['closing_price'] = (
    df_all.sort_values(by=['Time'])
    .groupby(['ISIN', 'Date'])['EndPrice']
    .transform('last')
)

In [51]:
# Group the rows per instrument and day. The group keys become the index of
# the aggregated result (as_index defaults to True).
# Note: df_all is a GroupBy object after this cell, not a DataFrame.
df_all = df_all.groupby(by=['ISIN', 'Date'])

In [52]:
# Named aggregations: output column -> (input column, reducer).
# opening_price / closing_price were broadcast per group by transform(), so
# 'min' merely collapses the identical values to one.
# NOTE(review): 'maxmimum_price_euro' looks like a typo, but it is the column
# name written to the report, so it is left unchanged here.
report_aggregations = {
    'opening_price_euro': ('opening_price', 'min'),
    'closing_price_euro': ('closing_price', 'min'),
    'minimum_price_euro': ('MinPrice', 'min'),
    'maxmimum_price_euro': ('MaxPrice', 'max'),
    'daily_traded_volume': ('TradedVolume', 'sum'),
}
df_all = df_all.agg(**report_aggregations)

# Change between previous and present closing price

In [53]:
# With the rows ordered by date, shifting by one inside each ISIN gives the
# closing price of the preceding available day (NaN for the first day).
rows_by_date = df_all.sort_values(by=['Date'])
df_all['prev_closing_price'] = rows_by_date.groupby(['ISIN'])['closing_price_euro'].shift(1)

In [54]:
# Relative change of today's close against the previous close, in percent.
closing_delta = df_all['closing_price_euro'] - df_all['prev_closing_price']
df_all['change_prev_closing_precent'] = closing_delta / df_all['prev_closing_price'] * 100

In [55]:
# prev_closing_price is only a helper for the percentage change above, so it
# is the column to remove. The previous version dropped closing_price_euro
# instead, which left the report without the actual closing price.
df_all = df_all.drop(columns=['prev_closing_price'])

In [56]:
# Round every numeric column of the report to two decimal places.
df_all = df_all.round(2)

## Write the report to the AWS S3 bucket xetra-destination

In [57]:
# In-memory binary buffer that receives the parquet-encoded report.
output_buffer = BytesIO()

In [58]:
# Start from an empty buffer so re-running this cell does not append a second
# parquet file after the first one.
output_buffer.seek(0)
output_buffer.truncate()
# ISIN and Date live in the index after the groupby/agg step. Writing with
# index=False alone would drop them from the report, so they are turned back
# into ordinary columns first.
df_all.reset_index().to_parquet(output_buffer, index=False)

In [59]:
# Handle to the destination bucket named by target_bucket.
bucket_target = s3.Bucket(target_bucket)

In [60]:
# Upload the buffered parquet bytes under this run's key; the returned
# object handle is displayed as the cell output.
report_bytes = output_buffer.getvalue()
bucket_target.put_object(Key=keys, Body=report_bytes)

s3.Object(bucket_name='xetra-destination', key='parquet/xetra_daily_report_230101_161249.parquet')

## Read the uploaded file from AWS

In [62]:
# Read back every object in the destination bucket as a DataFrame.
# df_report ends up as a list of single-entry dicts: {object key: DataFrame}.
df_report = []
for obj in bucket_target.objects.all():
    print(obj.key)
    response = bucket_target.Object(key=obj.key).get()
    parquet_obj = response.get('Body').read()
    data = BytesIO(parquet_obj)
    report_frame = pd.read_parquet(data)
    df_report.append({obj.key: report_frame})

parquet/xetra_daily_report_221230_134318.parquet
parquet/xetra_daily_report_230101_151136.parquet
parquet/xetra_daily_report_230101_161249.parquet
