In [56]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta
import pyarrow

In [57]:
# Adapter Layer

def read_csv_to_df(bucket, objects):
    #Se lee el primer objeto para obtener el nombre de las columnas
    csv_obj_init = bucket.Object(key=objects[0].key).get().get('Body').read().decode('utf-8') 
    data = StringIO(csv_obj_init)
    df_init = pd.read_csv(data, delimiter=',')

    df_all = pd.DataFrame(columns=df_init.columns)

    for obj in objects:
        csv_obj = bucket.Object(key=obj.key).get().get('Body').read().decode('utf-8')
        data = StringIO(csv_obj)
        df = pd.read_csv(data, delimiter=',')
        df_all = pd.concat([df,df_all], ignore_index=True)
    
    print('SE HA LEIDO EL ARCHIVO CSV CORRECTAMENTE')
    return df_all

def write_df_to_s3(df_all, key, bucket_target):
    
    #Se inicializa el buffer de salida, en este caso se utiliza BytesIO para que el buffer sea de bytes
    out_buffer = BytesIO()
    df_all.to_parquet(out_buffer, index=False)
    bucket_target.put_object(Body=out_buffer.getvalue(), Key=key)

    print('SE HA CARGADO EL ARCHIVO PARQUET CORRECTAMENTE')
    pass

def return_objects(bucket, arg_date_dt):
    
    #Get all the objects according to the condition given and return them
    objects = [obj for obj in bucket.objects.all() if datetime.strptime(obj.key.split("/")[0], '%Y-%m-%d').date() >= arg_date_dt]
    
    print('SE HAN ENCONTRADO LOS OBJETOS CORRECTAMENTE')
    return objects

In [58]:
# Application Layer

def extract(bucket, args):
    #se obtienen los objetos que cumplen con la condicion
    objects = return_objects(bucket, args)
    
    #Se lee el archivo csv y se transforma en un dataframe
    df_all = read_csv_to_df(bucket, objects)
    
    print('SE HAN EXTRAIDO LOS DATOS CORRECTAMENTE')
    return df_all

def transform_report(df_all, arg_date):
    
    #Se transforma el dataframe para obtener el reporte diario
    df_all.dropna(inplace=True)

    df_all['opening_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('first')
    df_all['closing_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['EndPrice'].transform('last')
    df_all = df_all.groupby(['ISIN', 'Date'], as_index=False).agg(opening_price_eur=('opening_price', 'min'), closing_price_eur=('closing_price', 'min'), minimum_price_eur=('MinPrice', 'min'), maximum_price_eur=('MaxPrice', 'max'), daily_traded_volume=('TradedVolume', 'sum'))
    df_all['prev_closing_price'] = df_all.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)
    df_all['change_prev_closing_%'] = (df_all['closing_price_eur'] - df_all['prev_closing_price']) / df_all['prev_closing_price'] * 100
    df_all.drop(columns=['prev_closing_price'], inplace=True)
    df_all = df_all.round(decimals=2)
    df_all.reset_index(inplace=True)
    df_all = df_all[df_all.Date >= arg_date] 
    
    print('SE HA TRANSFORMADO EL DATAFRAME')
    return df_all

def load(df_all, bucket_target):
    #Se genera un identificador unico para el archivo
    key = 'xetra_daily_report_' + datetime.today().strftime("%Y%m%d_%H%M%S") + '.parquet'
    
    #Se escribe el dataframe en el bucket de destino dentro de AWS S3
    write_df_to_s3(df_all, key, bucket_target)
    pass

def etl_report(bucket_target):

    ultimo_objeto = None
    objetos = list(bucket_target.objects.all())
    cantidad_objetos = len(objetos)
    i = 0
    for obj in bucket_target.objects.all():
       i += 1
       if i == cantidad_objetos:
           ultimo_objeto = obj
           #Identificador del ultimo archivo encontrado dentro del bucket
           print(ultimo_objeto.key)

           #Se obtiene el ultimo objeto del bucket de destino
           prq_obj = bucket_target.Object(ultimo_objeto.key).get().get('Body').read()
           data = BytesIO(prq_obj)
           df_report = pd.read_parquet(data)
           break
       
    print('SE HA LEIDO EL ARCHIVO PARQUET DESTINO CORRECTAMENTE')
    return df_report
    

In [59]:
#Funcion Main
def main():
    #Configuración de la conexión con AWS
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('xetra-1234')
    
    #Se asigna el bucket de destino
    bucket_target = s3.Bucket('xetra-ajlj')

    #Se asigna la fecha de inicio
    arg_date = '2022-12-31'
    arg_date_dt = datetime.strptime(arg_date, '%Y-%m-%d').date() - timedelta(days=1)
    
    # ETL
    
    df_all = extract(bucket, arg_date_dt)
    df_transformed = transform_report(df_all, arg_date)
    load(df_transformed, bucket_target)
    report = etl_report(bucket_target)
    print(report)
    

In [60]:
# run

main()

SE HAN ENCONTRADO LOS OBJETOS CORRECTAMENTE
SE HA LEIDO EL ARCHIVO CSV CORRECTAMENTE
SE HAN EXTRAIDO LOS DATOS CORRECTAMENTE
SE HA TRANSFORMADO EL DATAFRAME
SE HA CARGADO EL ARCHIVO PARQUET CORRECTAMENTE
xetra_daily_report_20230225_194612.parquet
SE HA LEIDO EL ARCHIVO PARQUET DESTINO CORRECTAMENTE
      index          ISIN        Date  opening_price_eur  closing_price_eur  \
0         1  AT000000STR1  2022-12-31              36.60              36.70   
1         3  AT00000FACC2  2022-12-31               8.05               8.57   
2         5  AT0000606306  2022-12-31              14.51              15.00   
3         7  AT0000609607  2022-12-31              11.74              12.06   
4         9  AT0000644505  2022-12-31              98.20              99.20   
...     ...           ...         ...                ...                ...   
3227   6455  XS2284324667  2022-12-31              39.48              38.92   
3228   6457  XS2314659447  2022-12-31               8.87            

## Reading the uploaded file

In [None]:
trg_bucket = 'xetra-bucket-12345'
s3 = boto3.resource('s3')
bucket_trg = s3.Bucket(trg_bucket)
for obj in bucket_trg.objects.all():
    print(obj.key)

In [None]:
prq_obj = bucket_trg.Object(key='xetra_daily_report_20220310_110626.parquet').get().get('Body').read()
data = BytesIO(prq_obj)
df_report = pd.read_parquet(data)

In [None]:
df_report