In [264]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [265]:
# Adapter layer

def read_csv_to_df(bucket, key, decoding = 'utf-8', sep=','):
    csv_obj = bucket.Object(key=key).get().get('Body').read().decode(decoding)
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=sep)
    return df

def write_df_s3(bucket, df, key, file_format):
    if file_format == '.parquet':
        out_buffer=BytesIO()
        df.to_parquet(out_buffer, index=False)
    elif file_format == '.csv':
        out_buffer=StringIO()
        df.to_csv(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True
        
def list_in_prefix(bucket, prefix):
    files = [obj.key for obj in bucket.objects.filter(Prefix =prefix)]
    return files

# def return_objects(bucket, arg_date, src_format):
#     arg_date_dt = datetime.strptime(arg_date, src_format).date() - timedelta(days=1)
#     arg_date_dt_end = datetime.strptime(arg_date, src_format).date() + timedelta(days=10)
    
#     objects = [obj.key for obj in bucket.objects.all() if datetime.strptime(obj.key.split('/')[0], src_format).date()> arg_date_dt and
#                datetime.strptime(obj.key.split('/')[0], src_format).date()< arg_date_dt_end]
#     return objects


In [272]:
# Application layer

def extract(bucket, key, date_list):
    files = [key for date in date_list for key in list_in_prefix(bucket, date)]
    df = pd.concat([read_csv_to_df(bucket, obj) for obj in files if not read_csv_to_df(bucket, obj).empty], ignore_index=True)
    return df

def transform_report1(data, arg_date):
    data.dropna(inplace=True)
    data['opening_price']=round(data.sort_values(['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('first'),2)
    data['closing_price'] = round(data.sort_values('Time', ascending=False).groupby(['ISIN','Date'])['EndPrice'].transform('first'),2)
    data_agg = data.groupby(['ISIN', 'Date'], as_index=False).agg(openning_price_euro=('opening_price','min'),
                     closing_price_euro=('closing_price','min'), min_price_euro=('MinPrice', 'min'), max_price_euro=('MaxPrice', 'max'), 
                                                  traded_volume_euro=('TradedVolume','sum'))
    data_agg['percentage_change'] = data_agg.sort_values('Date').groupby('ISIN')['closing_price_euro'].shift(1)
    data_agg['percentage_change_cal'] = (data_agg['closing_price_euro']-data_agg['percentage_change'])/data_agg['closing_price_euro']
    data_agg[data_agg['Date']>arg_date]
    return data_agg    
    
def load(bucket, df, target_key, file_format, meta_key, extract_date_list):
    key = target_key + datetime.today().strftime('%Y%m%D %H%M%S')+file_format
    write_df_s3(bucket, df, key, '.parquet')
    update_meta_file(target_bucket, meta_key, extract_date_list, src_format)
    return True

def etl_report1(source_bucket, target_bucket, date_list, arg_date, target_key, file_format, meta_key):
    df = extract(source_bucket, key, date_list)
    df = transform_report1(df, arg_date)
    extract_date_list = [date for date in date_list if date>= arg_date]
    load(target_bucket, df, target_key, file_format, meta_key, extract_date_list)
    return True

In [282]:
# Application Layer - not core

def return_date_list(bucket, target_bucket, arg_date, src_format, meta_key):
    arg_date_dt = datetime.strptime(arg_date, src_format).date() - timedelta(days=1)
    arg_date_dt_end = datetime.strptime(arg_date, src_format).date() + timedelta(days=10)
    date_list = [datetime.strftime(arg_date_dt+timedelta(days=x), src_format) for x in range(0, ((arg_date_dt_end-arg_date_dt).days))]
    try:
        
        meta_data = read_csv_to_df(target_bucket, meta_key, decoding = 'utf-8', sep=',')
        meta_data_date = list(pd.to_datetime(meta_data['source_date']).dt.date)
        date_list_date = [datetime.strptime(date, src_format).date() for date in date_list]
        not_processed = [date for date in date_list_date if date not in meta_data_date]
        min_date = min(not_processed)
        
        date_list = [datetime.strftime(date, src_format) for date in date_list_date if date>=min_date]
        return_min_date = min(date_list)
        
    except:
         return_min_date = arg_date
    return return_min_date, date_list

def update_meta_file(target_bucket, meta_key, extract_date_list, src_format):
    meta_data = read_csv_to_df(target_bucket, meta_key, decoding = 'utf-8', sep=',')
    added_date = pd.DataFrame({'source_date': extract_date_list, 'processed_date': datetime.today().strftime(src_format)})
    meta_data = pd.concat([meta_data, added_date], ignore_index=True)
    write_df_s3(target_bucket, meta_data, meta_key, '.csv')
    
    

In [283]:
# manin function

def main():
    # Parameters/Configurations
    arg_date = '2022-02-16'
    src_format = '%Y-%m-%d'
    src_bucket = 'xetra-1234'
    trg_bucket = 'mybucket-1213'
    target_key = 'xetra_daily_repo'
    file_format = '.parquet'
    meta_key = 'meta_data.csv'


    # Init
    s3 = boto3.resource('s3')
    source_bucket = s3.Bucket(src_bucket)
    target_bucket = s3.Bucket(trg_bucket)

    # Run application
    extract_date, date_list = return_date_list(source_bucket, target_bucket, arg_date, src_format, meta_key)
    etl_report1(source_bucket, target_bucket, date_list, extract_date, target_key, file_format, meta_key)

In [284]:
# run
main()

In [147]:
date_list

['2022-02-15',
 '2022-02-16',
 '2022-02-17',
 '2022-02-18',
 '2022-02-19',
 '2022-02-20',
 '2022-02-21',
 '2022-02-22',
 '2022-02-23',
 '2022-02-24',
 '2022-02-25']

In [200]:
key='meta_data.csv'
trg_bucket = 'mybucket-1213'
bucket_target = s3.Bucket(trg_bucket)

meta_data = read_csv_to_df(bucket_target, key, decoding = 'utf-8', sep=',')
meta_data

Unnamed: 0,source_date,processed_date
0,2022-02-15,2022-02-15
1,2022-02-16,2022-02-16
2,2022-02-17,2022-02-18
3,2022-02-19,2022-02-19
4,2022-02-20,2022-02-20
5,2022-02-23,2022-02-23


In [204]:
meta_data_date = list(pd.to_datetime(meta_data['source_date']).dt.date)
meta_data_date

[datetime.date(2022, 2, 15),
 datetime.date(2022, 2, 16),
 datetime.date(2022, 2, 17),
 datetime.date(2022, 2, 19),
 datetime.date(2022, 2, 20),
 datetime.date(2022, 2, 23)]

In [205]:
date_list_date = [datetime.strptime(date, src_format).date() for date in date_list]
date_list_date

[datetime.date(2022, 2, 15),
 datetime.date(2022, 2, 16),
 datetime.date(2022, 2, 17),
 datetime.date(2022, 2, 18),
 datetime.date(2022, 2, 19),
 datetime.date(2022, 2, 20),
 datetime.date(2022, 2, 21),
 datetime.date(2022, 2, 22),
 datetime.date(2022, 2, 23),
 datetime.date(2022, 2, 24),
 datetime.date(2022, 2, 25)]

In [217]:
not_processed = [date for date in date_list_date if date not in meta_data_date]
not_processed

[datetime.date(2022, 2, 18),
 datetime.date(2022, 2, 21),
 datetime.date(2022, 2, 22),
 datetime.date(2022, 2, 24),
 datetime.date(2022, 2, 25)]

In [219]:
min_date = min(not_processed)
if not_processed:
    return_dates = [datetime.strftime(date, src_format) for date in date_list_date if date>=min_date]
    return_min_date = min(return_dates)
else:
    return_dates = []
    return_min_date = datetime(2200,1,1).date()
    

In [220]:
return_min_date

'2022-02-18'

# Reading the uploaded file

In [252]:
s3 = boto3.resource('s3')
target_bucket = s3.Bucket(trg_bucket)
for obj in target_bucket.objects.all():
    print(obj.key)

meta_data.csv
xetra_daily_repo20240303/31/24 014001.parquet
xetra_daily_repo20240303/31/24 014446.parquet
xetra_daily_repo20240303/31/24 014856.parquet
xetra_daily_repo20240404/01/24 145503.parquet
xetra_daily_repo20240404/01/24 145802.parquet
xetra_daily_repo20240404/01/24 153112.parquet
xetra_daily_repo20240404/01/24 175153.parquet
xetra_daily_repo20240404/01/24 175447.parquet
xetra_daily_repo20240404/01/24 180643.parquet
xetra_daily_repo20240404/01/24 180751.parquet
xetra_daily_repo20240404/01/24 181112.parquet
xetra_daily_repo20240404/01/24 181319.parquet
xetra_daily_repo20240404/01/24 181612.parquet
xetra_daily_report20240303/29/24 002359.parquet
xetra_daily_report20240303/29/24 130343.parquet
xetra_daily_report20240303/29/24 130519.parquet
xetra_daily_report20240303/29/24 130717.parquet


In [262]:
parq_obj=bucket_target.Object(key='xetra_daily_repo20240404/01/24 181612.parquet').get().get('Body').read()
data_=BytesIO(parq_obj)
df_report=pd.read_parquet(data_)
df_report.sort_values('Date')

Unnamed: 0,ISIN,Date,openning_price_euro,closing_price_euro,min_price_euro,max_price_euro,traded_volume_euro,percentage_change,percentage_change_cal
0,AT000000STR1,2022-02-18,38.85,38.60,38.6000,38.8500,153,,
11441,IE00BP2C1V62,2022-02-18,20.59,20.42,20.4150,20.6000,0,,
11435,IE00BP2C1S34,2022-02-18,27.22,26.92,26.9200,27.2200,0,,
11429,IE00BP2C0316,2022-02-18,19.23,19.07,19.0600,19.2620,11,,
3204,DE000A1N49P6,2022-02-18,35.52,35.91,34.8940,36.1520,53965,,
...,...,...,...,...,...,...,...,...,...
10683,IE00BLS09N40,2022-02-25,5.11,5.50,4.7400,5.5300,37624,4.97,0.096364
3503,DE000A2BDEC4,2022-02-25,155.08,153.07,152.1000,155.0800,496,160.69,-0.049781
16331,NL0011683594,2022-02-25,32.38,32.87,32.1050,32.8800,12129,32.13,0.022513
1238,DE0006070006,2022-02-25,61.96,63.04,61.3000,64.0400,264780,61.42,0.025698


In [2]:
arg_date = '2022-01-27'
src_format = '%Y-%m-%d'
src_bucket = 'xetra-1234'
trg_bucket = 'mybucket-1213'
target_key = 'xetra_daily_report'
file_format = '.parquet'
# key = 'xetra_daily_report'+datetime.today().strftime('%Y%m%D %H%M%S')+'.parquet'

In [44]:
s3 = boto3.resource('s3')
bucket = s3.Bucket(src_bucket)

arg_date_dt = datetime.strptime(arg_date, src_format).date() - timedelta(days=1)
arg_date_dt_end = datetime.strptime(arg_date, src_format).date() + timedelta(days=10)

objects = [obj for obj in bucket.objects.all() if datetime.strptime(obj.key.split('/')[0], src_format).date()> arg_date_dt and datetime.strptime(obj.key.split('/')[0], src_format).date()< arg_date_dt_end]


In [47]:
objects = [obj for obj in bucket.objects.all()]

In [58]:
datetime.strptime(objects[0].key.split('/')[0], src_format).date()

datetime.date(2022, 1, 3)

In [4]:
def extract(objects):
    my_data=pd.DataFrame()
    for i in range(len(objects)):
        csv_obj = bucket.Object(key=objects[i].key).get().get('Body').read().decode('utf-8')
        data = StringIO(csv_obj)
        df = pd.read_csv(data, delimiter=',')
        if not df.empty:  
            if not my_data.empty:
                my_data = pd.concat([my_data, df], ignore_index=True)
            else:
                my_data = df.copy()
    return my_data[['ISIN','Mnemonic','Date','Time','StartPrice','MaxPrice','MinPrice','EndPrice','TradedVolume']]

In [5]:
data = extract(objects)

In [6]:
data.shape

(852007, 9)

# Get Opening Price

In [7]:
data['opening_price']=round(data.sort_values(['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('first'),2)

# Get Closing Price

In [8]:
data['closing_price'] = round(data.sort_values('Time', ascending=False).groupby(['ISIN','Date'])['EndPrice'].transform('first'),2)

In [9]:
data[data['ISIN']=='AT0000A0E9W5']

Unnamed: 0,ISIN,Mnemonic,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume,opening_price,closing_price
0,AT0000A0E9W5,SANT,2022-01-27,08:00,16.35,16.40,16.35,16.38,7258,16.35,16.54
74,AT0000A0E9W5,SANT,2022-01-27,08:01,16.46,16.46,16.46,16.46,86,16.35,16.54
139,AT0000A0E9W5,SANT,2022-01-27,08:02,16.41,16.41,16.35,16.35,731,16.35,16.54
365,AT0000A0E9W5,SANT,2022-01-27,08:03,16.30,16.30,16.30,16.30,746,16.35,16.54
532,AT0000A0E9W5,SANT,2022-01-27,08:04,16.35,16.36,16.34,16.34,375,16.35,16.54
...,...,...,...,...,...,...,...,...,...,...,...
847384,AT0000A0E9W5,SANT,2022-02-04,16:26,16.62,16.62,16.62,16.62,11,16.66,16.61
847678,AT0000A0E9W5,SANT,2022-02-04,16:27,16.66,16.66,16.66,16.66,969,16.66,16.61
847977,AT0000A0E9W5,SANT,2022-02-04,16:28,16.64,16.64,16.60,16.63,12536,16.66,16.61
848286,AT0000A0E9W5,SANT,2022-02-04,16:29,16.57,16.63,16.57,16.57,1326,16.66,16.61


# Aggregations

In [10]:
data_agg = data.groupby(['ISIN', 'Date'], as_index=False).agg(openning_price_euro=('opening_price','min'), closing_price_euro=('closing_price','min'), 
                                                  min_price_euro=('MinPrice', 'min'), max_price_euro=('MaxPrice', 'max'), 
                                                  traded_volume_euro=('TradedVolume','sum'))

In [11]:
data_agg['percentage_change'] = data_agg.sort_values('Date').groupby('ISIN')['closing_price_euro'].shift(1)

In [12]:
data_agg['percentage_change_cal'] = (data_agg['closing_price_euro']-data_agg['percentage_change'])/data_agg['closing_price_euro']

In [13]:
data_agg[data_agg['Date']>arg_date][:20]

Unnamed: 0,ISIN,Date,openning_price_euro,closing_price_euro,min_price_euro,max_price_euro,traded_volume_euro,percentage_change,percentage_change_cal
1,AT000000STR1,2022-01-28,38.05,37.0,37.0,38.05,456,37.0,0.0
2,AT000000STR1,2022-01-31,37.8,37.65,37.65,37.8,1492,37.0,0.017264
3,AT000000STR1,2022-02-01,38.1,38.5,38.1,38.6,668,37.65,0.022078
4,AT000000STR1,2022-02-02,38.85,38.6,38.55,38.95,2597,38.5,0.002591
5,AT000000STR1,2022-02-03,39.0,38.6,38.6,39.0,66,38.6,0.0
6,AT000000STR1,2022-02-04,38.85,38.9,38.8,38.9,250,38.6,0.007712
8,AT00000FACC2,2022-01-28,7.66,7.52,7.52,7.66,610,7.62,-0.013298
9,AT00000FACC2,2022-02-01,7.82,8.0,7.75,8.12,1830,7.52,0.06
10,AT00000FACC2,2022-02-02,7.85,7.85,7.85,7.85,0,8.0,-0.019108
11,AT00000FACC2,2022-02-03,8.19,8.57,8.19,8.57,8013,7.85,0.084014


In [14]:
data_agg

Unnamed: 0,ISIN,Date,openning_price_euro,closing_price_euro,min_price_euro,max_price_euro,traded_volume_euro,percentage_change,percentage_change_cal
0,AT000000STR1,2022-01-27,37.60,37.00,37.000,37.900,485,,
1,AT000000STR1,2022-01-28,38.05,37.00,37.000,38.050,456,37.00,0.000000
2,AT000000STR1,2022-01-31,37.80,37.65,37.650,37.800,1492,37.00,0.017264
3,AT000000STR1,2022-02-01,38.10,38.50,38.100,38.600,668,37.65,0.022078
4,AT000000STR1,2022-02-02,38.85,38.60,38.550,38.950,2597,38.50,0.002591
...,...,...,...,...,...,...,...,...,...
22352,XS2376095068,2022-01-31,33.26,33.86,33.126,33.856,0,33.23,0.018606
22353,XS2376095068,2022-02-01,34.08,34.66,34.084,34.674,3920,33.86,0.023081
22354,XS2376095068,2022-02-02,33.97,32.88,32.882,34.098,600,34.66,-0.054136
22355,XS2376095068,2022-02-03,32.75,32.20,32.202,32.748,0,32.88,-0.021118


# Persisting the data

In [15]:
out_buffer=BytesIO()
data_agg.to_parquet(out_buffer, index=False)
bucket_target = s3.Bucket(trg_bucket)
bucket_target.put_object(Body=out_buffer.getvalue(), Key=key)

s3.Object(bucket_name='mybucket-1213', key='xetra_daily_report20240303/29/24 130717.parquet')

# Reading the uploaded file