# DATA ENGINEERING ETL PIPELINE - XETRA DATASET


Onject Oriented Programming Example

Aim:
Write a production ready ETL pipeline using python and pandas.

Overview:
Xetra is a German stock exchange based in Frankfurt operated by Deutsche Börse Group. 
Data related to daily trading activity is stored publicly on the Amazon S3 database. 
(Update - as of July 2022 the data is no longer available. An archival S3 database will be used) 

Task:
- Use jupyter notebook as a protoyping tool.
- Request and extract source data from cloud based web services.
- Use list comprehension to read and consolidate multiple source files.
- Utilise pandas package tools to facilitate ETL process.
- Design and stucture code using object oriented programming techniques. 

Below outlines the steps to be performed:
    
    1) Create adapter layer to handle access to API and web service infrastructure:
        - Connect, read and write to external data sources.
        - Use datetime filtering to specify range and exclude previously processed dates.
       
    2) Create application layer to handle ETL pipeline:
        - Function to EXTRACT bucket data via list comprehension.
        - Function to TRANSFORM bucket data using pandas functions. 
        - Function to LOAD transformed data to Amazon S3 target bucket. 
        
    3) Create main() to serve as program entry point: 
        - Define user parameters and configurations.
        - Run ETL application.
        - Load transformed data into private S3 bucket.
        - Read data from private S3 bucket to verify ETL pipeline. 

# Define Packages

In [17]:
#Packages to be imported
import boto3 #AWS service management package.
import pandas as pd #Data analysis library.
from io import StringIO #String buffer to read CSV files.
from io import BytesIO #Bytes buffer to read PARQUET files.
from datetime import datetime, timedelta #Facilitate calulations relating to day of trade. 

# Adapter Layer


In [18]:
#Method to convert bucket data from csv into pandas dataframe.
def read_csv_to_df (bucket, key, decding = 'utf-8', sep =','):
    csv_obj = bucket.Object(key=key).get().get('Body') #Read data element from list.
    csv_obj = csv_obj.read().decode('utf-8') #Store into to csv object in utf-8 format.
    in_buf = StringIO(csv_obj) #Buffer to store csv object as string data.
    df = pd.read_csv(in_buf, delimiter=sep) #Read data into pandas data frame.
    return df

#Method to write output df to target S3 bucket as a parquet file.
def write_df_to_S3(bucket, df, key ):
    out_buf = BytesIO() #Buffer to store df object to parquet data. 
    df.to_parquet(out_buf, index=False) #Write df to parquet buffer.
    bucket.put_object(Body=out_buf.getvalue(),Key=key) #Store data into S3 bucket
    return True  

#Method to return list of files in bucket filtered by date. 
def list_files_in_prefix(bucket, prefix):
    files = [obj.key for obj in bucket.objects.filter(Prefix=prefix)]
    return files

# Application Layer (Non-Core)

In [72]:
#Method to return list of files in bucket between minimum filtering date and today's date.
def return_date_list(bucket, arg_date, arg_date_format): 
    #Min_date is one less than arg_date as previous calender day required for trade calculations.
    min_date = datetime.strptime(arg_date, arg_date_format) 
    min_date = min_date.date()- timedelta(days=1)   
    #Store today's date and filter by selected date range.         
    today = datetime.today().date()
    return_date_list = [(min_date + timedelta(days=x)).strftime(arg_date_format)for x in range (0 , (today - min_date).days + 1)]
    return return_date_list

In [120]:
#Set date format and values.
arg_date_format = '%Y-%m-%d'
arg_date = '2021-12-20'
today_date = '2021-12-21'  

#Read metafile that contains list of previously processed dates into dataframe. 
trg_bucket = 'xetra-probe' 
s3 = boto3.resource('s3') 
bucket_trg = s3.Bucket(trg_bucket)
meta_key = 'meta_file.csv'  
df_meta = read_csv_to_df(bucket_trg, meta_key )

#Min date is the previous day of trade before the argument date. Required for percentage change calculations. 
min_date = datetime.strptime(arg_date, arg_date_format).date() - timedelta(days=1) 

#Return list of dates between min_date and today_date).
today = datetime.strptime(today_date, arg_date_format).date()
return_date_list = [(min_date + timedelta(days=x))for x in range (0 , (today - min_date).days + 1)]

#Store processed dates from metafile into a set. 
meta_dates = set(pd.to_datetime(df_meta['source_date']).dt.date)

#Unique values in return_date_list when compared to meta_dates indicates dates to extract from S3 data.   
dates_to_extract = set(return_date_list[1:]) - meta_dates

if dates_to_extract:
    #Recalculate min_date.  
    min_date = min(set(return_date_list[1:]) - meta_dates)- timedelta(days=1)
    #Filter return dates from source data based on verified min_date.
    return_dates = [date.strftime(arg_date_format) for date in return_date_list if date >= min_date]
    return_min_date = arg_date
else: 
    return_dates = []
    return_min_date = datetime(9999,1,1).date() 

# Application Layer (Core)

In [73]:
#Method to extract bucket data. 
def extract_bucket_data(bucket, date_list):
    #Extract list of csv files from bucket by date prefix. 
    files = [key for date in date_list for key in list_files_in_prefix(bucket, date)]
    #Read body of extracted csv files into master dataframe.
    df = pd.concat([read_csv_to_df(bucket,obj) for obj in files], ignore_index=True)
    return df

