In [38]:
"""This script imports necessary libraries for data processing and AWS S3 interaction.

- pandas (pd): Used for data manipulation and analysis.
- numpy (np): Used for advanced numerical operations and arrays.
- boto3: Provides an interface to interact with Amazon Web Services (AWS), including Amazon S3.
- StringIO: A module for working with in-memory string buffers, often used for reading data from or writing data to strings.
"""
import boto3
import pandas as pd
import numpy as np
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [39]:
# Adapter Layer

# 1. Reading data from source bucket and convert o pandas dataframe
def read_csv_to_df (src_bucket, key, decoding = 'utf-8', seperater = ','):
    csv_obj = src_bucket.Object(key=key).get().get('Body').read().decode(decoding)
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter = seperater)
    return df

# 2. Exporting data from merged dataframe to source bucket in the form of parquet file
def write_df_to_s3(trg_bucket, df, key):
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, index = False)
    trg_bucket.put_object(Body = out_buffer.getvalue(), Key = key)
    return True

# Return list of files present for a particular date provided
def list_files_in_prefix(bucket, prefix_date):
    files = [obj.key for obj in bucket.objects.filter(Prefix = prefix_date)]
    return files


In [40]:
# Application layer

# 1. Extracting data and merging it to a single merged datafranme
def extract(src_bucket, date_list):
    files = [key for date in date_list for key in list_files_in_prefix(src_bucket, date)]
    df_merged = pd.concat([read_csv_to_df(src_bucket, obj) for obj in files], ignore_index = True)
    return df_merged

