In [126]:
from google.cloud import storage

import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import re
import os
import io
#import glob
#from google.cloud import bigquery

In [127]:
# Point the Google Cloud client libraries at a service-account key file.
# setdefault keeps any GOOGLE_APPLICATION_CREDENTIALS that is already exported
# (another machine, CI, a different key) and only falls back to the local path
# below when nothing is configured.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/Users/harshsingh/Documents/Loblaw-case-study/key.json",
)

In [None]:
# Cloud Storage bucket that holds both the raw inputs and the processed outputs.
bucket_name = 'loblaw-bucket'
# Object-name prefix passed to list_blobs() when collecting the raw CSV files.
prefix = 'raw'

# Each output object is addressed as "<folder>/<file>" inside the bucket.
transformed_folder_name, transformed_file_name = 'processed/transformed', 'sales-processed.parquet'
feature_folder_name, feature_file_name = 'processed/feature', 'sales-feature.parquet'

In [129]:
# Notebook-level handles shared by the helper functions defined in later cells:
# `client` is used by upload_to_blob_as_parquet, `bucket` by raw_data and download_blob.
client = storage.Client()
bucket = client.get_bucket(bucket_name)

In [130]:

def raw_data(source_bucket=None, blob_prefix=None):
    """Load every CSV object under a bucket prefix into one DataFrame.

    Parameters
    ----------
    source_bucket : optional
        Bucket-like object exposing ``list_blobs(prefix=...)`` and
        ``blob(name)``; the blobs must provide ``name`` and
        ``download_as_text()``. Defaults to the notebook-level ``bucket``.
    blob_prefix : str, optional
        Object-name prefix to list. Defaults to the notebook-level ``prefix``.

    Returns
    -------
    pandas.DataFrame
        Row-wise concatenation of all matching CSV files with a fresh
        0..n-1 index.

    Raises
    ------
    ValueError
        If no object under the prefix has a name ending in ``.csv``.
    """
    # Resolve the defaults lazily so calling raw_data() with no arguments
    # keeps using the notebook-level bucket and prefix.
    if source_bucket is None:
        source_bucket = bucket
    if blob_prefix is None:
        blob_prefix = prefix

    csv_files = [
        blob.name
        for blob in source_bucket.list_blobs(prefix=blob_prefix)
        if blob.name.endswith('.csv')
    ]
    # pd.concat([]) fails with an unhelpful "No objects to concatenate";
    # report the real cause (same exception type) instead.
    if not csv_files:
        raise ValueError(f"No CSV files found under prefix {blob_prefix!r}")

    frames = [
        pd.read_csv(io.StringIO(source_bucket.blob(file_name).download_as_text()))
        for file_name in csv_files
    ]
    return pd.concat(frames, ignore_index=True)


In [None]:
def upload_to_blob_as_parquet(df, folder_name, file_name):
    """Serialise ``df`` to Parquet in memory and upload it to the bucket.

    The object is written to ``<folder_name>/<file_name>`` in the bucket
    named by the notebook-level ``bucket_name``, using the notebook-level
    ``client``. Nothing is written to local disk.
    """
    destination = client.bucket(bucket_name).blob('{}/{}'.format(folder_name, file_name))

    parquet_bytes = io.BytesIO()
    df.to_parquet(parquet_bytes, engine='pyarrow')
    # Rewind so the upload reads the buffer from its first byte.
    parquet_bytes.seek(0)

    destination.upload_from_file(parquet_bytes)


In [233]:
def download_blob(folder_name, file_name):
    """Read the Parquet object ``<folder_name>/<file_name>`` into a DataFrame.

    The object is fetched from the notebook-level ``bucket`` into an
    in-memory buffer and parsed with ``pd.read_parquet``.
    """
    source = bucket.blob('{}/{}'.format(folder_name, file_name))

    payload = io.BytesIO()
    source.download_to_file(payload)
    # The download leaves the cursor after the written bytes; go back to the
    # start before handing the buffer to the Parquet reader.
    payload.seek(0)

    return pd.read_parquet(payload)

