In [109]:
from datetime import datetime

In [110]:
"""
    Main objectives are as follows:
        1. Read from the staging database
        2. Clean this data
        3. Send cleaned data into the production database
"""
import boto3
import psycopg2
import pandas as pd


AWS_ENDPOINT = 'sigma-data-engineering-instance-1.c1i5dspnearp.eu-west-2.rds.amazonaws.com'
AWS_PORT = '5432'
AWS_USERNAME = 'alex'
AWS_PASSWORD = 'sigmastudent'
AWS_DATABASE = 'postgres'

AWS_STAGING_SCHEMA = 'week4_alex_staging'
AWS_STAGING_TABLE = 'staging_ecommerce'

AWS_PRODUCTION_SCHEMA = 'week4_alex_production'

DATAFRAME_COLUMNS = ["order_number", "toothbrush_type", "order_date", "customer_age",
    "order_quantity", "delivery_postcode", "billing_postcode", "is_first",
    "dispatch_status", "dispatched_date", "delivery_status", "delivery_date"
]

def connection(host=AWS_ENDPOINT, port=AWS_PORT, database=AWS_DATABASE, user=AWS_USERNAME, password=AWS_PASSWORD):
    return psycopg2.connect(
        host = host,
        port = port,
        database = database,
        user = user,
        password = password,
    )

def execute_query(rds_connection, schema_name=AWS_STAGING_SCHEMA, table_name=AWS_STAGING_TABLE):
    cur = rds_connection.cursor()
    cur.execute(f"""
        SELECT * FROM {schema_name}.{table_name}
    """)
    return cur.fetchall()

def lambda_handler(event, context):
    try:
        """Read from staging database"""
        conn = connection()
        if conn is not None:
            print(f"Connected to {AWS_ENDPOINT}")

        results = execute_query(conn)

        if results is not None:
            print("Successfully retrieved table")
        else:
            print("Empty!")

        """Convert result into Dataframe, to be cleaned"""
        """Bit of a cheat but declare column names myself"""

        df = pd.DataFrame(results)
        df.columns = DATAFRAME_COLUMNS

    except Exception as e:
        print(f"Database could not connect: {e}")

    return df

event, context = {}, {}
df = lambda_handler(event, context)

Connected to sigma-data-engineering-instance-1.c1i5dspnearp.eu-west-2.rds.amazonaws.com
Successfully retrieved table


# 3199 / 9503 rows do not have empty values for delivery status and delivery date 

# Convert order_date, dispatched_date, delivery_date to datetime

In [111]:
def columns_to_datetime(dataframe):
    datetime_columns = [col for col in dataframe.columns if 'date' in col]
    for column in datetime_columns:
        dataframe[column] = pd.to_datetime(dataframe[column], errors = 'coerce')
    return f"Columns {datetime_columns} now in datetime"

In [112]:
columns_to_datetime(df)

"Columns ['order_date', 'dispatched_date', 'delivery_date'] now in datetime"

# Postcodes have weird format

In [113]:
"""First capitalise every letter in the postcode"""
def postcode_correction(dataframe):
    postcode_columns = ["delivery_postcode", "billing_postcode"]
    for postcodes in postcode_columns:
        #dataframe[postcodes] = map(lambda x: str(x).upper(), dataframe[postcodes])
        dataframe[postcodes] = dataframe[postcodes].apply(lambda x : x.upper())
    return "Postcodes capitalised"

In [114]:
postcode_correction(df)

'Postcodes capitalised'

In [115]:
df.sample(20)

Unnamed: 0,order_number,toothbrush_type,order_date,customer_age,order_quantity,delivery_postcode,billing_postcode,is_first,dispatch_status,dispatched_date,delivery_status,delivery_date
6088,BRU00007785,Toothbrush 2000,2023-01-10 09:02:32.208740000,97,3,SW1P 3NY,SW1P 3NY,1,Dispatched,2023-01-10 16:37:17.196573200,In Transit,2023-01-10 17:32:38.955323600
7070,BRU00002347,Toothbrush 2000,2023-01-10 14:54:09.129818800,70,7,TQ1 3UR,TQ1 3UR,1,Dispatched,2023-01-10 21:47:30.230875600,In Transit,2023-01-10 22:56:06.504118000
508,BRU00008324,Toothbrush 4000,2023-01-10 09:59:43.291314400,17,2,CF363BH,SN103PU,1,Dispatched,2023-01-10 17:02:28.333595200,In Transit,2023-01-10 18:09:39.735106000
1987,BRU00004927,Toothbrush 2000,2023-01-10 10:53:14.476497200,72,6,LA95HX,LA95HX,1,Dispatched,2023-01-10 18:06:50.634992000,In Transit,2023-01-10 19:02:16.501454000
2954,BRU00000476,Toothbrush 4000,2023-01-10 21:46:59.592415200,45,1,E50SG,E50SG,1,Order Received,2023-01-10 21:59:24.953760000,,NaT
414,BRU00006089,Toothbrush 4000,2023-01-10 20:39:07.668354800,34,5,HA80LN,S123LA,1,Order Received,2023-01-10 20:51:27.728318000,,NaT
4473,BRU00001447,Toothbrush 2000,2023-01-10 06:04:21.185487600,83,2,SG5 2UD,SG5 2UD,1,Dispatched,2023-01-10 13:21:25.302701999,In Transit,2023-01-10 14:28:01.334119199
354,BRU00007041,Toothbrush 2000,2023-01-10 14:17:53.836086400,65,4,CF31%201PJ,EX7 9SB,1,Dispatched,2023-01-10 20:36:55.095097600,In Transit,2023-01-10 21:47:58.586336800
1239,BRU00007583,Toothbrush 4000,2023-01-10 22:34:42.790596400,32,4,GL40JR,GL40JR,1,Order Confirmed,2023-01-10 23:39:51.938206000,,NaT
5622,BRU00001294,Toothbrush 4000,2023-01-10 19:58:02.522524400,16,7,PL29 3SA,PL29 3SA,1,Dispatched,2023-01-11 02:54:06.443574800,Unsuccessful,2023-01-12 02:07:06.742196000


