In [228]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta

#Plan:
# figure_out_Dates_needed > (extract) get_obj_of_date_needed_and_produce_single_df_from_them > (transform) do_transformations > store_into_target_s3_location

# Adapter layer: All functionality where we interact with our S3

In [229]:
def read_csv_to_df(bucket, key): # S3 > CSV > DF
    csv_obj = bucket.Object(key=key).get().get('Body').read().decode('utf-8') #convert each filtered obj from bucket to a csv
    data = StringIO(csv_obj) #convert the CSV object into string format into memory - so that it can be used without saving into hdd
    df = pd.read_csv(data, delimiter=',') #create a panda's df out of the csv
    return df

def write_df_to_s3(bucket, df, key): # DF > .PARQUET > S3
    out_buffer = BytesIO() #to handle binary data in memory
    df.to_parquet(path=out_buffer, index=False) #Write a DataFrame as a parquet format (a file format) into BytesIO (memory)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key) #uploading the file
    
def write_df_to_s3_csv(bucket, df, key): # DF > .CSV > S3
    out_buffer = StringIO() #to handle binary data in memory
    df.to_csv(path_or_buf=out_buffer, index=False) #Write a DataFrame as a parquet format (a file format) into BytesIO (memory)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key) #uploading the file

def list_files_in_prefix(src_bucket, prefix_date):
    files = [obj.key for obj in src_bucket.objects.filter(Prefix=prefix_date)]
    return files #files names in reality
    


In [230]:
### Application layer - not core

def return_date_list(src_bucket, arg_date, meta_key):
    arg_date_minus1 = datetime.strptime(arg_date, '%Y-%m-%d').date() - timedelta(days=1) #Convert string into datetime obj and get previous date
    today = datetime.today().date() 
    
    try: #Catch exception if there is no meta file when read_csv_to_df method call
        ##Handling Meta-File##
        
        df_meta = read_csv_to_df(src_bucket, meta_key)
        #create a set out of a list of datetime's taken from the 'source_date' column of df_meta
        meta_dates_set = set(pd.to_datetime(df_meta['source_date']).dt.date) 

    
        all_dates_list = [(arg_date_minus1 + timedelta(days=x)) for x in range(0, (today-arg_date_minus1).days+1)] #Create list of string dates min_date to today

        dates_not_in_meta = set(all_dates_list[1:]) - meta_dates_set #Remove any dates already in meta

        if len(dates_not_in_meta) != 0:
            min_date = min(set(all_dates_list[1:]) - meta_dates_set) - timedelta(days=1) #recalculated as now the src-dates have been removed (incase that removed the existing one).

            #Now figure out the actual dates the report is needed for 
            reporting_dates = [date.strftime('%Y-%m-%d') for date in all_dates_list if date >= min_date]

            #Need this inside one of the transform functions
            report_min_date = (min_date + timedelta(days=1)).strftime('%Y-%m-%d')

        else:
            #set these to later handle exceptions better
            reporting_dates = [] #dates the report should be created for
            report_min_date = datetime(2200, 1, 1).date() #indicates not needed
            
    except bucket.session.client('s3').exceptions.NoSuchKey:
        ##If no Meta-File in existance##
        reporting_dates = [(min_date + timedelta(days=x)).strftime('%Y-%m-%d') for x in range(0, (today-min_date).days+1)] #Create list of string dates min_date to today
        #Need this inside one of the transform functions
        report_min_date = arg_date
       
           
    return report_min_date, reporting_dates


#extract_date_list : dates to update (i.e. the dates that have been processed as a report and can go into meta_file)
def update_meta_file(trg_bucket, meta_key, processed_date_list): 
    #read old meta_file as df > concatenate new df to meta_file df > writeback this df to s3
    df_new = pd.DataFrame(columns=['source_date', 'datetime_of_processing'])
    df_new['source_date'] = processed_date_list
    df_new['datetime_of_processing'] = datetime.today().strftime('%Y-%m-%d')
    
    df_old = read_csv_to_df(trg_bucket, meta_key)
    df_old = df_old.append(df_new, ignore_index=True)
    write_df_to_s3_csv(trg_bucket, df_old, meta_key)
    

## Application Layer: The main purpose logic of our program i.e. doing ETL.

In [231]:
def extract(src_bucket, date_list):
    files = [key for date in date_list for key in list_files_in_prefix(src_bucket, date)] #nested list comprehension, left loop is first
    df = pd.concat([read_csv_to_df(src_bucket, obj) for obj in files], ignore_index=True) #ignore_index: if there was any index, it will not be reset to a numerical one after append.
    return df 