# 2. Data Transformation
def transform_data1(df_merged, needed_columns, arg_date):
    # Filtering the dataframe to onnly the needed columns
    df_merged = df_merged.loc[:,needed_columns]

    # Dropping the NUll values 
    df_merged.dropna(inplace =True)

    # Creating a new column named 'opening_price'
    df_merged['opening_price'] = df_merged.sort_values(['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('first')

    # Creating a new column named 'closing_price'
    df_merged['closing_price'] = df_merged.sort_values(['Time']).groupby(['ISIN', 'Date'])['EndPrice'].transform('last')

    # Aggregating the dataframe from hourly to daily data
    df_merged = df_merged.groupby(['ISIN', 'Date'], as_index=False)\
                .agg(opening_price_eur=('opening_price', 'min'), 
                     closing_price_eur = ('closing_price', 'min'), 
                     mimimum_price_eur = ('MinPrice','min'), 
                     maximum_price_eur = ('MaxPrice','max'), 
                     daily_traded_volume =('TradedVolume','sum'))
    
    # creating a new column previous_closiong_price and assigning the value of closing_price_eur column of previous day
    df_merged['previous_closing_price'] = df_merged.sort_values(['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)

    # Calculating precent change from previous day's closing price
    df_merged['change_in_closing_in_%'] = round((df_merged['closing_price_eur'] - df_merged['previous_closing_price'])\
                                                /df_merged['previous_closing_price']*100,2)

    # Dropping column previous_closing_price as it is no longer needed
    df_merged.drop(columns = ['previous_closing_price'], inplace = True)

    # Lets round every number to upto two decimals
    df_merged = df_merged.round(decimals = 2)

    # returning the merged dataframe only after the given argumetn date
    df_merged = df_merged[df_merged.Date >= arg_date]
    return df_merged


# 3. Load function
def load_to_s3(trg_bucket, df_merged, target_key_format, target_file_format):
    key = target_key_format + datetime.today().strftime("%Y%m%d_%H%M%S") + target_file_format
    write_df_to_s3(trg_bucket, df_merged , key)
    return True

def etl_report1(src_bucket, date_list, needed_columns, arg_date, trg_bucket, target_key_format, target_file_format):
    df_merged = extract(src_bucket, date_list) # Using extract function to extract data to a dataframe
    df_merged = transform_data1(df_merged, needed_columns, arg_date) # Transforming the data using transform function
    load_to_s3(trg_bucket, df_merged, target_key_format, target_file_format) # Loading the transformed data to the target bucket
    return True
    

In [41]:
# Application Layer ---- Not Core

# 1. Return a list of all the dates from a given date till today (i.e the date code is run) in string format (== src_format)
def return_date_list (arg_date,src_format):
    min_date = datetime.strptime(arg_date, src_format).date() - timedelta(days =1)
    todays_date = datetime.today().date()
    date_list = [(min_date+timedelta(days =x)).strftime(src_format) for x in range ((todays_date - min_date).days+1)]
    return date_list

In [42]:
# Creating Main Function
def main():
    # listing necessary parameters/Configurations
    # Later read config    
    arg_date = '2022-12-25' # Setting an argument date from which we need data into our dataframe "2022-12-25" in our case      
    src_format = '%Y-%m-%d'   # Setting source format of date     
    src_bucket_name = 'xetra-1234' # setting source bucket name    
    trg_bucket_name = 'xetra-1234-etl-target-bucket' # Setting target bucket   
    # There are many columns which we do not need for further processing lets get rid of unwanted data
    columns_to_included = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice','EndPrice', 'TradedVolume'] 
    target_key_format = 'xetra_daily_report' # Describing the name of the key for output parquet file
    target_file_format = '.parquet' # Describing target file format

    # Init the connections
    s3 = boto3.resource('s3')
    src_bucket = s3.Bucket(src_bucket_name) # Connecting to source S3 bucket
    trg_bucket = s3.Bucket(trg_bucket_name) # Connecting to target S3 bucket

    # run application
    date_list = return_date_list(arg_date, src_format)
    etl_report1(src_bucket, date_list, columns_to_included, arg_date, trg_bucket, target_key_format, target_file_format)

In [43]:
# Run
main()

  df_merged = pd.concat([read_csv_to_df(src_bucket, obj) for obj in files], ignore_index = True)


### Reading the uploaded file to check whether the file content is correct

In [44]:
trg_bucket_name = 'xetra-1234-etl-target-bucket'
s3 = boto3.resource('s3')
trg_bucket = s3.Bucket(trg_bucket_name)
# reading the name of parquet file from S3 bucket
for obj in trg_bucket.objects.all():
    print (obj.key)

xetra_daily_report20231022_204256.parquet
xetra_daily_report20231023_100331.parquet
xetra_daily_report20231030_232951.parquet
xetra_daily_report20231103_225752.parquet


* Here we can see that Functional Implemantation of ETL Pipeline was successful and the new parquet file named
  "xetra_daily_report20231103_225752.parquet" was created inside source bucket after successfull implementation of this code.

In [45]:
# Getting the S3 object ('prq_obj') from the specified S3 bucket ('bucket_target')
# The object key is set to 'xetra_daily_report20231030_232951.parquet'
# The object is retrieved from S3 and its body is read as bytes
prq_obj = trg_bucket.Object(key='xetra_daily_report20231030_232951.parquet').get().get('Body').read()

# Create a BytesIO object ('data') and load the Parquet data ('prq_obj') into it
data = BytesIO(prq_obj)

# Read the Parquet data from the BytesIO object into a pandas DataFrame ('df_report')
df_report = pd.read_parquet(data)


In [46]:
# Uploaded data content is filfilling the business needs
df_report

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,mimimum_price_eur,maximum_price_eur,daily_traded_volume,change_in_closing_in_%
0,AT000000STR1,2022-12-25,36.10,37.70,36.10,37.70,2864,4.43
1,AT000000STR1,2022-12-26,36.10,37.70,36.10,37.70,2864,0.00
2,AT000000STR1,2022-12-27,36.10,37.70,36.10,37.70,2864,0.00
3,AT000000STR1,2022-12-28,36.60,36.70,35.75,36.70,1773,-2.65
4,AT000000STR1,2022-12-29,36.60,36.70,35.75,36.70,1773,0.00
...,...,...,...,...,...,...,...,...
22364,XS2434891219,2022-12-27,3.44,3.50,3.44,3.50,0,0.00
22365,XS2434891219,2022-12-28,3.44,3.66,3.42,3.66,0,4.53
22366,XS2434891219,2022-12-29,3.44,3.66,3.42,3.66,0,0.00
22367,XS2434891219,2022-12-30,3.44,3.66,3.42,3.66,0,0.00
