In [117]:
import pandas as pd
import boto3
import os
from dotenv import load_dotenv
from io import StringIO, BytesIO
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

In [118]:
# Adapter Layer
def load_config():
    """To load configuration from file .env
    """
    load_dotenv()

def connect_boto3():
    """"""
    s3 = boto3.resource(
        's3',
        aws_access_key_id=os.getenv("ACCESS_KEY"),
        aws_secret_access_key=os.getenv("SECRET_ACCESS_KEY")
    )
    return s3

def read_csv_to_df(bucket, key:str, encoding = "utf-8", sep = ',') -> pd.DataFrame:
    """read csv to dataframe from s3 bucket

    Args:
        bucket (_type_): s3 bucket
        key (str): file's key in s3 bucket
        encoding (str, optional): type of encoding. Defaults to "utf-8".
        sep (str, optional): sep to read csv file. Defaults to ','.

    Returns:
        pd.DataFrame: _description_
    """
    csv_obj = bucket.Object(key=key).get().get("Body").read().decode(encoding)
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter = sep)
    return df



def write_df_to_s3(bucket, df, key):
    """Write dataframe to s3 bucket"""
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

def write_df_to_s3_csv(bucket, df, key):
    out_buffer = StringIO()
    df.to_csv(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

def list_files_in_prefix(bucket, prefix):
    """list file form s3 bucket"""
    files = [obj.key for obj in bucket.objects.filter(Prefix=prefix)]
    return files



In [123]:
# Application Layer
## We have in this layer three stages Extract Transform Load

def extract(bucket, date_list):
    """Extract data from files cvs in s3 bucket

    Args:
        bucket (_type_): _description_
        date_list (_type_): _description_

    Returns:
        _type_: _description_
    """

    files = [key for date in date_list for key in list_files_in_prefix(bucket, date)]
    df_data = pd.concat([read_csv_to_df(bucket, obj) for obj in files], ignore_index=True)
    return df_data

def transform_report_one(df, columns, arg_date):
    """"""
    df = df[columns]
    df['OpeningPrice'] = (
        df
        .sort_values(by=['Time'])
        .groupby(['ISIN', 'Date'])['StartPrice']
        .transform('first')
    )
    df['ClosingPrice'] = (
        df
        .sort_values(by=['Time'])
        .groupby(['ISIN', 'Date'])['StartPrice']
        .transform('last')
    )

    df = df.groupby(["ISIN", "Date"], as_index=False)\
            .agg(
                opening_price_eur=('OpeningPrice', 'min'),
                closing_price_eur=('ClosingPrice', 'min'),
                minimun_price_eur=('MinPrice', 'min'),
                maxmun_price_eur=('MaxPrice', 'max'),
                daily_traded_volumne=('TradedVolume', 'sum')
            )

    df["prev_closing_price"] = (
        df.sort_values(by=['Date'])
            .groupby('ISIN')['closing_price_eur'].shift(1)
    )

    df["change_prev_closing_%"] = (
        (df['closing_price_eur'] - df['prev_closing_price']) / df['prev_closing_price'] * 100
    )

    df.drop(columns="prev_closing_price", inplace=True)
    df = df.round(decimals=2)
    df = df[df['Date'] >= arg_date]
    return df

def load(bucket, df, target_key, trg_format, meta_file, extract_date_list, src_format):
    """"""
    key = f"{target_key}{datetime.today().strftime('%Y%m%d_%H%M%S')}{trg_format}"
    write_df_to_s3(bucket, df, key)
    update_meta_file(bucket, meta_file, extract_date_list, src_format)
    return True

def etl_report_one(src_bucket,tgt_bucket, date_list, columns, arg_date, target_key, trg_format, meta_file, src_format):
    # Extract
    df = extract(src_bucket, date_list)
    # Transform
    df = transform_report_one(df, columns, arg_date)
    # Load
    resutl = load(tgt_bucket, df, target_key, trg_format, meta_file, date_list, src_format)

    return resutl
 

In [130]:
# Application Layer Helpers: No core application
def get_date_list(bucket, arg_date, meta_file, src_format):
    """read object from s3 bucket filtered by Date

    Args:
        bucket (_type_): _description_
        arg_date (_type_): _description_
        src_format (_type_): _description_

    Returns:boto3.resource('s3')
        _type_: _description_
    """
    min_date = datetime.strptime(arg_date, src_format).date() - timedelta(days=1)
    max_date = datetime.strptime(arg_date, src_format).date() + timedelta(days=1) 
    try:
        today = datetime.today().date() - relativedelta(years=1)
        date_list = [(min_date + timedelta(days=x)).strftime(src_format) for x in range(0, (today-min_date).days + 1)]
        df_meta = read_csv_to_df(bucket, meta_file)
        diff_date_list = set(date_list) - set(df_meta.source_date)
        if diff_date_list:
            list_date = [date for date in sorted(diff_date_list) if date > min_date.strftime(src_format)]
            min_date = min(list_date)
        else:
            min_date = datetime.datetime(2200, 1, 1)
            list_date = []
    except Exception as exp:
        list_date = [(min_date + timedelta(days=x)).strftime(src_format) for x in range(0, (max_date - min_date).days + 1)]
        min_date = arg_date
    return list_date, min_date

def update_meta_file(bucket, meta_file, extract_date_list, src_format):
    df_new = pd.DataFrame(
        data={
            'source_date': extract_date_list,
            'datetime_of_processing': (
                [datetime.today().date().strftime(src_format)] * len(extract_date_list)
            )
        }
    )
    
    df_old = read_csv_to_df(bucket, meta_file)
    
    df = pd.concat([df_new, df_old], ignore_index=True)
    
    write_df_to_s3_csv(bucket, df, meta_file)
    


In [131]:
# entry point

def main():
    """"""
    # paramters
    arg_date = "2022-03-22"
    src_format = '%Y-%m-%d'
    name_src_bucket = 'xetra-1234'
    name_tgt_bucket = 'xetra-data-etl'
    columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
    key = "xetra_daily_report_"
    tgt_format = ".parquet"
    
    


    # load paramters
    load_config()

    # connection to s3
    
    s3 = connect_boto3()

    tgt_bucket = s3.Bucket(name_tgt_bucket)
    src_bucket = s3.Bucket(name_src_bucket)
    # read objects
    date_list, min_date = get_date_list(tgt_bucket, arg_date, meta_file, src_format)
    # pipeline 
    etl_report_one(src_bucket, tgt_bucket, date_list, columns, arg_date, key, tgt_format, meta_file, src_format)


In [132]:
main()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['OpeningPrice'] = (
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['ClosingPrice'] = (


In [133]:
s3 = boto3.resource(
    's3',
    aws_access_key_id=os.getenv("ACCESS_KEY"),
    aws_secret_access_key=os.getenv("SECRET_ACCESS_KEY")
)


In [134]:
name_tgt_bucket = 'xetra-data-etl'
meta_file = "meta_file.csv"
bucket = s3.Bucket(name_tgt_bucket) 

In [135]:
df_meta = read_csv_to_df(bucket, meta_file)
df_meta

Unnamed: 0,source_date,datetime_of_processing
0,2022-03-22,2023-03-27
1,2022-03-23,2023-03-27
2,2022-03-25,2023-03-27
3,2022-03-27,2023-03-27
4,2022-03-26,2021-04-23 12:33:23
5,2022-03-24,2021-04-21 12:30:21


In [136]:
# deutsche-boerse-xetra-pds xetra-1234
df_meta.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 2 columns):
 #   Column                  Non-Null Count  Dtype 
---  ------                  --------------  ----- 
 0   source_date             6 non-null      object
 1   datetime_of_processing  6 non-null      object
dtypes: object(2)
memory usage: 224.0+ bytes


In [137]:
[obj for obj in bucket.objects.all()]

[s3.ObjectSummary(bucket_name='xetra-data-etl', key='meta_file.csv'),
 s3.ObjectSummary(bucket_name='xetra-data-etl', key='xetra_daily_report_20230325_183831.parquet'),
 s3.ObjectSummary(bucket_name='xetra-data-etl', key='xetra_daily_report_20230326_013108.parquet'),
 s3.ObjectSummary(bucket_name='xetra-data-etl', key='xetra_daily_report_20230326_150851.parquet'),
 s3.ObjectSummary(bucket_name='xetra-data-etl', key='xetra_daily_report_20230326_162413.parquet'),
 s3.ObjectSummary(bucket_name='xetra-data-etl', key='xetra_daily_report_20230326_162505.parquet'),
 s3.ObjectSummary(bucket_name='xetra-data-etl', key='xetra_daily_report_20230327_160816.parquet'),
 s3.ObjectSummary(bucket_name='xetra-data-etl', key='xetra_daily_report_20230327_162446.parquet'),
 s3.ObjectSummary(bucket_name='xetra-data-etl', key='xetra_daily_report_20230327_162530.parquet'),
 s3.ObjectSummary(bucket_name='xetra-data-etl', key='xetra_daily_report_20230327_162800.parquet'),
 s3.ObjectSummary(bucket_name='xetra-da

In [138]:
# bu = s3.Bucket(target_bucket)
# 
csv_obj = bucket.Object(key='xetra_daily_report_20230327_163006.parquet').get().get("Body").read()
data = BytesIO(csv_obj)
df = pd.read_parquet(data)
df

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimun_price_eur,maxmun_price_eur,daily_traded_volumne,change_prev_closing_%
0,AT000000STR1,2022-03-22,37.80,37.80,37.80,37.80,0,
1,AT000000STR1,2022-03-23,37.40,37.60,37.40,37.60,30,-0.53
2,AT000000STR1,2022-03-25,36.80,36.40,36.40,36.80,119,-3.19
3,AT00000FACC2,2022-03-22,8.16,8.10,8.01,8.17,166,
4,AT00000FACC2,2022-03-23,8.07,8.09,8.07,8.09,25,-0.12
...,...,...,...,...,...,...,...,...
9753,XS2434891219,2022-03-23,3.81,3.87,3.81,3.87,0,0.03
9754,XS2434891219,2022-03-25,3.98,4.05,3.96,4.08,14832,4.86
9755,XS2437455608,2022-03-22,23.96,23.63,23.63,23.96,0,
9756,XS2437455608,2022-03-23,23.17,23.94,23.17,23.94,0,1.29