# Is the 'is_first' column redundant?

In [116]:
df["is_first"].loc[lambda x : x != 1]

Series([], Name: is_first, dtype: int64)

In [117]:
df = df.drop('is_first', axis = 1)

# What to do about the null values?

In [118]:
df[["delivery_status", "delivery_date"]].sample(20)

Unnamed: 0,delivery_status,delivery_date
1404,,NaT
4871,,NaT
6867,,NaT
4703,Unsuccessful,2023-01-12 01:10:36.387405200
9168,,NaT
1298,In Transit,2023-01-10 23:28:29.518675600
807,,NaT
3921,In Transit,2023-01-10 19:16:20.423275600
6874,,NaT
9348,,NaT


In [119]:
df["delivery_date"].unique()

array([                          'NaT', '2023-01-12T18:28:02.151865200',
       '2023-01-10T22:04:02.207794399', ...,
       '2023-01-10T20:08:40.763735200', '2023-01-10T18:21:09.514488799',
       '2023-01-10T22:29:40.772631600'], dtype='datetime64[ns]')

In [120]:
df["delivery_status"] = df["delivery_status"].fillna("In Process")

In [123]:
def create_future_date():
    return datetime.strptime('Dec 31 2099 12:00AM', '%b %d %Y %I:%M%p')

In [124]:
future_date = create_future_date()

In [125]:
print(future_date)

2099-12-31 00:00:00


In [126]:
df["delivery_date"] = df["delivery_date"].fillna(future_date)

In [131]:
df["order_date"].where(df["order_date"] > datetime.now())

0      NaT
1      NaT
2      NaT
3      NaT
4      NaT
        ..
9498   NaT
9499   NaT
9500   NaT
9501   NaT
9502   NaT
Name: order_date, Length: 9503, dtype: datetime64[ns]

In [134]:
df.sample(20)

Unnamed: 0,order_number,toothbrush_type,order_date,customer_age,order_quantity,delivery_postcode,billing_postcode,dispatch_status,dispatched_date,delivery_status,delivery_date
6032,BRU00007044,Toothbrush 2000,2023-01-10 09:56:43.227123200,72,9,NG10 3DT,NG10 3DT,Dispatched,2023-01-10 17:23:35.921841200,In Transit,2023-01-10 18:14:51.482070800
9224,BRU00006531,Toothbrush 2000,2023-01-10 14:57:59.558045600,86,9,YO26 5SE,YO26 5SE,Dispatched,2023-01-10 22:39:19.712439200,In Transit,2023-01-10 23:33:18.635812400
1074,BRU00003974,Toothbrush 4000,2023-01-10 12:31:53.801258800,35,3,CM195PQ,CM195PQ,Dispatched,2023-01-10 19:31:32.354676400,In Transit,2023-01-10 20:29:35.119726000
4266,BRU00006974,Toothbrush 4000,2023-01-10 22:09:01.660452400,33,1,PE19 1JS,PE19 1JS,Order Received,2023-01-10 22:21:09.473896000,In Process,2099-12-31 00:00:00.000000000
1802,BRU00008604,Toothbrush 4000,2023-01-10 07:32:39.969700800,20,2,CF634QR,CF634QR,Order Confirmed,2023-01-10 09:00:33.127783200,In Process,2099-12-31 00:00:00.000000000
2940,BRU00000902,Toothbrush 2000,2023-01-10 05:04:58.915326400,87,4,G432XH,G432XH,Dispatched,2023-01-10 12:40:40.248304000,In Transit,2023-01-10 13:57:29.387218000
6458,BRU00002238,Toothbrush 4000,2023-01-11 00:54:57.656576800,22,1,AL1 2RE,AL1 2RE,Dispatched,2023-01-11 07:05:53.340578800,In Transit,2023-01-11 08:19:26.493088000
399,BRU00008656,Toothbrush 2000,2023-01-10 15:14:17.461297600,74,8,PO1%201AU,SW15 2RA,Order Confirmed,2023-01-10 16:31:38.705443600,In Process,2099-12-31 00:00:00.000000000
8067,BRU00007081,Toothbrush 4000,2023-01-10 11:51:55.886851200,0,6,NN16 9QP,NN16 9QP,Order Received,2023-01-10 12:05:29.143940400,In Process,2099-12-31 00:00:00.000000000
8134,BRU00006682,Toothbrush 2000,2023-01-10 09:50:36.933876800,71,7,BD5 8JP,BD5 8JP,Order Confirmed,2023-01-10 11:00:52.325595200,In Process,2099-12-31 00:00:00.000000000
