In [56]:
import pandas as pd
import os
from sqlalchemy import create_engine

# PostgreSQL connection URL (SQLAlchemy format). Read it from the environment so
# credentials are never committed with the notebook; the placeholder fallback
# must be replaced (or GROCERY_SALES_DB_URL set) before the DB can be queried.
db_connection_string = os.environ.get("GROCERY_SALES_DB_URL", "################################")

# Create an extract function that reads and merges the two datasets
def extract(extra_data, store_data=None, db_connection_string=None):
    """Load the store sales data and merge it with the supplementary dataset.

    Parameters
    ----------
    extra_data : str, path-like or pandas.DataFrame
        Supplementary data. A path is read with ``pd.read_parquet``; an
        already-loaded DataFrame is used as-is.
    store_data : pandas.DataFrame, optional
        Already-loaded ``grocery_sales`` table. When a DataFrame is passed it is
        used directly; otherwise the table is read from the database.
    db_connection_string : str, optional
        SQLAlchemy database URL. Required only when ``store_data`` is not a
        DataFrame.

    Returns
    -------
    pandas.DataFrame
        Inner join of the store data and the supplementary data on their shared
        ``index`` column.

    Raises
    ------
    ValueError
        If ``store_data`` is not a DataFrame and no connection string is given.
    """
    if isinstance(extra_data, pd.DataFrame):
        extra_df = extra_data
    else:
        extra_df = pd.read_parquet(extra_data)

    if isinstance(store_data, pd.DataFrame):
        # The argument used to be silently overwritten by the query result;
        # honour data the caller already has instead of re-querying.
        store_df = store_data
    else:
        if not db_connection_string:
            raise ValueError(
                "store_data is not a DataFrame; a db_connection_string is "
                "required to read the grocery_sales table"
            )
        engine = create_engine(db_connection_string)
        try:
            with engine.connect() as connection:
                store_df = pd.read_sql("SELECT * FROM grocery_sales", connection)
        finally:
            # A new engine is created on every call; release its connection pool.
            engine.dispose()

    # Default how="inner": rows whose "index" is present in only one source are dropped.
    return store_df.merge(extra_df, on="index")

# Run the extraction step. NOTE: grocery_sales is not created in this cell —
# presumably an earlier cell defines it; verify before a Restart & Run All.
merged_df = extract(
    extra_data="extra_data.parquet",
    store_data=grocery_sales,
    db_connection_string=db_connection_string,
)

In [57]:
# Preview the first five rows of the merged dataset
merged_df.head(n=5)

Unnamed: 0,index,Store_ID,Date,Dept,Weekly_Sales,IsHoliday,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Type,Size
0,0,1,2010-02-05,1,24924.5,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
1,1,1,2010-02-05,26,11737.12,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
2,2,1,2010-02-05,17,13223.76,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
3,3,1,2010-02-05,45,37.44,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
4,4,1,2010-02-05,28,1085.29,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0


In [58]:
# Summarise column dtypes and non-null counts (buf=None prints to stdout)
merged_df.info(buf=None)

<class 'pandas.core.frame.DataFrame'>
Int64Index: 231522 entries, 0 to 231521
Data columns (total 17 columns):
 #   Column        Non-Null Count   Dtype         
---  ------        --------------   -----         
 0   index         231522 non-null  int64         
 1   Store_ID      231522 non-null  int64         
 2   Date          231483 non-null  datetime64[ns]
 3   Dept          231522 non-null  int64         
 4   Weekly_Sales  231484 non-null  float64       
 5   IsHoliday     231522 non-null  int64         
 6   Temperature   231522 non-null  float64       
 7   Fuel_Price    231522 non-null  float64       
 8   MarkDown1     231522 non-null  float64       
 9   MarkDown2     231522 non-null  float64       
 10  MarkDown3     231522 non-null  float64       
 11  MarkDown4     231521 non-null  float64       
 12  MarkDown5     231521 non-null  float64       
 13  CPI           231475 non-null  float64       
 14  Unemployment  231485 non-null  float64       
 15  Type          231

In [59]:
# Count missing values in each column
merged_df.isnull().sum()

index            0
Store_ID         0
Date            39
Dept             0
Weekly_Sales    38
IsHoliday        0
Temperature      0
Fuel_Price       0
MarkDown1        0
MarkDown2        0
MarkDown3        0
MarkDown4        1
MarkDown5        1
CPI             47
Unemployment    37
Type             1
Size             1
dtype: int64

In [60]:
# List the row labels whose 'Date' is missing, to see whether the gaps are
# scattered or come in consecutive runs (which would justify a forward fill)
merged_df.index[merged_df['Date'].isna().to_numpy()]

Int64Index([  16,   17,   18,   19,   20,   76,   77,   78,   79,   80,   81,
              82,  658,  659,  660,  661,  662,  663,  664,  665,  666, 2548,
            2549, 2550, 2551, 2552, 2553, 2554, 2555, 2556, 3916, 3917, 3918,
            3919, 3920, 3921, 3922, 3923, 3924],
           dtype='int64')

