In [3]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [4]:
#Adapter Layer - all functionality that interacts with the S3

"""
-------------------------------------------------------------------------
Function: read_csv_to_df(bucket, key, decoding = 'utf-8', separator = ',')

Purpose: Trasnform a csv file to a dataframe 

Explanation:
        - Decode the csv file of the bucket
        - Use StringIO to make it into 
            The StringIO module is an in-memory file-like object. 
            This object can be used as input or output to the most 
            function that would expect a standard file object.
        - Create the dataframe using a pandas data frame
            Read a comma-separated values (csv) file into DataFrame.
Parameters: 
            Bucket, Object, Source bucket or destination bucket
            key, String, Specific csv file that will be converted to csv

Returns:
            df, dataframe, dataframe of the csv

-------------------------------------------------------------------------
"""

def read_csv_to_df(bucket, key, decoding = 'utf-8', separator = ','):
    csv_obj = bucket.Object(key=key).get().get('Body').read().decode(decoding)
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=separator)
    return df


"""
-------------------------------------------------------------------------
Function: write_df_to_s3(bucket, df, key)

Purpose: Write the df into an S3 bucket

Explanation:
        - Create an object of BytesIO.
            This class is like StringIO for bytes objects.
        - Trasnform the dataframe into a parquet file.
        - Put parquet file in the S3 bucket.
            
What is a parquet file format?
        Parquet is an open source file format available to any project 
        in the Hadoop ecosystem. Apache Parquet is designed for efficient 
        as well as performant flat columnar storage format of data 
        compared to row based files like CSV or TSV files. ... 
        Parquet can only read the needed columns therefore greatly 
        minimizing the IO.
        
Parameters: 
            Bucket, Object, Source bucket or destination bucket.
            df, dataframe, dataframe to be changed to parquet object.
            key, String, Specific csv file that will be converted to csv.

Returns:
            df, dataframe, dataframe of the csv

-------------------------------------------------------------------------
"""
def write_df_to_s3(bucket, df, key):
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

"""
-------------------------------------------------------------------------
Function: write_df_to_s3_csv(bucket, df, key)

Purpose: Method to update the metafile.

Explanation:
        - Create an object of StringIO.
        - Trasnform the dataframe into a csv file.
        - Put csv file in the S3 bucket.
        
Parameters: 
            Bucket, Object, Source bucket or destination bucket.
            df, dataframe, dataframe to be changed to parquet object.
            key, String, Specific csv file that will be converted to csv.

Returns:
            df, dataframe, dataframe of the csv
-------------------------------------------------------------------------
"""
def write_df_to_s3_csv(bucket, df, key):
    out_buffer = StringIO()
    df.to_csv(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

"""
-------------------------------------------------------------------------
Function: list_files_in_prefix(bucket, prefix)

Purpose: Get the key of every object in the file

Explanation:
        - Create a list of the key of every object
        
        
Parameters: 
            Bucket, Object, Source bucket or destination bucket.
            prefix, string, date of the file

Returns:
            files, list, list of the keys 
-------------------------------------------------------------------------
"""
def list_files_in_prefix(bucket, prefix):
    files = [obj.key for obj in bucket.objects.filter(Prefix=prefix)]
    return files
    

In [5]:
#Application Layer
"""
-------------------------------------------------------------------------
Function: extract(bucket, date_list)

Purpose: Extract dataframes from files

Explanation: 
        - Files: Get the key (As a string) of each file in a list
        - df: Create a dataframe that holds all the values of the days
              of the tables we want to extract
        
        
Parameters: bucket, obj, S3 source bucket where we are extracting data
            date_list, list, dates of the dataframes we are trying to put together 

Returns: df, dataframe, combined dataframe with the data of all the dates
            
-------------------------------------------------------------------------
"""
def extract(bucket, date_list):
    files = [key for date in date_list for key in list_files_in_prefix(bucket, date)]
    df = pd.concat([read_csv_to_df(bucket, obj) for obj in files], ignore_index=True)
    return df

"""
-------------------------------------------------------------------------
Function: transform_report1(df, columns, arg_date)

Purpose: Transform the dataframe to create the new cols we want for the
         report

Explanation: 
        - Drop the columns that we do not want from the dataframe
        - Drop all the values that have empty spaces
        - Create a colum called opening prices
        - Create a colum called closing prices
        - Create a colum called previous closing price
        - Create a colum called change previous closing percentage
        
        
Parameters: df, dateframe, datraframe that must eb transformed
            columns, list of strings, desired columns we want
            arg_date, date, date that we are inquiring for report

Returns: df, dataframe, combined dataframe with the data of all the dates
            
-------------------------------------------------------------------------
"""
def transform_report1(df, columns, arg_date):
    df = df.loc[:, columns]
    df.dropna(inplace=True)
    df['opening_price'] = df.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('first')
    df['closing_price'] = df.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('last')
    df = df.groupby(['ISIN', 'Date'], as_index=False).agg(opening_price_eur=('opening_price', 'min'), closing_price_eur=('closing_price', 'min'), minimum_price_eur=('MinPrice', 'min'), maximum_price_eur=('MaxPrice', 'max'), daily_traded_volume=('TradedVolume', 'sum'))
    df['prev_closing_price'] = df.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)
    df['change_prev_closing_%'] = (df['closing_price_eur'] - df['prev_closing_price']) / df['prev_closing_price'] * 100
    df.drop(columns=['prev_closing_price'], inplace=True)
    df = df.round(decimals=2)
    df = df[df.Date >= arg_date]
    return df
