In [23]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime as dt

In [24]:
def list_of_files(bucket):
    """Collect the ``key`` attribute of every item yielded by ``bucket``.

    Parameters
    ----------
    bucket : iterable
        Any iterable whose items expose a ``key`` attribute.

    Returns
    -------
    list
        The keys, in iteration order.
    """
    keys = []
    for entry in bucket:
        keys.append(entry.key)
    return keys

def read_csv_to_df(bucket, key, decoding = 'utf-8', sep = ','):
    """Download one object from ``bucket`` and parse it as CSV.

    Parameters
    ----------
    bucket : object
        Must provide ``Object(key=...)`` whose ``get()`` result maps
        ``'Body'`` to something with a ``read()`` method returning bytes.
    key : str
        Key of the object to fetch.
    decoding : str
        Codec used to decode the downloaded bytes.
    sep : str
        Field delimiter handed to ``pandas.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The parsed CSV content.
    """
    response = bucket.Object(key=key).get()
    raw_bytes = response.get('Body').read()
    text_stream = StringIO(raw_bytes.decode(decoding))
    return pd.read_csv(text_stream, delimiter=sep)

def write_df_to_s3(s3, bucket_trg, df, file_name):
    """Serialise ``df`` as CSV and put it into ``bucket_trg`` under ``file_name``.

    The frame is written with ``df.to_csv`` defaults, so the index is
    included in the output.

    Parameters
    ----------
    s3 : object
        Must provide ``Object(bucket, key)`` returning something with ``put(Body=...)``.
    bucket_trg : str
        Name of the destination bucket.
    df : pandas.DataFrame
        Data to upload.
    file_name : str
        Destination key.

    Returns
    -------
    bool
        Always ``True``.
    """
    text_sink = StringIO()
    df.to_csv(text_sink)
    destination = s3.Object(bucket_trg, file_name)
    destination.put(Body=text_sink.getvalue())
    return True

In [96]:
def update_local_log(file_name=None, log_path='write_log.csv'):
    """Append one ``[file name, timestamp]`` row to the local CSV log.

    Parameters
    ----------
    file_name : str, optional
        Name to record. When omitted, a name of the form
        ``stock_data_cleansed_<YYYYmmdd_HH:MM:SS>.csv`` is generated from the
        current time.
    log_path : str
        Path of the CSV log to read and rewrite.

    Returns
    -------
    pandas.DataFrame
        The full log including the newly appended row.

    Notes
    -----
    If ``log_path`` does not exist yet, a new log is started instead of
    raising ``FileNotFoundError``. The header used for a new log
    (``file_name``, ``upload_timestamp``) is presumed to match the existing
    log's columns -- confirm against a real ``write_log.csv``.
    """
    prefix, suffix = 'stock_data_cleansed_', '.csv'
    now_stamp = dt.today().strftime("%Y%m%d_%H:%M:%S")

    if file_name is None:
        # Build name and timestamp from the same clock reading so they agree.
        file_name = prefix + now_stamp + suffix
        timestamp = now_stamp
    elif file_name.startswith(prefix) and file_name.endswith(suffix):
        # Recover the stamp embedded in a conventionally named file instead
        # of relying on fixed character offsets.
        timestamp = file_name[len(prefix):-len(suffix)]
    else:
        timestamp = now_stamp

    try:
        log_df = pd.read_csv(log_path)
    except FileNotFoundError:
        # First run: no log has been written yet.
        log_df = pd.DataFrame(columns=['file_name', 'upload_timestamp'])

    # read_csv / the fresh frame both use a 0..n-1 index, so len() is the next free label.
    log_df.loc[len(log_df.index)] = [file_name, timestamp]
    log_df.to_csv(log_path, index=False)
    return log_df

In [98]:
# Local path of the log CSV, and the S3 key it is uploaded under.
file_path = 'write_log.csv'
log_key = 'write_log.csv'
# Bucket name passed to update_s3_log as the upload destination.
bucket_name_trg = 'etl-p2-storage'
# Module-level S3 client referenced by update_s3_log.
s3_client = boto3.client('s3')

def update_s3_log(s3, file_path, bucket, key):
    """Upload the local file ``file_path`` to ``bucket`` under ``key``.

    Parameters
    ----------
    s3 : object
        Either an S3 client (anything with ``upload_file``) or an S3 resource
        (anything exposing ``meta.client``). If it is neither, the
        module-level ``s3_client`` is used, which is what this function
        always did before.
    file_path : str
        Path of the local file to upload.
    bucket : str
        Destination bucket name.
    key : str
        Destination object key.
    """
    meta = getattr(s3, 'meta', None)
    if hasattr(meta, 'client'):
        # Resource-style handle: checked first because resource objects can
        # carry their own upload_file with a different signature.
        client = meta.client
    elif hasattr(s3, 'upload_file'):
        client = s3
    else:
        # Backward-compatible fallback to the notebook-level client.
        client = s3_client
    client.upload_file(file_path, bucket, key)

In [99]:
# Pass the module-level client: no name `s3` exists at notebook scope
# (it is only a local inside main()), so the previous call raised NameError
# on a fresh kernel.
update_s3_log(s3_client, file_path, bucket_name_trg, log_key)