In [133]:
_STATE_ZIP_PATTERN = re.compile(r'([A-Z]{2})\s+(\d{5})')


def _parse_address(addr):
    """Split one 'street, city, ST 12345' string into (street, city, state, zip).

    Components that cannot be extracted are returned as empty strings.
    """
    if not addr or not isinstance(addr, str):
        return '', '', '', ''

    parts = [part.strip() for part in addr.split(',')]
    if len(parts) < 3:
        return '', '', '', ''

    match = _STATE_ZIP_PATTERN.search(parts[2])
    state, zip_code = match.groups() if match else ('', '')
    return parts[0], parts[1], state, zip_code


def clean_sales_data(df):
    """Clean a raw sales frame and derive date, price, product and address columns.

    Steps, in order:
      1. drop exact duplicate rows and rows that are entirely empty;
      2. fill missing values ('' for object columns, 0 for float64/int64);
      3. rename the raw column headers ('Order ID', 'Product', ...) to
         snake_case when the raw headers are present;
      4. parse ``order_date`` and derive ``month`` (0 when the date cannot be
         parsed) and ``day_of_week``;
      5. coerce ``quantity`` / ``each_price`` to numbers (unparseable -> 0) and
         compute ``total_price``;
      6. assign ``product_id`` as 1-based integers in order of first
         appearance of each product value in this frame (so the ids depend on
         the input and are not stable across different inputs);
      7. split the address column into ``street``, ``city``, ``state``, ``zip``.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw or already-renamed sales data. It is not modified.

    Returns
    -------
    pandas.DataFrame
        A new frame with the cleaned and derived columns.

    Raises
    ------
    KeyError
        If ``quantity``, ``each_price`` or ``product`` is missing after the
        optional rename.
    """
    # Take an explicit copy after filtering so the column assignments below
    # operate on an independent frame rather than on a filtered selection.
    df = df.drop_duplicates().dropna(how='all').copy()

    for col in df.select_dtypes(include=['object']).columns:
        df[col] = df[col].fillna('')

    for col in df.select_dtypes(include=['float64', 'int64']).columns:
        df[col] = df[col].fillna(0)

    if 'Order ID' in df.columns:
        df = df.rename(columns={
            'Order ID': 'order_id',
            'Product': 'product',
            'Quantity Ordered': 'quantity',
            'Price Each': 'each_price',
            'Order Date': 'order_date',
            'Purchase Address': 'purchased_address'
        })

    if 'order_date' in df.columns:
        df['order_date'] = df['order_date'].astype(str).str.strip()
        # Unparseable values become NaT instead of raising.
        df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce')
        df['month'] = np.where(df['order_date'].isna(), 0, df['order_date'].dt.month).astype(int)
        df['day_of_week'] = df['order_date'].dt.day_name()

    df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce').fillna(0).astype(int)
    df['each_price'] = pd.to_numeric(df['each_price'], errors='coerce').fillna(0).astype(float)
    df['total_price'] = df['quantity'] * df['each_price']

    # creating product id
    product_mapping = {product: idx + 1 for idx, product in enumerate(df['product'].unique())}
    df['product_id'] = df['product'].map(product_mapping)

    address_col = 'purchased_address' if 'purchased_address' in df.columns else 'Purchase Address'

    if address_col in df.columns:
        df[address_col] = df[address_col].fillna('')

        # Parse each address once and assign whole columns positionally.
        # Writing cell-by-cell through df.at[label, ...] would hit every row
        # sharing that label when the index contains duplicates, and is far
        # slower on large frames.
        parsed = [_parse_address(addr) for addr in df[address_col]]
        for position, column in enumerate(('street', 'city', 'state', 'zip')):
            df[column] = [fields[position] for fields in parsed]

    return df