"""
-------------------------------------------------------------------------
Function: load(bucket, df, trg_key, trg_format, meta_key, extract_date_list)

Purpose: Load transformed dataframe into target bucket

Explanation: 
        - Create string for the key in the proper format
        - Call function to upload file to upload dataframe into bucket
        - Update the metafile
        
        
Parameters: bucket, obj, target bucket where we want to load the dataframe
            df, dataframe, transformed dataframe
            trg_key, string, name of the report we are currenlty targeting
            trg_format, string, string of .parquet (Type of file that receives Amazon)
            meta_key, string, name of the metafile
            extract_date_list, list, list of dates

Returns: True
            
-------------------------------------------------------------------------
"""
def load(bucket, df, trg_key, trg_format, meta_key, extract_date_list):
    key = trg_key + datetime.today().strftime("%Y%m%d_%H%M%S") + trg_format
    write_df_to_s3(bucket, df, key)
    update_meta_file(bucket, meta_key, extract_date_list) 
    return True

"""
-------------------------------------------------------------------------
Function: etl_report(src_bucket, trg_bucket, date_list, columns, arg_date, trg_key, trg_format, meta_key)

Purpose: Create the table of the extract values we are looking into.

Explanation:
        - df: call extract(src_bucket, date_list) method to create 
              Dateframe of the data of the dates choosen.
        - df: call transform_report1(df, columns, arg_date) method
              to update the dateframe to the reports we want to see
        - extract_date_list: list of the dates missing to update in 
              the target bucket.
        - call load(trg_bucket, df, trg_key, trg_format, meta_key, extract_date_list)
            to update the method
        
Parameters: 
            src_bucket, Object, Source bucket.
            trg_bucket, Object, destination bucket.
            date_list, list, list of the dates we are updating.
            columns, list, columns of our report
            arg_date, date, date we are looking to have the reports
            trg_key, string, string of the name of the database we will
                             be exploring
            trg_format, string, the target format we want
            meta_key, string, meta key file in the S3 

Returns:
            True
-------------------------------------------------------------------------
"""
def etl_report(src_bucket, trg_bucket, date_list, columns, arg_date, trg_key, trg_format, meta_key):
    df = extract(src_bucket, date_list)
    df = transform_report1(df, columns, arg_date)
    extract_date_list = [date for date in date_list if date >= arg_date]
    load(trg_bucket, df, trg_key, trg_format, meta_key, extract_date_list)
    return True
    