#Only creates the first report.
def transform_report1(df, arg_date):
    #Selecting the specific rows that we need for our wanted report
    df = df.loc[:, ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']] #all rows needed to formulate report, but we want only specific columns
    df.dropna(inplace=True) #drop any empty row (just to make sure - even though unlikely to be any)
    
    #Get opening price per ISIN and Day
    #transform('first') is calling a group-by method. It returns the first non-NaN value in a series, or NaN if there is none on each row of the 'StartPrice' columns and through '.transform' it returns this columns - to be set as a new column.
    df['opening_price'] = df.sort_values(by=['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('first') #doesnt appear to have achieved anything
    #transform('first'): It returns a series / dataframe with the same shape as the source group chunk, in which all values in every individual column are replaced with the first non-NaN value in this column, or with NaN if there is none.
    
    #Get closing price per ISIN and Day
    df['closing_price'] = df.sort_values(by='Time').groupby(['ISIN', 'Date'])['StartPrice'].transform('last')
    
    #Aggregations to create most of the desired columns
    df = df.groupby(['ISIN', 'Date'], as_index=False).agg(opening_price_eur=('opening_price', 'min'), closing_price_eur=('closing_price', 'min'), minimum_price_eur=('MinPrice', 'min'), maximum_price_eur=('MaxPrice', 'max'), daily_traded_volume=("TradedVolume", 'sum'))
    
    #Create column that shows the percentage of change in previous days closing price to todays closing price.
    df['prev_closing_price'] = df.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1) 
    df['change_prev_closing_%'] = (df['closing_price_eur'] - df['prev_closing_price']) / df['prev_closing_price'] * 100
    df.drop(columns=['prev_closing_price'], inplace=True)
    df = df.round(decimals=2)

    #Filtering by the passed-in-argument data, as the report should not show older data.
    df = df[df.Date >= arg_date]

    return df


def load(trgt_bucket, df, trg_format, meta_key, processed_date_list):
    ## Write to S3
    key='stocks_daily_report_' + datetime.today().strftime("%Y%m%d_%H%M%S") + trg_format
    write_df_to_s3(trgt_bucket, df, key)
    update_meta_file(trgt_bucket, meta_key, processed_date_list)
    return True
    
    
############################################################################################
def etl_report1(src_bucket, trgt_bucket, date_list, arg_date, trg_format, meta_key):
    df = extract(src_bucket, date_list)
    df = transform_report1(df, arg_date)
    dates_that_will_be_processed = [date for date in date_list if date >= arg_date]
    df = load(trgt_bucket, df, trg_format, meta_key, dates_that_will_be_processed)
    return True
    
    

## Main Function - program execution entry point

In [232]:
def main():
    #Parameters/Configurations
    #TODO: Later read as config
    arg_date = '2021-08-23' #This input-argument used to: All data will be extracted since this date.
    src_bucket_name = 'deutsche-boerse-xetra-pds'
    trg_bucket_name = 'stocks-etl-project-essa'
    trg_format = '.parquet'
    meta_key = 'meta_file.csv'
    
    #Init
    s3 = boto3.resource('s3')
    bucket_src = s3.Bucket(src_bucket_name)
    bucket_trg = s3.Bucket(trg_bucket_name)
    
    # Run application
    extract_date, date_list = return_date_list(bucket_trg, arg_date, meta_key) #returns all wanted object.keys
    etl_report1(bucket_src, bucket_trg, date_list, extract_date, trg_format, meta_key)

    

## Run application

In [233]:
main()

## Reading the uploaded file

In [234]:
# #Parameters/Configurations
# #TODO: Later read as config
# arg_date = '2021-08-20' #This input-argument used to: All data will be extracted since this date.
src_bucket_name = 'deutsche-boerse-xetra-pds'
trg_bucket_name = 'stocks-etl-project-essa'
# trg_format = '.parquet'

#Init
s3 = boto3.resource('s3')
bucket_src = s3.Bucket(src_bucket_name)
bucket_trg = s3.Bucket(trg_bucket_name)


for obj in bucket_trg.objects.all():
    print(obj.key) #checking if the file exists in s3

meta_file.csv
stocks_daily_report_20210821_031514.parquet
stocks_daily_report_20210822_013911.parquet
stocks_daily_report_20210822_014404.parquet
stocks_daily_report_20210822_143918.parquet
stocks_daily_report_20210822_160116.parquet
stocks_daily_report_20210823_161933.parquet
stocks_daily_report_20210823_162635.parquet
stocks_daily_report_20210823_164023.parquet
stocks_daily_report_20210823_165830.parquet
stocks_daily_report_20210824_033448.parquet
stocks_daily_report_20210824_033552.parquet
stocks_daily_report_20210824_035214.parquet


In [235]:
prq_obj = bucket_trg.Object(key='stocks_daily_report_20210824_033552.parquet').get().get('Body').read()
data = BytesIO(prq_obj) #pandas only accepts a file on disk or a file-like-object (BytesIO is a file-like-obj)
df_report = pd.read_parquet(data) #using pandas built in function

In [236]:
df_report

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
0,AT00000FACC2,2021-08-23,8.44,8.68,8.43,8.68,522,
1,AT0000606306,2021-08-23,20.50,20.60,20.50,20.60,135,
2,AT0000609607,2021-08-23,15.74,15.74,15.74,15.74,0,
3,AT0000644505,2021-08-23,109.60,111.60,109.60,111.60,190,
4,AT0000652011,2021-08-23,33.34,33.79,33.34,33.79,2371,
...,...,...,...,...,...,...,...,...
3051,XS2265368097,2021-08-23,15.24,15.36,15.24,15.37,900,
3052,XS2265369574,2021-08-23,19.89,20.21,19.89,20.21,0,
3053,XS2265369731,2021-08-23,8.67,8.68,8.63,8.73,485,
3054,XS2265370234,2021-08-23,19.74,20.57,19.74,20.57,300,


In [237]:
df_report[df_report['ISIN'] == 'AT00000606306']

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