In [61]:
# Create a transform function that imputes missing values and subsets the dataframe
def transform(raw_data, min_weekly_sales=10000):
    """Clean the merged sales data and keep only the columns used downstream.

    Works on a copy, so the caller's DataFrame is left unchanged and the cell
    calling this function can be re-run safely.

    Steps:
      * Missing ``Date`` values are forward-filled from the previous row.
      * Missing ``Weekly_Sales``, ``CPI`` and ``Unemployment`` values are filled
        with that column's mean; other columns are left as they are.
      * A ``Month`` column is derived from ``Date``.
      * Only rows with ``Weekly_Sales`` strictly greater than
        ``min_weekly_sales`` are kept (imputed sales values are filtered too).

    Parameters
    ----------
    raw_data : pandas.DataFrame
        Merged dataset; must contain ``Date`` (datetime64), ``Weekly_Sales``,
        ``CPI``, ``Unemployment``, ``Store_ID``, ``Dept`` and ``IsHoliday``.
    min_weekly_sales : float, default 10000
        Exclusive lower bound on ``Weekly_Sales`` for rows to keep.

    Returns
    -------
    pandas.DataFrame
        Filtered rows with columns Store_ID, Dept, Month, Weekly_Sales,
        IsHoliday, CPI, Unemployment; original index labels are preserved.
    """
    data = raw_data.copy()
    # fillna(method='ffill') is deprecated in pandas >= 2.1; .ffill() is equivalent.
    # A leading missing Date has no predecessor and stays NaT.
    data['Date'] = data['Date'].ffill()
    # Means are computed before filling, i.e. over the observed values only.
    data = data.fillna(
        {
            'Weekly_Sales': data['Weekly_Sales'].mean(),
            'CPI': data['CPI'].mean(),
            'Unemployment': data['Unemployment'].mean(),
        }
    )
    data['Month'] = data['Date'].dt.month
    return data.loc[
        data['Weekly_Sales'] > min_weekly_sales,
        ['Store_ID', 'Dept', 'Month', 'Weekly_Sales', 'IsHoliday', 'CPI', 'Unemployment'],
    ]

# Apply the cleaning/subsetting step to the merged dataset
clean_data = transform(raw_data=merged_df)

In [62]:
# Preview the first five rows of the transformed dataframe
clean_data.head(n=5)

Unnamed: 0,Store_ID,Dept,Month,Weekly_Sales,IsHoliday,CPI,Unemployment
0,1,1,2,24924.5,0,211.096358,8.106
1,1,26,2,11737.12,0,211.096358,8.106
2,1,17,2,13223.76,0,211.096358,8.106
5,1,79,2,46729.77,0,211.096358,7.500052
6,1,55,2,21249.31,0,211.096358,7.500052


In [63]:
# Define a function that averages weekly sales for each month
def avg_weekly_sales_per_month(clean_data):
    """Return the mean ``Weekly_Sales`` per ``Month``, rounded to 2 decimals.

    Parameters
    ----------
    clean_data : pandas.DataFrame
        Must contain ``Month`` and ``Weekly_Sales`` columns.

    Returns
    -------
    pandas.DataFrame
        Columns ``Month`` and ``Avg_Sales``, one row per month, sorted by month,
        with a default RangeIndex.
    """
    monthly_mean = clean_data.groupby('Month')['Weekly_Sales'].mean()
    return monthly_mean.round(2).rename('Avg_Sales').reset_index()

# Aggregate the cleaned data into one average-sales row per month
agg_data = avg_weekly_sales_per_month(clean_data=clean_data)

In [64]:
# Create a load function that saves both dataframes to the given CSV paths
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
    """Write the cleaned and the aggregated DataFrames to CSV files.

    Each file is written without the DataFrame index; ``full_data`` is written
    first, then ``agg_data``.

    Parameters
    ----------
    full_data : pandas.DataFrame
        Cleaned dataset.
    full_data_file_path : str or path-like
        Destination for ``full_data``.
    agg_data : pandas.DataFrame
        Aggregated dataset.
    agg_data_file_path : str or path-like
        Destination for ``agg_data``.
    """
    outputs = [(full_data, full_data_file_path), (agg_data, agg_data_file_path)]
    for frame, destination in outputs:
        frame.to_csv(destination, index=False)
    
# Write the transformed and the monthly-sales dataframes to CSV
load(
    full_data=clean_data,
    full_data_file_path='clean_data.csv',
    agg_data=agg_data,
    agg_data_file_path='agg_data.csv',
)

In [65]:
# Create a validation function that checks the dataframes were saved to the working directory
def validation(file_path):
    """Raise unless ``file_path`` points to an existing regular file.

    Relative paths are resolved against the current working directory.

    Parameters
    ----------
    file_path : str or path-like
        Path of an output file to check.

    Raises
    ------
    FileNotFoundError
        If nothing exists at ``file_path`` or it is not a regular file (for
        example a directory). ``FileNotFoundError`` subclasses ``Exception``,
        so existing ``except Exception`` handlers still catch it.
    """
    # os.path.exists() also returns True for directories, which are not a saved CSV.
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"There is no file at path: {file_path}")
        
# Confirm both CSV outputs exist (relative paths resolve against the working directory)
validation(file_path='clean_data.csv')
validation(file_path='agg_data.csv')