In [34]:
import pandas as pd
import numpy as np
from datetime import datetime
import os
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')


class EcommerceETL:
    def __init__(self, raw_data_path, processed_data_path):
        """Initialize ETL pipeline with data paths."""
        self.raw_data_path = raw_data_path
        self.processed_data_path = processed_data_path

    def load_event_chunks(self):
        """Load and concatenate event chunks."""
        chunk_path = os.path.join(self.raw_data_path, "events_chunks")
        all_chunks = []
        
        for file in os.listdir(chunk_path):
            if file.endswith(".csv"):
                chunk = pd.read_csv(os.path.join(chunk_path, file))
                all_chunks.append(chunk)
        
        if not all_chunks:
            raise FileNotFoundError("No event chunk files found.")
        
        # Concatenate all chunks
        self.events = pd.concat(all_chunks, ignore_index=True)
        print(f"Loaded {len(self.events)} rows of event data from chunks.")

    def re_chunk_events(self, num_chunks=4):
        os.makedirs(self.processed_data_path, exist_ok=True)
        chunk_size = len(self.events) // num_chunks
        for i in range(num_chunks):
            start_idx = i * chunk_size
            end_idx = len(self.events) if i == num_chunks - 1 else (i + 1) * chunk_size
            chunk = self.events.iloc[start_idx:end_idx]
            chunk.to_csv(
                os.path.join(self.processed_data_path, f"temp_events_1_{i + 1}.csv"),
                index=False,
            )
        logging.info(f"Re-chunked events data into {num_chunks} files.")

        
    def load_raw_data(self):
        """Load all raw CSV files into dataframes."""
        self.distribution_centers = pd.read_csv(f"{self.raw_data_path}/distribution_centers.csv")
        self.load_event_chunks()
        self.inventory_items = pd.read_csv(f"{self.raw_data_path}/inventory_items.csv")
        self.order_items = pd.read_csv(f"{self.raw_data_path}/order_items.csv")
        self.orders = pd.read_csv(f"{self.raw_data_path}/orders.csv")
        self.products = pd.read_csv(f"{self.raw_data_path}/products.csv")
        self.users = pd.read_csv(f"{self.raw_data_path}/users.csv")
        
    def clean_timestamps(self, df, timestamp_columns):
        """Convert timestamp strings to datetime objects and handle invalid dates."""
        for col in timestamp_columns:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce')
        return df
    
    def clean_numeric_values(self, df, numeric_columns):
        """Clean numeric columns: handle negatives, nulls, and invalid values."""
        for col in numeric_columns:
            if col in df.columns:
                # Replace negative values with NaN
                df.loc[df[col] < 0, col] = np.nan
                # Fill NaN with median for numeric columns
                df[col] = df[col].fillna(df[col].median())
        return df
    
    def clean_categorical_values(self, df, categorical_columns):
        """Clean categorical columns: standardize case, remove extra spaces."""
        for col in categorical_columns:
            if col in df.columns:
                df[col] = df[col].astype(str).str.strip().str.lower()
                df[col] = df[col].replace('nan', np.nan)
                df[col] = df[col].fillna('unknown')
        return df
    
    def clean_distribution_centers(self):
        """Clean distribution centers data."""
        df = self.distribution_centers.copy()
        numeric_cols = ['latitude', 'longitude']
        categorical_cols = ['name']
        
        df = self.clean_numeric_values(df, numeric_cols)
        df = self.clean_categorical_values(df, categorical_cols)
        
        return df
    
    def clean_events(self):
        """Clean events data."""
        df = self.events.copy()
        timestamp_cols = ["created_at"]
        categorical_cols = [
            "city", "state", "browser", "traffic_source", "uri", "event_type"
        ]

        # Clean timestamps
        for col in timestamp_cols:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors="coerce")

        # Clean categorical values
        for col in categorical_cols:
            if col in df.columns:
                df[col] = df[col].astype(str).str.strip().str.lower()
                df[col] = df[col].replace("nan", np.nan)
                df[col] = df[col].fillna("unknown")

        # Clean IP addresses
        if "ip_address" in df.columns:
            df["ip_address"] = df["ip_address"].fillna("0.0.0.0")

        self.events = df
        print("Cleaned events data.")

    
    
    def clean_inventory_items(self):
        """Clean inventory items data."""
        df = self.inventory_items.copy()
        timestamp_cols = ['created_at', 'sold_at']
        numeric_cols = ['cost', 'product_retail_price']
        categorical_cols = ['product_category', 'product_name', 'product_brand', 
                          'product_department', 'product_sku']
        
        df = self.clean_timestamps(df, timestamp_cols)
        df = self.clean_numeric_values(df, numeric_cols)
        df = self.clean_categorical_values(df, categorical_cols)
        
        return df
    
    def clean_orders_and_items(self):
        """Clean orders and order items data."""
        orders_df = self.orders.copy()
        items_df = self.order_items.copy()
        
        timestamp_cols = ['created_at', 'shipped_at', 'delivered_at', 'returned_at']
        categorical_cols = ['status', 'gender']
        numeric_cols = ['num_of_item']
        
        # Clean orders
        orders_df = self.clean_timestamps(orders_df, timestamp_cols)
        orders_df = self.clean_categorical_values(orders_df, categorical_cols)
        orders_df = self.clean_numeric_values(orders_df, numeric_cols)
        
        # Clean order items
        items_df = self.clean_timestamps(items_df, timestamp_cols)
        items_df = self.clean_categorical_values(items_df, ['status'])
        
        return orders_df, items_df
    
    def clean_products(self):
        """Clean products data."""
        df = self.products.copy()
        numeric_cols = ['cost', 'retail_price']
        categorical_cols = ['category', 'name', 'brand', 'department', 'sku']
        
        df = self.clean_numeric_values(df, numeric_cols)
        df = self.clean_categorical_values(df, categorical_cols)
        
        return df
    
    def clean_users(self):
        """Clean users data."""
        df = self.users.copy()
        timestamp_cols = ['created_at']
        numeric_cols = ['age', 'latitude', 'longitude']
        categorical_cols = ['first_name', 'last_name', 'gender', 'state', 
                          'city', 'country', 'traffic_source']
        
        df = self.clean_timestamps(df, timestamp_cols)
        df = self.clean_numeric_values(df, numeric_cols)
        df = self.clean_categorical_values(df, categorical_cols)
        
        # Clean email addresses
        df['email'] = df['email'].str.lower().fillna('unknown@example.com')
        
        # Clean age data
        df.loc[df['age'] > 100, 'age'] = df['age'].median()
        df.loc[df['age'] < 18, 'age'] = df['age'].median()
        
        return df
    def process_all_data(self):
        """Process all datasets and save to processed directory."""
        self.load_raw_data()

    # Load and clean event chunks
        self.load_event_chunks()
        self.clean_events()

    # Save cleaned event data
        self.events.to_csv(f"{self.processed_data_path}/cleaned_events.csv", index=False)

    # Clean other datasets
        cleaned_dcs = self.clean_distribution_centers()
        cleaned_inventory = self.clean_inventory_items()
        cleaned_orders, cleaned_order_items = self.clean_orders_and_items()
        cleaned_products = self.clean_products()
        cleaned_users = self.clean_users()

    # Save cleaned datasets
        cleaned_dcs.to_csv(f"{self.processed_data_path}/cleaned_distribution_centers.csv", index=False)
        cleaned_inventory.to_csv(f"{self.processed_data_path}/cleaned_inventory_items.csv", index=False)
        cleaned_orders.to_csv(f"{self.processed_data_path}/cleaned_orders.csv", index=False)
        cleaned_order_items.to_csv(f"{self.processed_data_path}/cleaned_order_items.csv", index=False)
        cleaned_products.to_csv(f"{self.processed_data_path}/cleaned_products.csv", index=False)
        cleaned_users.to_csv(f"{self.processed_data_path}/cleaned_users.csv", index=False)

        print("All data has been cleaned and saved to the processed directory.")

    