#Method to transform bucket data. 
def transform_bucket_data(df,columns_use,arg_date):
    #Method to reove unecessary columns and missing values from data.
    df = df.loc[:,columns_use] #Remove unecessary columns 
    df.dropna(inplace=True) #Drop all missing values from the dataset.
    df = df.reset_index(drop=True) #Reset the column index.
    df.shape #Check if there was any filtering (should match table dimensions).
    
    #Get opening price per ISIN on a particular day.  
    df['OpeningPrice'] = df.sort_values('Time').groupby(['ISIN','Date'])['StartPrice'].transform('first')

    #Get closing price per ISIN on a particular day. 
    df['ClosingPrice'] = df.sort_values('Time').groupby(['ISIN','Date'])['EndPrice'].transform('last')

    #Aggregate data per ISIN on a particular day.
    df = df.groupby(['ISIN','Date'], as_index = False).agg(OpeningPriceEUR = ('OpeningPrice', 'min'),ClosingPriceEUR = ('ClosingPrice', 'min'), MinPriceEUR = ('MinPrice', 'min'), MaxPriceEUR = ('MaxPrice', 'max'), DailyTradedVolume = ('TradedVolume', 'sum'))

    #Percentage change in closing price between current and pervious day of trade. 
    df['PrevClosingPriceEUR'] = df.sort_values(by = 'Date').groupby(['ISIN'])['ClosingPriceEUR'].shift(1)
    df['DeltaPrevClosingPriceEUR%'] = (df['ClosingPriceEUR'] - df['PrevClosingPriceEUR'])/df['PrevClosingPriceEUR']*100

    #Round aggregated data. 
    df = df.round(decimals = 2)

    #Filter output by specified by argument date
    df = df[df['Date']>=arg_date]
    return df

#Method to load transformed bucket data.
def load_bucket_data(bucket, df, bucket_key, bucket_format):
    #Parametised key name for Amazon target bucket. 
    bucket_key = bucket_key + datetime.today().strftime('%Y%m%d_%H%M%S') + bucket_format
    write_df_to_S3(bucket, df, bucket_key)
    return True

#ETL function. 
def etl_report(src_bucket, trg_bucket, date_list, columns, arg_date, trg_key, trg_format):
    df = extract_bucket_data(src_bucket, date_list)
    df = transform_bucket_data(df,columns_use,arg_date)
    df = load_bucket_data(trg_bucket, df, trg_bucket_key, trg_bucket_format)
    return True

# Main

In [74]:
def main():
    #User defined parameters and configurations. 
    arg_date = '2022-12-18' #Bucket filtering argument.
    arg_date_format = '%Y-%m-%d' #Date format.
    src_bucket = 'xetra-1234' #Source data bucket name
    trg_bucket = 'xetra-probe'#Target data bucket name
    trg_bucket_key = 'xetra_daily_report' #String to prepend target bucket name.
    trg_bucket_format = '.parquet' #String to append target bucket name.
    columns_use = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
    
    #Initialise connections and create bucket instances from Amazon S3 resource.
    s3 = boto3.resource('s3') 
    bucket_src = s3.Bucket(src_bucket)
    bucket_trg = s3.Bucket(trg_bucket)
    
    #Run ETL pipeline application.
    date_list = return_date_list(bucket_src, arg_date, arg_date_format)
    etl_report(bucket_src, bucket_trg, date_list, columns_use, arg_date, trg_bucket_key, trg_bucket_format)
    
    #Verify output by reading data back into workflow. 
    for obj in bucket_trg.objects.all():
        prq_obj = bucket_trg.Object(key=obj.key).get().get('Body').read()
        data = BytesIO(prq_obj)
        df_report = pd.read_parquet(data)
    
    return df_report

In [75]:
#Run main function
main()

Unnamed: 0,ISIN,Date,OpeningPriceEUR,ClosingPriceEUR,MinPriceEUR,MaxPriceEUR,DailyTradedVolume,PrevClosingPriceEUR,DeltaPrevClosingPriceEUR%
0,AT000000STR1,2022-12-18,38.85,38.60,38.60,38.85,153,39.35,-1.91
1,AT000000STR1,2022-12-19,38.85,38.60,38.60,38.85,153,38.60,0.00
2,AT00000FACC2,2022-12-18,8.86,8.83,8.83,8.90,300,9.12,-3.18
3,AT00000FACC2,2022-12-19,8.86,8.83,8.83,8.90,300,8.83,0.00
4,AT0000606306,2022-12-18,25.02,25.06,24.72,25.40,8382,24.80,1.05
...,...,...,...,...,...,...,...,...,...
6443,XS2314660700,2022-12-19,21.32,21.50,20.98,21.56,241,21.50,0.00
6444,XS2376095068,2022-12-18,35.72,35.37,35.00,35.72,2140,36.91,-4.19
6445,XS2376095068,2022-12-19,35.72,35.37,35.00,35.72,2140,35.37,0.00
6446,XS2434891219,2022-12-18,3.59,3.55,3.55,3.59,0,3.70,-4.11