In [170]:
drop_cols = ['purchased_address','order_id','order_date','zip','street','city']
def feature_extraction(df):
    """Derive calendar, lag and rolling features from a cleaned sales frame.

    Adds ``quarter``, ``dayofmonth`` and ``weekofyear`` from ``order_date``,
    sorts by ``order_date``, adds lagged and rolling-mean versions of
    ``quantity``, drops every row containing a missing value, and finally
    removes the columns listed in the module-level ``drop_cols``.

    NOTE(review): ``shift(k)`` and ``rolling(window=k)`` operate on rows, not
    on calendar time, so ``prev_day_qty`` / ``rolling_7d_avg`` etc. refer to
    the previous k *rows* of the sorted frame. If per-day features are
    intended the data needs to be aggregated by date first -- confirm.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a datetime ``order_date`` column, a ``quantity`` column and
        every column named in ``drop_cols``. It is not modified.

    Returns
    -------
    pandas.DataFrame
        New frame with the engineered columns and a fresh 0..n-1 index.

    Raises
    ------
    KeyError
        If ``order_date``, ``quantity`` or any column in ``drop_cols`` is
        missing.
    """
    # Copy first: the assignments below would otherwise add columns to the
    # caller's frame as a side effect.
    df = df.copy()

    df['quarter'] = df['order_date'].dt.quarter
    df['dayofmonth'] = df['order_date'].dt.day
    df['weekofyear'] = df['order_date'].dt.isocalendar().week

    # Stable sort so rows sharing a timestamp keep their input order; with the
    # default algorithm ties may be reordered, which would change the
    # row-based lag/rolling values computed next.
    df = df.sort_values('order_date', kind='mergesort')

    df['prev_day_qty'] = df['quantity'].shift(1)
    df['prev_week_qty'] = df['quantity'].shift(7)
    df['prev_month_qty'] = df['quantity'].shift(30)

    df['rolling_7d_avg'] = df['quantity'].rolling(window=7).mean()
    df['rolling_30d_avg'] = df['quantity'].rolling(window=30).mean()

    # Drops the warm-up rows of the lag/rolling windows as well as any row
    # with another missing value (e.g. an unparseable order_date).
    # reset_index(drop=True) gives the same 0..n-1 index as
    # dropna(ignore_index=True) without requiring a recent pandas release.
    df = df.dropna().reset_index(drop=True)

    df = df.drop(drop_cols, axis=1)

    return df


In [140]:
# NOTE(review): in the visible notebook `df` is first assigned in the later
# download_blob cells, so running top to bottom on a fresh kernel this line
# would raise NameError unless `df` is defined in a part not shown here.
# The output recorded under this cell also has `year`/`day` columns that the
# current clean_sales_data does not create, i.e. it predates the current code.
df.head()

Unnamed: 0,order_id,product,quantity,each_price,order_date,purchased_address,year,month,day,total_price,street,city,state,zip
0,176558,USB-C Charging Cable,2,11.95,2019-04-19 08:46:00,"917 1st St, Dallas, TX 75001",2019,4,19,23.9,917 1st St,Dallas,TX,75001
2,176559,Bose SoundSport Headphones,1,99.99,2019-04-07 22:30:00,"682 Chestnut St, Boston, MA 02215",2019,4,7,99.99,682 Chestnut St,Boston,MA,2215
3,176560,Google Phone,1,600.0,2019-04-12 14:38:00,"669 Spruce St, Los Angeles, CA 90001",2019,4,12,600.0,669 Spruce St,Los Angeles,CA,90001
4,176560,Wired Headphones,1,11.99,2019-04-12 14:38:00,"669 Spruce St, Los Angeles, CA 90001",2019,4,12,11.99,669 Spruce St,Los Angeles,CA,90001
5,176561,Wired Headphones,1,11.99,2019-04-30 09:27:00,"333 8th St, Los Angeles, CA 90001",2019,4,30,11.99,333 8th St,Los Angeles,CA,90001