In [35]:
# Initialize and run the ETL pipeline
etl = EcommerceETL(
    raw_data_path='../../data/raw',
    processed_data_path='../../data/processed'
)
etl.process_all_data()

Loaded 2431963 rows of event data from chunks.
Loaded 2431963 rows of event data from chunks.
Cleaned events data.


  return np.nanmean(a, axis, out=out, keepdims=keepdims)


All data has been cleaned and saved to the processed directory.


In [36]:
import pandas as pd
import os
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

def re_chunk_cleaned_events(input_path, output_path, num_chunks=4):
    """Re-chunk cleaned events data into smaller files."""
    os.makedirs(output_path, exist_ok=True)
    
    # Load the cleaned events data
    cleaned_events = pd.read_csv(input_path)
    chunk_size = len(cleaned_events) // num_chunks
    
    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = len(cleaned_events) if i == num_chunks - 1 else (i + 1) * chunk_size
        chunk = cleaned_events.iloc[start_idx:end_idx]
        
        chunk_filename = f"cleaned_events_chunk_{i + 1}.csv"
        chunk.to_csv(os.path.join(output_path, chunk_filename), index=False)
        
    logging.info(f"Re-chunked cleaned events data into {num_chunks} files.")

input_path = '../../data/processed/cleaned_events.csv'
output_path = '../../data/cleaned_events_chunks'
re_chunk_cleaned_events(input_path, output_path)


2025-01-12 14:48:57,765 - Re-chunked cleaned events data into 4 files.