In [42]:
def extract(bucket, objects):
    """Read every listed object as CSV and stack the results into one frame.

    Parameters
    ----------
    bucket : object
        Bucket handle forwarded to ``read_csv_to_df``.
    objects : iterable
        Items forwarded to ``list_of_files`` to obtain the keys to read.

    Returns
    -------
    pandas.DataFrame
        All files concatenated row-wise with a fresh 0..n-1 index.
    """
    frames = []
    for key in list_of_files(objects):
        frames.append(read_csv_to_df(bucket, key))
    return pd.concat(frames, ignore_index=True)

def transformations(df):
    """Aggregate daily stock rows into one row per symbol and calendar year.

    Rows containing any missing value are dropped, ``date`` is parsed to
    datetimes, and the remaining rows are grouped by ``symbol`` and the year
    of ``date``. The input frame is left unmodified.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns ``symbol``, ``date``, ``open``, ``close``,
        ``low``, ``high`` and ``volume``.

    Returns
    -------
    pandas.DataFrame
        Columns ``symbol``, ``Year`` (as a string), the ten aggregate
        columns, ``$_change_closing_price`` and ``%_change_closing_price``;
        numeric values are rounded to 2 decimals.
    """
    # dropna()/assign() return new frames, so the caller's data is not mutated.
    daily = df.dropna()
    daily = daily.assign(date=pd.to_datetime(daily['date']))
    # Group on a real column so each output row carries the year of its own
    # group. Labelling rows positionally as 2014..2017 mislabels the data as
    # soon as any symbol does not have exactly those four years.
    daily = daily.assign(Year=daily['date'].dt.year)

    # NOTE(review): opening_price / closing_price are the yearly *minimum* of
    # open / close, not the first open / last close -- confirm this is intended.
    yearly = daily.groupby(['symbol', 'Year'], as_index=False).agg(
        opening_price=('open', 'min'),
        closing_price=('close', 'min'),
        minimum_price=('low', 'min'),
        maximum_price=('high', 'max'),
        daily_traded_volume=('volume', 'sum'),
        avg_opening_price=('open', 'mean'),
        avg_closing_price=('close', 'mean'),
        avg_minimum_price=('low', 'mean'),
        avg_maximum_price=('high', 'mean'),
        avg_daily_traded_volume=('volume', 'mean'),
    )

    yearly['$_change_closing_price'] = yearly['closing_price'] - yearly['opening_price']
    # NOTE(review): the percentage is taken relative to closing_price, not
    # opening_price -- confirm which base is wanted.
    yearly['%_change_closing_price'] = (
        yearly['$_change_closing_price'] / yearly['closing_price']
    ) * 100
    yearly = yearly.round(decimals=2)

    # Year was previously emitted as a string ("2014"); keep that type.
    yearly['Year'] = yearly['Year'].astype(str)

    # Keep Year directly after symbol, ahead of the metric columns.
    leading = ['symbol', 'Year']
    ordered = leading + [col for col in yearly.columns if col not in leading]
    return yearly.loc[:, ordered]

def load(s3, bucket_trg, df):
    """Upload ``df`` as a CSV whose key embeds the current local time.

    The key has the form ``stock_data_cleansed_<YYYYmmdd_HH:MM:SS>.csv`` and
    the upload itself is delegated to ``write_df_to_s3``.

    Parameters
    ----------
    s3 : object
        S3 handle forwarded to ``write_df_to_s3``.
    bucket_trg : str
        Name of the destination bucket.
    df : pandas.DataFrame
        Data to upload.
    """
    stamp = dt.today().strftime("%Y%m%d_%H:%M:%S")
    target_key = ''.join(['stock_data_cleansed_', stamp, '.csv'])
    write_df_to_s3(s3, bucket_trg, df, target_key)
    

def etl_report(s3, bucket, bucket_trg, objects):
    """Run extract -> transform -> load, then record the run in the local log.

    Parameters
    ----------
    s3 : object
        S3 handle forwarded to ``load``.
    bucket : object
        Source bucket handle forwarded to ``extract``.
    bucket_trg : str
        Name of the destination bucket forwarded to ``load``.
    objects : iterable
        Source objects forwarded to ``extract``.
    """
    df = extract(bucket, objects)
    df = transformations(df)
    load(s3, bucket_trg, df)
    # This used to call `log_update()`, a name that is not defined anywhere in
    # the notebook, so every run ended in NameError after the upload.
    # NOTE(review): update_local_log() builds its own file name from the
    # current time, so the logged name can differ from the key load() used.
    update_local_log()

In [43]:
def main():
    """Run the ETL from the 'etl-p2-data' bucket into 'etl-p2-storage'.

    Creates an S3 resource, lists every object in the source bucket and
    hands everything to ``etl_report``.
    """
    source_name = 'etl-p2-data'
    target_name = 'etl-p2-storage'

    s3 = boto3.resource('s3')
    bucket = s3.Bucket(source_name)
    # Materialise the listing once so it can be passed around as a plain list.
    objects = list(bucket.objects.all())

    etl_report(s3, bucket, target_name, objects)

In [44]:
# Execute the ETL pipeline defined in main().
main()