In [6]:
#Application Layer - Not Core
"""
-------------------------------------------------------------------------
Function: return_date_list(bucket, arg_date, src_format, meta_key)

Purpose: Understand the minimum day and the dates that have not been
         explored.

Explanation:
        - min_date: Get the previous date of the arg_date we are 
                    looking into
        - today: Get today's date
        
        Try Block:
            - df_meta: Transform meta csv file into a dateframe
            - dates: Creates a list of all the values we have
                     not look into. 
            - src_dates: Creates a set of the source values of the
                        meta_file
            - dates_missing: Creates a set of the values missing
            
            If there are dates missing:
            
                min_date = minimum value of the set
                return_dates = list of all values greater than min date
                return_min_date = value of min date to be return, one more 
                                  day than the value of min_date
                
            Else if no dates missing:
                
                return_dates = return an empty list
                return_min_date = return a date 2200,1,1
        
        Exception (Any Errors within the connection of the bucket):
            - return_dates = list of all values greater than min date
            - return_min_date = value of min date to be return, one more 
                                  day than the value of min_date
        
Parameters: 
            Bucket, Object, Source bucket or destination bucket.
            df, dataframe, dataframe to be changed to parquet object.
            key, String, Specific csv file that will be converted to csv.

Returns:
            return_min_date, date, last day from where our data was called
            return_dates, list, list of dates that have to be explored
-------------------------------------------------------------------------
"""
def return_date_list(bucket, arg_date, src_format, meta_key):
    min_date = datetime.strptime(arg_date, src_format).date() - timedelta(days=1)
    today = datetime.today().date()
    try:
        df_meta = read_csv_to_df(bucket, meta_key)
        dates = [(min_date + timedelta(days=x)) for x in range(0,(today - min_date).days + 1)]
        print(dates)
        src_dates = set(pd.to_datetime(df_meta['source_date']).dt.date)
        print(src_dates)
        dates_missing = set(dates[1:]) - src_dates
        print(dates_missing)
        if dates_missing:
            min_date = min(set(dates[1:]) - src_dates) - timedelta(days=1)
            return_dates = [date.strftime(src_format)for date in dates if date >= min_date]
            return_min_date = (min_date + timedelta(days=1)).strftime(src_format)
        else:
            return_dates = []
            return_min_date = datetime(2200,1,1).date()
    except bucket.session.client('s3').exceptions.NoSuchKey:
        return_dates = [(min_date + timedelta(days=x)).strftime(src_format) for x in range(0,(today - min_date).days + 1)]
        return_min_date = arg_date
    return return_min_date, return_dates

"""
-------------------------------------------------------------------------
Function: update_meta_file(bucket, meta_key, extract_date_list)

Purpose: Update our current meta_file

Explanation:
        - Create new data frame with columns: source_date, date
        - Update the values of 'source_date' with new values
        - Update the values of 'datetime_of_processing' with new values
        - Create in memory df_old to have the current df
        - Put both the df_old and df_new together
        - Update the meta file in the bucket  
        
Parameters: 
            Bucket, Object, Source bucket or destination bucket.
            meta_key, string, meta key we are reading
            extract_date_list, list, list that would update the metafile

Returns:
            None
-------------------------------------------------------------------------
"""
def update_meta_file(bucket, meta_key, extract_date_list):
    df_new = pd.Dateframe(columns=['source_date', 'date'])
    df_new['source_date'] = extract_date_list
    df_new['datetime_of_processing'] = datetime.today().strftime('%Y-%m-%d')
    df_old = read_csv_to_df(bucket, meta_key)
    df_all = pd.concat(df_old, df_new)
    write_df_to_s3_csv(bucket, df_all, meta_key)
    
    

In [7]:
#Main Function Entry Point
def main():
    #Parameters/Configurations
    #Later Read config
    arg_date = '2021-10-24'
    src_format = '%Y-%m-%d'
    src_bucket = 'deutsche-boerse-xetra-pds'
    trg_bucket = 'xetra-123'
    columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
    trg_key = 'xetra_daily_report_'
    trg_format = '.parquet'
    meta_key = 'meta_file.csv'
    
    #Init our connection
    s3 = boto3.resource('s3')
    bucket_src = s3.Bucket(src_bucket)
    bucket_trg = s3.Bucket(trg_bucket)
    
    #Run Application
    extract_date, date_list = return_date_list(bucket_trg, arg_date, src_format, meta_key)
    etl_report(bucket_src, bucket_trg, date_list, columns, extract_date, trg_key, trg_format, meta_key)
    

In [8]:
#Run
main()

[datetime.date(2021, 10, 23), datetime.date(2021, 10, 24), datetime.date(2021, 10, 25), datetime.date(2021, 10, 26), datetime.date(2021, 10, 27), datetime.date(2021, 10, 28)]
{datetime.date(2021, 10, 12), datetime.date(2021, 10, 13)}
{datetime.date(2021, 10, 28), datetime.date(2021, 10, 26), datetime.date(2021, 10, 27), datetime.date(2021, 10, 24), datetime.date(2021, 10, 25)}


ReadTimeoutError: Read timeout on endpoint URL: "None"

## Reading the uploaded file

In [None]:
trg_bucket = 'xetra-123'
s3 = boto3.resource('s3')
bucket_trg = s3.Bucket(trg_bucket)
for obj in bucket_trg.objects.all():
    print(obj.key)

In [None]:
prq_obj = bucket_trg.Object(key='xetra_daily_report_20211023_230203.parquet').get().get('Body').read()
data = BytesIO(prq_obj)
df_report = pd.read_parquet(data)

In [None]:
df_report