In [46]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [47]:
# Adapter Layer

def read_csv_to_df(bucket, key, decoding="utf-8", delimeter=","):
    csv_obj = bucket.Object(key=key).get().get("Body").read().decode(decoding)
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=delimeter)
    return df


def write_df_to_s3(df, bucket, key):
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True


def write_df_to_s3_csv(df, bucket, key):
    out_buffer = StringIO()
    df.to_csv(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True


def list_files_in_prefix(bucket, prefix):
    files = [obj.key for obj in bucket.objects.filter(Prefix=prefix)]
    return files


def date_from_key(bucket_object, date_format):
    return datetime.strptime(bucket_object.key.split("/")[0], date_format).date()

In [48]:
# Application layer - core

def extract(bucket, date_list):
    files = [key for date in date_list for key in list_files_in_prefix(bucket, date)]
    df_list = []
    for file in files:
        df = read_csv_to_df(bucket, file)
        if not df.empty:
            df_list.append(df)
    df = pd.concat(df_list, ignore_index=True)
    return df


def transform_report1(df, columns, arg_date):
    df = df.loc[:, columns]
    df["OpeningPrice"] = df.sort_values(by=["Time"]).groupby(["ISIN", "Date"])["StartPrice"].transform("first")
    df["ClosingPrice"] = df.sort_values(by=["Time"]).groupby(["ISIN", "Date"])["StartPrice"].transform("last")
    df = df.groupby(["ISIN", "Date"], as_index=False).agg(
        OpeningPriceEur=("OpeningPrice", "min"),
        ClosingPriceEur=("ClosingPrice", "min"),
        MinimumPriceEur=("MinPrice", "min"),
        MaximumPriceEur=("MaxPrice", "max"),
        DailyTradedVolume=("TradedVolume", "sum")
    )
    df["PrevClosingPrice"] = df.sort_values(by=["Date"]).groupby(["ISIN"])["ClosingPriceEur"].shift(1)
    df["ChangePrevClosing%"] = (df["ClosingPriceEur"] - df["PrevClosingPrice"]) / df["PrevClosingPrice"] * 100
    df.drop(columns=["PrevClosingPrice"], inplace=True)
    df = df.round(decimals=2)
    df = df[df.Date >= arg_date]
    return df


def load(bucket, df, target_key, target_format, arg_date, meta_key, extract_date_list, date_format):
    key = target_key + "_" + arg_date + "." + target_format
    write_df_to_s3(df, bucket, key)
    update_meta_file(bucket, meta_key, extract_date_list, date_format)
    return True


def etl_report1(src_bucket, trg_bucket, date_list, columns, arg_date, trg_key, trg_format, meta_key, date_format):
    df = extract(src_bucket, date_list)
    df = transform_report1(df, columns, arg_date)
    extract_date_list = [date for date in date_list if date >= arg_date]
    load(trg_bucket, df, trg_key, trg_format, arg_date, meta_key, extract_date_list, date_format)
    return True

In [49]:
# Apllication layer

def return_date_list(bucket, arg_date, date_format, meta_key):
    min_date = datetime.strptime(arg_date, date_format).date() - timedelta(days=1)
    end_date = datetime(year=2022, month=3, day=20).date()
    try:
        df_meta = read_csv_to_df(bucket, meta_key)
        dates = [(min_date + timedelta(days=x)) for x in range(0, (end_date - min_date).days + 1)]
        src_dates = set(pd.to_datetime(df_meta["SourceDate"]).dt.date)
        dates_missing = set(dates[1:]) - src_dates
        if dates_missing:
            min_date = min(dates_missing) - timedelta(days=1)
            return_dates = [date.strftime(date_format) for date in dates if date >= min_date]
            return_min_date = (min_date + timedelta(days=1)).strftime(date_format)
        else:
            return_dates = []
            return_min_date = datetime(2200, 1, 1).date()
    except boto3.client("s3").exceptions.NoSuchKey:
        return_dates = [(min_date + timedelta(days=d)).strftime(date_format) for d in range(0, (end_date - min_date).days + 1)]
        return_min_date = arg_date
    return return_min_date, return_dates


def update_meta_file(bucket, meta_key, extract_date_list, date_format):
    df_new = pd.DataFrame(columns=["SourceDate", "DatetimeOfProcessing"])
    df_new["SourceDate"] = extract_date_list
    df_new["DatetimeOfProcessing"] = datetime.today().strftime(date_format)
    df_old = read_csv_to_df(bucket, meta_key)
    df_all = pd.concat([df_old, df_new])
    write_df_to_s3_csv(df_all, bucket, meta_key)

In [50]:
# Main function entrypoint

def main():
    # Parameters/Configurations
    # Later read config
    arg_date = "2022-03-16"
    date_format = "%Y-%m-%d"
    src_bucket = "xetra-1234"
    trg_key = "xetra_daily_report"
    trg_format = "parquet"
    trg_bucket = "xetra-kelvedler"
    columns = ["ISIN", "Date", "Time", "StartPrice", "MaxPrice", "MinPrice", "EndPrice", "TradedVolume"]
    meta_key = "meta_file.csv"

    # Init
    s3 = boto3.resource("s3")
    bucket_src = s3.Bucket(src_bucket)
    bucket_trg = s3.Bucket(trg_bucket)

    # Run application
    extract_date, date_list = return_date_list(bucket_trg, arg_date, date_format, meta_key)
    etl_report1(bucket_src, bucket_trg, date_list, columns, extract_date, trg_key, trg_format, meta_key, date_format)

In [51]:
# Run

main()

## Reading the uploaded file

In [52]:
trg_bucket = "xetra-kelvedler"
s3 = boto3.resource("s3")
bucket_trg = s3.Bucket(trg_bucket)
for obj in bucket_trg.objects.all():
    print(obj.key)

meta_file.csv
xetra_daily_report_2022-03-17.parquet


In [53]:
prq_obj = bucket_trg.Object(key="xetra_daily_report_2022-03-17.parquet").get().get("Body").read()
data = BytesIO(prq_obj)
df_report = pd.read_parquet(data)

In [54]:
df_report

Unnamed: 0,ISIN,Date,OpeningPriceEur,ClosingPriceEur,MinimumPriceEur,MaximumPriceEur,DailyTradedVolume,ChangePrevClosing%
0,AT000000STR1,2022-03-17,37.50,37.20,37.20,37.50,1117,0.13
1,AT000000STR1,2022-03-18,37.30,37.15,37.15,37.30,60,-0.13
2,AT00000FACC2,2022-03-18,7.78,7.64,7.64,7.78,10,-5.33
3,AT0000606306,2022-03-17,14.78,13.78,13.35,14.90,98483,-5.10
4,AT0000606306,2022-03-18,13.69,13.43,13.03,13.69,48420,-2.54
...,...,...,...,...,...,...,...,...
6460,XS2427474023,2022-03-18,23.62,23.99,23.62,23.99,0,-0.24
6461,XS2434891219,2022-03-17,3.69,3.67,3.67,3.70,0,0.31
6462,XS2434891219,2022-03-18,3.67,3.73,3.66,3.73,33430,1.73
6463,XS2437455608,2022-03-17,24.29,24.29,24.29,24.29,0,0.07