In [142]:
# Load all raw CSV files from the bucket into one frame, print the first ten
# rows as plain text, and display the (rows, columns) shape as the cell result.
raw_df = raw_data()
print(raw_df.head(10))
raw_df.shape

  Order ID                     Product Quantity Ordered Price Each  \
0   176558        USB-C Charging Cable                2      11.95   
1      NaN                         NaN              NaN        NaN   
2   176559  Bose SoundSport Headphones                1      99.99   
3   176560                Google Phone                1        600   
4   176560            Wired Headphones                1      11.99   
5   176561            Wired Headphones                1      11.99   
6   176562        USB-C Charging Cable                1      11.95   
7   176563  Bose SoundSport Headphones                1      99.99   
8   176564        USB-C Charging Cable                1      11.95   
9   176565          Macbook Pro Laptop                1       1700   

       Order Date                        Purchase Address  
0  04/19/19 08:46            917 1st St, Dallas, TX 75001  
1             NaN                                     NaN  
2  04/07/19 22:30       682 Chestnut St, Boston, 

(186850, 6)

In [143]:

# Clean the raw rows; the preview shows the renamed columns together with the
# derived month, day_of_week, total_price, product_id and address fields.
cured_df = clean_sales_data(raw_df)
cured_df.head()

  df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce')


Unnamed: 0,order_id,product,quantity,each_price,order_date,purchased_address,month,day_of_week,total_price,product_id,street,city,state,zip
0,176558,USB-C Charging Cable,2,11.95,2019-04-19 08:46:00,"917 1st St, Dallas, TX 75001",4,Friday,23.9,1,917 1st St,Dallas,TX,75001
2,176559,Bose SoundSport Headphones,1,99.99,2019-04-07 22:30:00,"682 Chestnut St, Boston, MA 02215",4,Sunday,99.99,2,682 Chestnut St,Boston,MA,2215
3,176560,Google Phone,1,600.0,2019-04-12 14:38:00,"669 Spruce St, Los Angeles, CA 90001",4,Friday,600.0,3,669 Spruce St,Los Angeles,CA,90001
4,176560,Wired Headphones,1,11.99,2019-04-12 14:38:00,"669 Spruce St, Los Angeles, CA 90001",4,Friday,11.99,4,669 Spruce St,Los Angeles,CA,90001
5,176561,Wired Headphones,1,11.99,2019-04-30 09:27:00,"333 8th St, Los Angeles, CA 90001",4,Tuesday,11.99,4,333 8th St,Los Angeles,CA,90001


In [157]:
# Persist the cleaned frame as Parquet at
# "<transformed_folder_name>/<transformed_file_name>" in the bucket.
upload_to_blob_as_parquet(cured_df, transformed_folder_name, transformed_file_name)

In [158]:
# Preview of the cleaned frame that was just uploaded.
cured_df.head()

Unnamed: 0,order_id,product,quantity,each_price,order_date,purchased_address,month,day_of_week,total_price,product_id,street,city,state,zip
0,176558,USB-C Charging Cable,2,11.95,2019-04-19 08:46:00,"917 1st St, Dallas, TX 75001",4,Friday,23.9,1,917 1st St,Dallas,TX,75001
2,176559,Bose SoundSport Headphones,1,99.99,2019-04-07 22:30:00,"682 Chestnut St, Boston, MA 02215",4,Sunday,99.99,2,682 Chestnut St,Boston,MA,2215
3,176560,Google Phone,1,600.0,2019-04-12 14:38:00,"669 Spruce St, Los Angeles, CA 90001",4,Friday,600.0,3,669 Spruce St,Los Angeles,CA,90001
4,176560,Wired Headphones,1,11.99,2019-04-12 14:38:00,"669 Spruce St, Los Angeles, CA 90001",4,Friday,11.99,4,669 Spruce St,Los Angeles,CA,90001
5,176561,Wired Headphones,1,11.99,2019-04-30 09:27:00,"333 8th St, Los Angeles, CA 90001",4,Tuesday,11.99,4,333 8th St,Los Angeles,CA,90001


In [171]:
# Build the model-ready feature frame from the cleaned data and preview it.
featured_df = feature_extraction(cured_df)
featured_df.head()

Unnamed: 0,product,quantity,each_price,month,day_of_week,total_price,product_id,state,quarter,dayofmonth,weekofyear,prev_day_qty,prev_week_qty,prev_month_qty,rolling_7d_avg,rolling_30d_avg
0,AAA Batteries (4-pack),2,2.99,1,Tuesday,5.98,10,CA,1.0,1.0,1,1.0,1.0,1.0,1.428571,1.266667
1,Lightning Charging Cable,2,14.95,1,Tuesday,29.9,6,CA,1.0,1.0,1,2.0,1.0,1.0,1.571429,1.3
2,Google Phone,1,600.0,1,Tuesday,600.0,3,NY,1.0,1.0,1,2.0,3.0,1.0,1.285714,1.3
3,iPhone,1,700.0,1,Tuesday,700.0,11,GA,1.0,1.0,1,1.0,1.0,1.0,1.285714,1.3
4,USB-C Charging Cable,1,11.95,1,Tuesday,11.95,1,OR,1.0,1.0,1,1.0,1.0,2.0,1.285714,1.266667


In [172]:

# Persist the feature frame as Parquet at
# "<feature_folder_name>/<feature_file_name>" in the bucket.
upload_to_blob_as_parquet(featured_df, feature_folder_name, feature_file_name)

In [164]:
# Round-trip check: read the cleaned Parquet object back from the bucket and preview it.
df = download_blob(transformed_folder_name,transformed_file_name)
df.head()

Unnamed: 0,order_id,product,quantity,each_price,order_date,purchased_address,month,day_of_week,total_price,product_id,street,city,state,zip
0,176558,USB-C Charging Cable,2,11.95,2019-04-19 08:46:00,"917 1st St, Dallas, TX 75001",4,Friday,23.9,1,917 1st St,Dallas,TX,75001
2,176559,Bose SoundSport Headphones,1,99.99,2019-04-07 22:30:00,"682 Chestnut St, Boston, MA 02215",4,Sunday,99.99,2,682 Chestnut St,Boston,MA,2215
3,176560,Google Phone,1,600.0,2019-04-12 14:38:00,"669 Spruce St, Los Angeles, CA 90001",4,Friday,600.0,3,669 Spruce St,Los Angeles,CA,90001
4,176560,Wired Headphones,1,11.99,2019-04-12 14:38:00,"669 Spruce St, Los Angeles, CA 90001",4,Friday,11.99,4,669 Spruce St,Los Angeles,CA,90001
5,176561,Wired Headphones,1,11.99,2019-04-30 09:27:00,"333 8th St, Los Angeles, CA 90001",4,Tuesday,11.99,4,333 8th St,Los Angeles,CA,90001


In [165]:
# Round-trip check for the feature Parquet object (rebinds `df`, replacing the
# cleaned frame loaded in the previous cell).
# NOTE(review): this cell's execution count (165) is lower than that of the
# feature upload cell (172), and the recorded output contains none of the
# engineered columns (quarter, prev_*_qty, rolling_*), so the output shown here
# predates the latest upload -- re-run after uploading to verify the round trip.
df = download_blob(feature_folder_name,feature_file_name)
df.head()

Unnamed: 0,product,quantity,each_price,month,day_of_week,total_price,product_id,state
0,USB-C Charging Cable,2,11.95,4,Friday,23.9,1,TX
2,Bose SoundSport Headphones,1,99.99,4,Sunday,99.99,2,MA
3,Google Phone,1,600.0,4,Friday,600.0,3,CA
4,Wired Headphones,1,11.99,4,Friday,11.99,4,CA
5,Wired Headphones,1,11.99,4,Tuesday,11.99,4,CA
