# E-commerce Customer Behavior Prediction and Recommendation Engine
## Notebook 1: Data Acquisition and Integration

This notebook covers the first step: building the data acquisition and integration pipeline that all later analysis and modeling depend on.


## Environment Setup

Before diving into data acquisition, let's set up our environment with the necessary libraries and configurations:


In [1]:
# Import necessary libraries
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
import logging
import json
import hashlib
from pathlib import Path
import time
from typing import Dict, List, Tuple, Optional, Union, Any
import glob
import re
from dotenv import load_dotenv
import zipfile
import shutil
from tqdm.notebook import tqdm

# Determine the project root.
# If running in a Jupyter notebook, assume the current working directory is 'notebooks'
PROJECT_ROOT = Path(__file__).resolve().parents[1] if '__file__' in globals() else Path.cwd().parent

# Load environment variables from the .env file located in the project root
load_dotenv(PROJECT_ROOT / ".env")

# Configure warnings
warnings.filterwarnings('ignore')

# Ensure the logs directory exists (located at PROJECT_ROOT/logs)
log_dir = PROJECT_ROOT / "logs"
log_dir.mkdir(parents=True, exist_ok=True)

# Set up logging to track our data acquisition process.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_dir / "data_acquisition.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("DataAcquisition")

# Set display options for better readability of dataframes
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)
pd.set_option('display.max_colwidth', 100)

# Set random seed for reproducibility across runs
np.random.seed(42)
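
The `PROJECT_ROOT` logic above assumes the notebook is launched from the `notebooks` directory. As a minimal sketch of an alternative, the root could be located by walking upward until a marker file is found. The helper name `find_project_root` and the choice of `.env` and `.git` as markers are assumptions for illustration, not part of the original setup:

```python
from pathlib import Path
from typing import Iterable, Optional


def find_project_root(start: Path, markers: Iterable[str] = (".env", ".git")) -> Optional[Path]:
    """Return the nearest directory at or above `start` that contains a marker.

    `markers` is a hypothetical convention: any file or directory name whose
    presence identifies the project root.
    """
    marker_names = tuple(markers)  # allow the iterable to be reused per directory
    start = start.resolve()
    for candidate in (start, *start.parents):
        if any((candidate / name).exists() for name in marker_names):
            return candidate
    return None  # no marker found; the caller decides on a fallback
```

A caller could fall back to the original behavior when nothing is found, for example `find_project_root(Path.cwd()) or Path.cwd().parent`.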

## Project Directory Structure

Let's create a well-organized project structure to maintain clean separation between raw data, processed data, models, and outputs. This follows data science best practices and will make our project more maintainable.


In [2]:
# Create project directory structure
def create_project_structure(base_dir: Path) -> dict:
    """
    Creates a standardized project directory structure for organizing data and outputs.
    
    Args:
        base_dir: The base directory for the project (as a Path object)
        
    Returns:
        A dictionary mapping directory names to their paths (as Path objects)
    """
    directories = {
        "raw_data": base_dir / "data" / "raw",           # Original, immutable data
        "processed_data": base_dir / "data" / "processed", # Cleaned and transformed data
        "interim_data": base_dir / "data" / "interim",     # Temporary data between processing steps
        "models": base_dir / "models",                     # Trained models and model artifacts
        "outputs": base_dir / "outputs",                   # Analysis outputs and visualizations
        "logs": base_dir / "logs",                         # Logs from various processes
        "reports": base_dir / "reports",                   # Generated analysis reports
        "notebooks": base_dir / "notebooks",               # Jupyter notebooks
        "configs": base_dir / "configs"                    # Configuration files
    }
    
    # Create directories if they don't exist
    for name, dir_path in directories.items():
        dir_path.mkdir(parents=True, exist_ok=True)
        logger.info(f"Created directory: {dir_path}")
    
    return directories

# Use the previously defined PROJECT_ROOT from the earlier code block
project_dirs = create_project_structure(PROJECT_ROOT)

# Display the project structure
# print("Project Directory Structure Created:")
# for name, path in project_dirs.items():
    # print(f"- {name}: {path}")

2025-03-26 12:34:20,356 - DataAcquisition - INFO - Created directory: C:\_Arash\github\ecom-reco-predictor\full_recom_prediction_engine\ecommerce_recommendation_project\data\raw
2025-03-26 12:34:20,358 - DataAcquisition - INFO - Created directory: C:\_Arash\github\ecom-reco-predictor\full_recom_prediction_engine\ecommerce_recommendation_project\data\processed
2025-03-26 12:34:20,359 - DataAcquisition - INFO - Created directory: C:\_Arash\github\ecom-reco-predictor\full_recom_prediction_engine\ecommerce_recommendation_project\data\interim
2025-03-26 12:34:20,361 - DataAcquisition - INFO - Created directory: C:\_Arash\github\ecom-reco-predictor\full_recom_prediction_engine\ecommerce_recommendation_project\models
2025-03-26 12:34:20,363 - DataAcquisition - INFO - Created directory: C:\_Arash\github\ecom-reco-predictor\full_recom_prediction_engine\ecommerce_recommendation_project\outputs
2025-03-26 12:34:20,364 - DataAcquisition - INFO - Created directory: C:\_Arash\github\ecom-reco-predic

## Dataset Configuration

For this project, we'll be working with four complementary datasets that together provide a comprehensive view of customer behavior:

| Dataset | Description | Size | Key Features |
|---------|-------------|------|--------------|
| **E-commerce User Behavior** | Tracks product views, cart additions, and purchases | ~14M events, ~70K users | user_id, product_id, event_type, price |
| **Amazon Product Reviews** | Product reviews with ratings and metadata | Millions of reviews | product_id, customer_id, review_text, star_rating |
| **Online Retail II** | Transactional data from UK-based retailer | ~1M transactions, ~4K customers | InvoiceNo, StockCode, Quantity, Price, CustomerID |
| **Customer Personality** | Customer demographics and behaviors | ~2K customers | Age, Education, Income, Purchase history |

Let's define the configuration for each dataset:

In [3]:
# Define dataset configurations
datasets_config = {
    "ecommerce_behavior": {
        "name": "E-commerce User Behavior Dataset",
        "local_path": f"{project_dirs['raw_data']}/ecommerce_behavior",
        "file_pattern": "*.csv",
        "description": "User behavior data tracking product views, cart additions, and purchases",
        "source": "https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store",
        "size_estimate": "~14 million events from ~70,000 users across 12 months",
        "key_features": ["user_id", "product_id", "category_id", "event_type", "event_time", "price", "brand"]
    },
    "amazon_reviews": {
        "name": "Amazon Product Reviews Dataset",
        "local_path": f"{project_dirs['raw_data']}/amazon_reviews",
        "file_pattern": "*.tsv",
        "description": "Comprehensive product reviews with ratings and product metadata",
        "source": "https://www.kaggle.com/datasets/cynthiarempel/amazon-us-customer-reviews-dataset",
        "size_estimate": "Millions of reviews across multiple product categories",
        "key_features": ["product_id", "customer_id", "review_text", "star_rating", "helpful_votes", "product_category"]
    },
    "online_retail": {
        "name": "Online Retail II Dataset",
        "local_path": f"{project_dirs['raw_data']}/online_retail",
        "file_pattern": "*.xlsx",
        "description": "Transactional data from a UK-based online retailer",
        "source": "https://www.kaggle.com/datasets/mashlyn/online-retail-ii-uci",
        "size_estimate": "~1 million transactions from ~4,000 customers over 2 years",
        "key_features": ["InvoiceNo", "StockCode", "Description", "Quantity", "InvoiceDate", "UnitPrice", "CustomerID", "Country"]
    },
    "customer_personality": {
        "name": "Customer Personality Analysis Dataset",
        "local_path": f"{project_dirs['raw_data']}/customer_personality",
        "file_pattern": "*.csv",
        "description": "Detailed customer demographics and purchasing behaviors",
        "source": "https://www.kaggle.com/datasets/imakash3011/customer-personality-analysis",
        "size_estimate": "~2,000 customers with 30+ demographic and behavioral features",
        "key_features": ["Age", "Education", "Income", "Marital_Status", "Purchase history", "Campaign responses"]
    }
}

# Save dataset configuration to JSON for future reference
with open(f"{project_dirs['configs']}/datasets_config.json", 'w') as f:
    json.dump(datasets_config, f, indent=4)
    
logger.info(f"Saved dataset configuration to {project_dirs['configs']}/datasets_config.json")

2025-03-26 12:34:28,090 - DataAcquisition - INFO - Saved dataset configuration to C:\_Arash\github\ecom-reco-predictor\full_recom_prediction_engine\ecommerce_recommendation_project\configs/datasets_config.json
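
Because later notebooks may reload `datasets_config.json`, a quick structural check on load can catch a hand-edited or incomplete file early. The following is a minimal sketch. The function names and the set of required keys are assumptions chosen to match the fields the loader reads (`name`, `local_path`, `file_pattern`, `key_features`):

```python
import json
from pathlib import Path
from typing import Any, Dict, List

# Keys the loader in this notebook reads from each dataset entry (assumed minimum set)
REQUIRED_KEYS = ("name", "local_path", "file_pattern", "key_features")


def validate_datasets_config(config: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return a list of human-readable problems; an empty list means the config looks usable."""
    problems = []
    for dataset_id, entry in config.items():
        missing = [key for key in REQUIRED_KEYS if key not in entry]
        if missing:
            problems.append(f"{dataset_id}: missing keys {missing}")
        elif not entry["key_features"]:
            problems.append(f"{dataset_id}: 'key_features' is empty")
    return problems


def load_datasets_config(path: Path) -> Dict[str, Dict[str, Any]]:
    """Load the JSON config and raise ValueError if any entry is incomplete."""
    with open(path, "r", encoding="utf-8") as f:
        config = json.load(f)
    problems = validate_datasets_config(config)
    if problems:
        raise ValueError("Invalid dataset configuration: " + "; ".join(problems))
    return config
```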


## Data Integration Architecture

The diagram below outlines our data integration approach. We'll combine the four datasets through entity resolution and feature engineering to create a unified customer view that powers our recommendation and prediction models.


In [4]:
# The flow is printed as text; a bare arrow diagram is not valid Python
print("Data Sources → Preprocessing → Entity Resolution → Feature Engineering → Unified Data Layer → Models")

For this project, we'll build a pipeline that integrates:
- Customer demographics and profile information
- Transaction history and purchase patterns
- Product interactions (views, carts, purchases)
- Product metadata and review sentiment
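
One way to picture the staged flow is as a list of functions applied in order to a dictionary of tables. This is only an illustrative sketch: the stage functions, the `events` table name, and the `user_id` column are hypothetical stand-ins, and real entity resolution across these datasets would need considerably more logic.

```python
from typing import Callable, Dict, List, Tuple

import pandas as pd

Tables = Dict[str, pd.DataFrame]
Stage = Callable[[Tables], Tables]


def preprocess(tables: Tables) -> Tables:
    """Hypothetical preprocessing stage: drop exact duplicate rows in every table."""
    return {name: df.drop_duplicates().reset_index(drop=True) for name, df in tables.items()}


def engineer_features(tables: Tables) -> Tables:
    """Hypothetical feature stage: count events per user in the 'events' table."""
    events = tables["events"]
    counts = events.groupby("user_id").size().rename("event_count").reset_index()
    return {**tables, "user_features": counts}


def run_pipeline(tables: Tables, stages: List[Tuple[str, Stage]]) -> Tables:
    """Apply each named stage in order, reporting which tables exist after each step."""
    for stage_name, stage in stages:
        tables = stage(tables)
        print(f"{stage_name}: {', '.join(sorted(tables))}")
    return tables
```

Keeping each stage as a plain function makes it easy to test stages in isolation and to reorder or skip them during development.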

## Data Loader Implementation

Now, let's implement a robust `DataLoader` class that will handle loading, validation, and caching of our datasets. This class implements several important features for production-grade data processing:

- **Configurable sampling**: For development and testing on smaller data subsets
- **Data caching**: To speed up repeated processing
- **Error handling**: Robust error detection and logging
- **Data validation**: Basic checks to ensure data integrity
- **Support for multiple file formats**: CSV, TSV, Excel, and Parquet

In [5]:
class DataLoader:
    """
    A class to handle loading, basic preprocessing, and validation of datasets.
    
    Features:
    - Configurable sampling for working with manageable data sizes
    - Data caching to disk for faster reloads
    - Support for multiple file formats (CSV, TSV, Excel, Parquet)
    - Robust error handling and logging
    - File integrity validation
    """
    
    def __init__(self, datasets_config: Dict[str, Dict[str, Any]], sample_size: Optional[int] = None, cache_enabled: bool = True):
        """
        Initialize the DataLoader.
        
        Args:
            datasets_config: Configuration dictionary for all datasets
            sample_size: If provided, load only a sample of this size from each dataset
            cache_enabled: Whether to use and create cache files for faster loading
        """
        self.datasets_config = datasets_config
        self.sample_size = sample_size
        self.cache_enabled = cache_enabled
        self.data_cache = {}  # Memory cache for loaded datasets
        
    def compute_file_hash(self, file_path: str) -> str:
        """
        Compute a hash of a file to track changes and ensure data integrity.
        
        Args:
            file_path: Path to the file
            
        Returns:
            Hash string for the file
        """
        if not os.path.exists(file_path):
            return ""
            
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
        
    def load_csv(self, file_path: str, **kwargs) -> pd.DataFrame:
        """
        Load a CSV file with proper error handling.
        
        Args:
            file_path: Path to the CSV file
            **kwargs: Additional arguments to pass to pd.read_csv
            
        Returns:
            Pandas DataFrame containing the data
        """
        try:
            logger.info(f"Loading CSV file: {file_path}")
            
            # Check for TSV file to handle Amazon reviews dataset
            if file_path.endswith('.tsv'):
                df = pd.read_csv(file_path, sep='\t', **kwargs)
            else:
                df = pd.read_csv(file_path, **kwargs)
            
            # Apply sampling if requested
            if self.sample_size is not None and len(df) > self.sample_size:
                df = df.sample(n=self.sample_size, random_state=42)
                logger.info(f"Sampled {self.sample_size:,} rows from {file_path}")
            
            rows, cols = df.shape
            logger.info(f"Successfully loaded {file_path}: {rows:,} rows, {cols} columns")
            return df
            
        except Exception as e:
            logger.error(f"Error loading CSV file {file_path}: {str(e)}")
            raise
    
    def load_excel(self, file_path: str, **kwargs) -> pd.DataFrame:
        """
        Load an Excel file with proper error handling.
        
        Args:
            file_path: Path to the Excel file
            **kwargs: Additional arguments to pass to pd.read_excel
            
        Returns:
            Pandas DataFrame containing the data
        """
        try:
            logger.info(f"Loading Excel file: {file_path}")
            
            df = pd.read_excel(file_path, **kwargs)
            
            # Apply sampling if requested
            if self.sample_size is not None and len(df) > self.sample_size:
                df = df.sample(n=self.sample_size, random_state=42)
                logger.info(f"Sampled {self.sample_size:,} rows from {file_path}")
            
            rows, cols = df.shape
            logger.info(f"Successfully loaded {file_path}: {rows:,} rows, {cols} columns")
            return df
            
        except Exception as e:
            logger.error(f"Error loading Excel file {file_path}: {str(e)}")
            raise
    
    def load_parquet(self, file_path: str, **kwargs) -> pd.DataFrame:
        """
        Load a Parquet file with proper error handling.
        
        Args:
            file_path: Path to the Parquet file
            **kwargs: Additional arguments to pass to pd.read_parquet
            
        Returns:
            Pandas DataFrame containing the data
        """
        try:
            logger.info(f"Loading Parquet file: {file_path}")
            
            df = pd.read_parquet(file_path, **kwargs)
            
            # Apply sampling if requested (unlikely needed for cached parquet, but included for consistency)
            if self.sample_size is not None and len(df) > self.sample_size:
                df = df.sample(n=self.sample_size, random_state=42)
                logger.info(f"Sampled {self.sample_size:,} rows from {file_path}")
            
            rows, cols = df.shape
            logger.info(f"Successfully loaded {file_path}: {rows:,} rows, {cols} columns")
            return df
            
        except Exception as e:
            logger.error(f"Error loading Parquet file {file_path}: {str(e)}")
            raise
    
    def validate_file_exists(self, file_path: str) -> bool:
        """
        Check if a file exists and log appropriate messages.
        
        Args:
            file_path: Path to the file
            
        Returns:
            Boolean indicating if the file exists
        """
        if os.path.exists(file_path):
            file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
            logger.info(f"Found file: {file_path} ({file_size_mb:.2f} MB)")
            return True
        else:
            logger.warning(f"File not found: {file_path}")
            return False
    
    def get_dataset_files(self, dataset_id: str) -> List[str]:
        """
        Get all files matching the pattern for a dataset.
        
        Args:
            dataset_id: ID of the dataset
            
        Returns:
            List of file paths
        """
        if dataset_id not in self.datasets_config:
            logger.error(f"Unknown dataset ID: {dataset_id}")
            return []
        
        config = self.datasets_config[dataset_id]
        pattern = os.path.join(config["local_path"], config["file_pattern"])
        files = glob.glob(pattern)
        
        if not files:
            logger.warning(f"No files found matching pattern {pattern} for dataset {dataset_id}")
        
        return files
    
    def check_cached_version(self, dataset_id: str, file_path: str) -> Optional[str]:
        """
        Check if there's a cached version of a file and if it's still valid.
        
        Args:
            dataset_id: ID of the dataset
            file_path: Path to the original file
            
        Returns:
            Path to the cached file if it exists and is valid, None otherwise
        """
        if not self.cache_enabled:
            return None
            
        cache_dir = f"{project_dirs['interim_data']}/{dataset_id}_cache"
        base_name = os.path.basename(file_path)
        cache_path = f"{cache_dir}/{os.path.splitext(base_name)[0]}.parquet"
        
        # Check if cache exists
        if os.path.exists(cache_path):
            # Compare modification times
            orig_mtime = os.path.getmtime(file_path)
            cache_mtime = os.path.getmtime(cache_path)
            
            if cache_mtime > orig_mtime:
                logger.info(f"Found valid cache for {file_path} at {cache_path}")
                return cache_path
            else:
                logger.info(f"Cache exists but is outdated: {cache_path}")
        
        return None
    
    def save_to_cache(self, df: pd.DataFrame, dataset_id: str, file_path: str) -> str:
        """
        Save a DataFrame to cache for faster future loading.
        
        Args:
            df: DataFrame to cache
            dataset_id: ID of the dataset
            file_path: Path to the original file
            
        Returns:
            Path to the cached file
        """
        if not self.cache_enabled:
            return ""
            
        cache_dir = f"{project_dirs['interim_data']}/{dataset_id}_cache"
        os.makedirs(cache_dir, exist_ok=True)
        
        base_name = os.path.basename(file_path)
        cache_path = f"{cache_dir}/{os.path.splitext(base_name)[0]}.parquet"
        
        logger.info(f"Saving {len(df):,} rows to cache: {cache_path}")
        df.to_parquet(cache_path, index=False)
        
        return cache_path
    
    def load_file(self, dataset_id: str, file_path: str) -> pd.DataFrame:
        """
        Load a single file, using cache if available.
        
        Args:
            dataset_id: ID of the dataset
            file_path: Path to the file
            
        Returns:
            DataFrame containing the data
        """
        # Check if file exists
        if not self.validate_file_exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        
        # Check for cached version
        cached_path = self.check_cached_version(dataset_id, file_path)
        if cached_path:
            return self.load_parquet(cached_path)
        
        # Load based on file extension
        if file_path.endswith('.csv') or file_path.endswith('.tsv'):
            df = self.load_csv(file_path)
        elif file_path.endswith('.xlsx') or file_path.endswith('.xls'):
            df = self.load_excel(file_path)
        elif file_path.endswith('.parquet'):
            df = self.load_parquet(file_path)
        else:
            raise ValueError(f"Unsupported file format: {file_path}")
        
        # Save to cache for future use
        if self.cache_enabled:
            self.save_to_cache(df, dataset_id, file_path)
        
        return df
    
    def validate_dataframe(self, df: pd.DataFrame, dataset_id: str) -> bool:
        """
        Perform basic validation on a dataframe to ensure data integrity.
        
        Args:
            df: DataFrame to validate
            dataset_id: ID of the dataset for context
            
        Returns:
            Boolean indicating if validation passed
        """
        # Check if dataframe is empty
        if df.empty:
            logger.error(f"DataFrame for {dataset_id} is empty")
            return False
        
        # Check for expected columns based on dataset config
        expected_features = self.datasets_config[dataset_id]["key_features"]
        missing_features = [f for f in expected_features if f not in df.columns and f.lower() not in df.columns]
        
        if missing_features:
            logger.warning(f"Missing expected features in {dataset_id}: {missing_features}")
            # We don't fail validation for this, just log a warning
        
        # Check for completely empty columns
        empty_cols = [col for col in df.columns if df[col].isna().all()]
        if empty_cols:
            logger.warning(f"Found completely empty columns in {dataset_id}: {empty_cols}")
        
        # Basic check for percentage of missing values
        missing_percentage = (df.isna().sum() / len(df) * 100).max()
        if missing_percentage > 50:
            logger.warning(f"{dataset_id} has columns with more than 50% missing values (max: {missing_percentage:.2f}%)")
        
        return True
    
    def load_dataset(self, dataset_id: str) -> Dict[str, pd.DataFrame]:
        """
        Load all files for a specified dataset.
        
        Args:
            dataset_id: ID of the dataset to load
            
        Returns:
            Dictionary mapping file names to DataFrames
        """
        if dataset_id not in self.datasets_config:
            logger.error(f"Unknown dataset ID: {dataset_id}")
            return {}
            
        config = self.datasets_config[dataset_id]
        logger.info(f"Loading dataset: {config['name']}")
        
        # Get all files for this dataset
        files = self.get_dataset_files(dataset_id)
        if not files:
            logger.error(f"No files found for dataset: {dataset_id}")
            return {}
        
        result = {}
        for file_path in files:
            try:
                df = self.load_file(dataset_id, file_path)
                
                # Validate the dataframe
                if self.validate_dataframe(df, dataset_id):
                    file_name = os.path.basename(file_path)
                    result[file_name] = df
                else:
                    logger.error(f"Validation failed for {file_path}")
            except Exception as e:
                logger.error(f"Error processing {file_path}: {str(e)}")
                continue
                
        if not result:
            logger.error(f"Failed to load any files for dataset: {config['name']}")
            
        return result
    
    def load_all_datasets(self) -> Dict[str, Dict[str, pd.DataFrame]]:
        """
        Load all datasets specified in the configuration.
        
        Returns:
            Dictionary mapping dataset IDs to dictionaries mapping file names to DataFrames
        """
        result = {}
        
        for dataset_id in self.datasets_config:
            dataset_result = self.load_dataset(dataset_id)
            if dataset_result:
                result[dataset_id] = dataset_result
                
        return result
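
In the class as shown, `check_cached_version` compares modification times only, `compute_file_hash` is never called, and sampling happens before `save_to_cache`, so a cached file holds whatever sample size was in effect when it was written. A minimal sketch of a stricter check stores the source hash and the sample size in a small sidecar manifest next to the cache file. The manifest layout and the helper names below are assumptions for illustration, not part of the `DataLoader` above:

```python
import hashlib
import json
from pathlib import Path
from typing import Optional


def file_md5(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large files are not read into memory at once."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def manifest_path_for(cache_path: Path) -> Path:
    """Hypothetical convention: '<stem>.manifest.json' next to the cache file."""
    return cache_path.parent / f"{cache_path.stem}.manifest.json"


def write_cache_manifest(cache_path: Path, source_path: Path, sample_size: Optional[int]) -> Path:
    """Record which source content and sample size produced the cache file."""
    manifest = {"source_md5": file_md5(source_path), "sample_size": sample_size}
    path = manifest_path_for(cache_path)
    path.write_text(json.dumps(manifest), encoding="utf-8")
    return path


def cache_is_valid(cache_path: Path, source_path: Path, sample_size: Optional[int]) -> bool:
    """A cache is valid only if the source content and the sample size both match."""
    path = manifest_path_for(cache_path)
    if not cache_path.exists() or not path.exists():
        return False
    manifest = json.loads(path.read_text(encoding="utf-8"))
    return (
        manifest.get("source_md5") == file_md5(source_path)
        and manifest.get("sample_size") == sample_size
    )
```

Hashing a multi-gigabyte source on every load has a cost, so a combined approach (modification time first, hash only when the time check is inconclusive) may be a reasonable compromise.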

## Data Load Execution

Now that we've defined our `DataLoader` class, let's use it to load our datasets. We'll start with a smaller sample size for development purposes, to ensure our code works correctly before scaling up to the full datasets.


In [6]:
# Initialize the data loader with a sample size for development
sample_size = 1000  # Adjust based on your system's memory constraints
loader = DataLoader(datasets_config, sample_size=sample_size, cache_enabled=True)

# Load all datasets
print("\n" + "="*80)
print("LOADING DATASETS")
print("="*80)

try:
    # This assumes you've already downloaded the datasets to the specified paths
    # In a production environment, you'd integrate with a data downloading system or API
    all_datasets = loader.load_all_datasets()
    
    # Print summary of loaded datasets
    print("\nDataset Loading Summary:")
    for dataset_id, files_dict in all_datasets.items():
        print(f"\n{datasets_config[dataset_id]['name']}:")
        if not files_dict:
            print(f"  ❌ No files loaded")
        else:
            for file_name, df in files_dict.items():
                print(f"  ✅ {file_name}: {len(df):,} rows, {len(df.columns)} columns")
                
                # Print first few column names as a preview
                print(f"     Columns: {', '.join(map(str, df.columns[:5]))}...")
                
except Exception as e:
    print(f"\n❌ Error loading datasets: {str(e)}")

2025-03-26 12:34:48,742 - DataAcquisition - INFO - Loading dataset: E-commerce User Behavior Dataset
2025-03-26 12:34:48,744 - DataAcquisition - ERROR - No files found for dataset: ecommerce_behavior
2025-03-26 12:34:48,745 - DataAcquisition - INFO - Loading dataset: Amazon Product Reviews Dataset
2025-03-26 12:34:48,748 - DataAcquisition - ERROR - No files found for dataset: amazon_reviews
2025-03-26 12:34:48,749 - DataAcquisition - INFO - Loading dataset: Online Retail II Dataset
2025-03-26 12:34:48,752 - DataAcquisition - ERROR - No files found for dataset: online_retail
2025-03-26 12:34:48,753 - DataAcquisition - INFO - Loading dataset: Customer Personality Analysis Dataset
2025-03-26 12:34:48,757 - DataAcquisition - ERROR - No files found for dataset: customer_personality



LOADING DATASETS

Dataset Loading Summary:
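
Note that `load_csv` reads the whole file before sampling, which can be memory-hungry for the larger datasets. As a hedged sketch of an alternative, a uniform sample can be drawn while streaming the file in chunks: each row gets a random key and only the `n` rows with the smallest keys are kept. The function name `sample_csv_streaming` and the temporary `_key` column are assumptions for illustration:

```python
import numpy as np
import pandas as pd


def sample_csv_streaming(source, n: int, chunksize: int = 100_000, seed: int = 42, **read_kwargs) -> pd.DataFrame:
    """Draw up to `n` rows uniformly at random without loading the whole file.

    Every row receives an independent random key; the rows with the `n`
    smallest keys overall form a uniform sample without replacement.
    Assumes the source has no column already named '_key'.
    """
    rng = np.random.default_rng(seed)
    kept = None
    for chunk in pd.read_csv(source, chunksize=chunksize, **read_kwargs):
        chunk = chunk.assign(_key=rng.random(len(chunk)))
        pool = chunk if kept is None else pd.concat([kept, chunk], ignore_index=True)
        kept = pool.nsmallest(n, "_key")  # never holds more than n + chunksize rows
    if kept is None:
        return pd.DataFrame()
    return kept.drop(columns="_key").reset_index(drop=True)
```

For the tab-separated review files, `sep="\t"` can be passed through `read_kwargs`.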


## Basic Data Exploration

Let's explore each dataset to understand its structure and contents. This will help us identify data cleaning needs and integration points.


In [7]:
def explore_dataset(dataset_id: str, files_dict: Dict[str, pd.DataFrame]) -> None:
    """
    Perform basic exploration of a dataset.
    
    Args:
        dataset_id: ID of the dataset
        files_dict: Dictionary mapping file names to DataFrames
    """
    if not files_dict:
        print(f"No data available for {dataset_id}")
        return
    
    config = datasets_config[dataset_id]
    print(f"\n{'='*80}")
    print(f"EXPLORING {config['name'].upper()}")
    print(f"{'='*80}")
    
    for file_name, df in files_dict.items():
        print(f"\nFile: {file_name}")
        
        # 1. Basic information
        print(f"\nShape: {df.shape[0]:,} rows, {df.shape[1]} columns")
        
        # 2. Data types
        print("\nData Types:")
        for dtype, count in df.dtypes.value_counts().items():
            print(f"  - {dtype}: {count} columns")
        
        # 3. Missing values
        missing = df.isna().sum()
        missing_cols = missing[missing > 0]
        print("\nMissing Values:")
        if len(missing_cols) == 0:
            print("  - No missing values")
        else:
            for col, count in missing_cols.items():
                percentage = (count / len(df)) * 100
                print(f"  - {col}: {count:,} missing values ({percentage:.2f}%)")
        
        # 4. Sample data
        print("\nSample Data:")
        display(df.head(3))
        
        # 5. Dataset-specific exploration
        if dataset_id == "ecommerce_behavior":
            # Check event types distribution
            if 'event_type' in df.columns:
                print("\nEvent Type Distribution:")
                event_counts = df['event_type'].value_counts()
                for event, count in event_counts.items():
                    percentage = (count / len(df)) * 100
                    print(f"  - {event}: {count:,} events ({percentage:.2f}%)")
            
            # Check top categories
            if 'category_id' in df.columns:
                print("\nTop 5 Categories:")
                cat_counts = df['category_id'].value_counts().head(5)
                for cat, count in cat_counts.items():
                    percentage = (count / len(df)) * 100
                    print(f"  - {cat}: {count:,} events ({percentage:.2f}%)")
                    
        elif dataset_id == "amazon_reviews":
            # Check rating distribution
            if 'star_rating' in df.columns:
                print("\nRating Distribution:")
                rating_counts = df['star_rating'].value_counts().sort_index()
                for rating, count in rating_counts.items():
                    percentage = (count / len(df)) * 100
                    print(f"  - {rating} stars: {count:,} reviews ({percentage:.2f}%)")
            
            # Check product categories
            if 'product_category' in df.columns:
                print("\nTop 5 Product Categories:")
                cat_counts = df['product_category'].value_counts().head(5)
                for cat, count in cat_counts.items():
                    percentage = (count / len(df)) * 100
                    print(f"  - {cat}: {count:,} reviews ({percentage:.2f}%)")
                    
        elif dataset_id == "online_retail":
            # Check top countries
            if 'Country' in df.columns:
                print("\nTop 5 Countries:")
                country_counts = df['Country'].value_counts().head(5)
                for country, count in country_counts.items():
                    percentage = (count / len(df)) * 100
                    print(f"  - {country}: {count:,} transactions ({percentage:.2f}%)")
            
            # Check transaction distribution over time
            if 'InvoiceDate' in df.columns:
                print("\nTransaction Timeline:")
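                # Coerce to datetime (CSV loads yield strings) without mutating the loaded frame
                invoice_month = pd.to_datetime(df['InvoiceDate'], errors='coerce').dt.to_period('M')
                month_counts = invoice_month.value_counts().sort_index()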
                for month, count in month_counts.head(5).items():
                    print(f"  - {month}: {count:,} transactions")
                    
        elif dataset_id == "customer_personality":
            # Check demographics
            if 'Education' in df.columns:
                print("\nEducation Distribution:")
                edu_counts = df['Education'].value_counts()
                for edu, count in edu_counts.items():
                    percentage = (count / len(df)) * 100
                    print(f"  - {edu}: {count:,} customers ({percentage:.2f}%)")
            
            # Check age distribution
            if 'Year_Birth' in df.columns:
                print("\nAge Distribution:")
                current_year = datetime.now().year
                # Derive age locally instead of adding a column to the loaded frame
                age = current_year - df['Year_Birth']
                print(f"  - Age range: {age.min()} to {age.max()} years")
                print(f"  - Average age: {age.mean():.1f} years")
                print(f"  - Median age: {age.median():.1f} years")
        
        # 6. Memory usage
        memory_usage = df.memory_usage(deep=True).sum() / (1024 * 1024)  # in MB
        print(f"\nMemory Usage: {memory_usage:.2f} MB")

# Explore each dataset
for dataset_id, files_dict in all_datasets.items():
    explore_dataset(dataset_id, files_dict)

## Initial Data Quality Assessment

Based on our exploration, let's identify key data quality issues that we'll need to address in the data cleaning notebook. This will help us plan our data preprocessing strategy.

In [8]:
def assess_data_quality(all_datasets: Dict[str, Dict[str, pd.DataFrame]]) -> Dict[str, List[str]]:
    """
    Assess data quality issues in all datasets.
    
    Args:
        all_datasets: Dictionary of all loaded datasets
        
    Returns:
        Dictionary mapping dataset IDs to lists of data quality issues
    """
    quality_issues = {}
    
    for dataset_id, files_dict in all_datasets.items():
        issues = []
        
        # Skip if no files were loaded
        if not files_dict:
            quality_issues[dataset_id] = ["No files loaded"]
            continue
        
        # Take the first DataFrame for assessment
        df = next(iter(files_dict.values()))
        
        # 1. Check for missing values
        missing_cols = df.columns[df.isna().mean() > 0.05]
        if len(missing_cols) > 0:
            # Cast labels to str so non-string column names do not break the join
            issues.append(f"High missing values (>5%) in columns: {', '.join(map(str, missing_cols))}")
        
        # 2. Check for duplicates
        try:
            dup_count = df.duplicated().sum()
            dup_pct = (dup_count / len(df)) * 100
            if dup_pct > 0.1:
                issues.append(f"Contains {dup_count:,} duplicated rows ({dup_pct:.2f}%)")
        except TypeError:
            # df.duplicated() raises TypeError when a column holds unhashable values (e.g. lists)
            issues.append("Could not check for duplicates (possibly due to non-hashable types)")
        
        # 3. Dataset-specific checks
        if dataset_id == "ecommerce_behavior":
            # Check for invalid event types
            if 'event_type' in df.columns:
                valid_events = ['view', 'cart', 'purchase', 'remove_from_cart']
                # Drop NaN first: missing values are handled by the missing-value check above
                observed_events = df['event_type'].dropna().unique().tolist()
                invalid_events = [e for e in observed_events if e not in valid_events]
                if invalid_events:
                    issues.append(f"Contains invalid event types: {invalid_events}")
            
            # Check for reasonable price ranges
            if 'price' in df.columns:
                if df['price'].min() < 0:
                    issues.append("Contains negative prices")
                if df['price'].max() > 100000:
                    issues.append("Contains suspiciously high prices (>$100,000)")
        
        elif dataset_id == "amazon_reviews":
            # Check for invalid ratings
            if 'star_rating' in df.columns:
                invalid_ratings = df['star_rating'][(df['star_rating'] < 1) | (df['star_rating'] > 5)].count()
                if invalid_ratings > 0:
                    issues.append(f"Contains {invalid_ratings:,} invalid ratings (not 1-5)")
        
        elif dataset_id == "online_retail":
            # Check for negative quantities
            if 'Quantity' in df.columns:
                neg_qty = (df['Quantity'] < 0).sum()
                if neg_qty > 0:
                    neg_pct = (neg_qty / len(df)) * 100
                    issues.append(f"Contains {neg_qty:,} negative quantities ({neg_pct:.2f}%)")
            
            # Check for negative prices
            if 'UnitPrice' in df.columns:
                neg_price = (df['UnitPrice'] < 0).sum()
                if neg_price > 0:
                    issues.append(f"Contains {neg_price:,} negative unit prices")
        
        elif dataset_id == "customer_personality":
            # Check for invalid age values
            if 'Year_Birth' in df.columns:
                current_year = datetime.now().year
                invalid_age = ((current_year - df['Year_Birth']) > 100).sum()
                if invalid_age > 0:
                    issues.append(f"Contains {invalid_age:,} customers with age > 100 years")
        
        # If no issues were found, add a positive note
        if not issues:
            issues.append("No major data quality issues detected")
        
        quality_issues[dataset_id] = issues
    
    return quality_issues

# Assess data quality
print("\n" + "="*80)
print("DATA QUALITY ASSESSMENT")
print("="*80)

quality_issues = assess_data_quality(all_datasets)

for dataset_id, issues in quality_issues.items():
    print(f"\n{datasets_config[dataset_id]['name']}:")
    for issue in issues:
        issue_symbol = "❌" if "No major" not in issue else "✅"
        print(f"  {issue_symbol} {issue}")


DATA QUALITY ASSESSMENT
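
To make the thresholds concrete, here is a minimal, self-contained sketch of the two generic checks (share of missing values per column and share of duplicated rows) on a toy DataFrame. The helper name `generic_quality_flags`, its parameters, and the sample data are hypothetical; only the 5% and 0.1% cut-offs are taken from `assess_data_quality`.

```python
import numpy as np
import pandas as pd


def generic_quality_flags(df: pd.DataFrame,
                          missing_threshold: float = 0.05,
                          dup_threshold_pct: float = 0.1) -> list:
    """Return human-readable flags for missing values and duplicate rows.

    Hypothetical helper for illustration; not part of the notebook's pipeline.
    """
    if df.empty:
        return ["Empty DataFrame"]

    flags = []

    # Share of missing values per column, compared against the threshold
    missing_share = df.isna().mean()
    high_missing = [str(col) for col in missing_share[missing_share > missing_threshold].index]
    if high_missing:
        flags.append(f"High missing values in: {', '.join(high_missing)}")

    # Share of fully duplicated rows (the first occurrence is not counted)
    dup_count = int(df.duplicated().sum())
    dup_pct = dup_count / len(df) * 100
    if dup_pct > dup_threshold_pct:
        flags.append(f"{dup_count} duplicated rows ({dup_pct:.2f}%)")

    return flags


# Toy data: one missing price and one exact duplicate row
toy = pd.DataFrame({
    "user_id": [1, 2, 2, 3],
    "price": [9.99, 5.00, 5.00, np.nan],
})
toy_flags = generic_quality_flags(toy)
print(toy_flags)
```

Keeping the thresholds as parameters makes it easy to tighten or relax them per dataset once real data has been loaded.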


## Data Integration Strategy

Let's briefly outline our strategy for integrating these datasets. The integration itself is implemented in a later notebook (Notebook 4: Feature Engineering):

In [9]:
print("\n" + "="*80)
print("DATA INTEGRATION STRATEGY")
print("="*80)

print("""
Our data integration approach will focus on:

1. **Customer Entity Resolution**
   * Create unified customer profiles by matching identifiers across datasets
   * Use demographic information where available to enhance matching

2. **Product Entity Resolution**
   * Map products between E-commerce Behavior and Amazon Reviews datasets
   * Use product attributes and categories for matching

3. **Key Integration Points**
   * User behavior → Customer profile linkage
   * Product interactions → Product review sentiment
   * Purchase history → Customer value metrics
   * Demographics → Behavioral segmentation

This integrated data will form the foundation for our recommendation models, 
churn prediction, and customer lifetime value analysis.
""")


DATA INTEGRATION STRATEGY

Our data integration approach will focus on:

1. **Customer Entity Resolution**
   * Create unified customer profiles by matching identifiers across datasets
   * Use demographic information where available to enhance matching

2. **Product Entity Resolution**
   * Map products between E-commerce Behavior and Amazon Reviews datasets
   * Use product attributes and categories for matching

3. **Key Integration Points**
   * User behavior → Customer profile linkage
   * Product interactions → Product review sentiment
   * Purchase history → Customer value metrics
   * Demographics → Behavioral segmentation

This integrated data will form the foundation for our recommendation models, 
churn prediction, and customer lifetime value analysis.
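
As a rough illustration of the "Product interactions → Product review sentiment" integration point, the sketch below links behavior events to aggregated review scores with a left join. It is only a sketch under stated assumptions: the toy data is made up, and the shared `product_id` key is hypothetical, since the real datasets may need a mapping step before any such join is possible.

```python
import pandas as pd

# Toy stand-ins for the E-commerce Behavior and Amazon Reviews data.
# The shared `product_id` key is an assumption made for illustration.
events = pd.DataFrame({
    "user_id": [10, 10, 11, 12],
    "product_id": ["A", "B", "A", "C"],
    "event_type": ["view", "purchase", "purchase", "view"],
})
reviews = pd.DataFrame({
    "product_id": ["A", "A", "B", "D"],
    "star_rating": [5, 3, 4, 2],
})

# Aggregate reviews to one row per product so the join stays many-to-one
review_summary = (
    reviews.groupby("product_id", as_index=False)
    .agg(avg_rating=("star_rating", "mean"), review_count=("star_rating", "size"))
)

# Left join keeps every behavior event; `indicator=True` adds a `_merge`
# column recording whether a matching product exists in the review data
linked = events.merge(
    review_summary,
    on="product_id",
    how="left",
    validate="many_to_one",
    indicator=True,
)

match_rate = (linked["_merge"] == "both").mean()
print(linked)
print(f"Share of events with review data: {match_rate:.0%}")
```

The match rate gives a quick sense of how much of the behavior data can actually be enriched, which is worth checking before building features on top of the join.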



## Summary and Next Steps

Let's summarize what we've accomplished in this notebook and outline the next steps in our project:

In [10]:
def print_summary():
    """
    Print a summary of the notebook and next steps.
    """
    print("\n" + "="*80)
    print("SUMMARY AND NEXT STEPS")
    print("="*80)
    
    # Count total rows across all datasets
    total_rows = 0
    loaded_files = 0
    
    for dataset_id, files_dict in all_datasets.items():
        for file_name, df in files_dict.items():
            total_rows += len(df)
            loaded_files += 1
    
    print(f"""
## What We've Accomplished

In this notebook, we've:

1. 📂 Created a structured project organization with {len(project_dirs)} directories
2. 📊 Configured {len(datasets_config)} datasets containing valuable customer behavior data
3. 🔄 Implemented a robust data loading framework with caching and validation
4. 📋 Loaded {loaded_files} data files containing {total_rows:,} total rows
5. 🔍 Explored the key characteristics of each dataset
6. 🚩 Identified data quality issues to address in preprocessing
7. 🗺️ Outlined a strategy for integrating these datasets

## Next Steps

1. **Notebook 2: Data Cleaning and Preprocessing**
   * Address the identified data quality issues
   * Handle missing values and outliers
   * Standardize formats and units across datasets
   * Create a clean, consistent foundation for analysis

2. **Notebook 3: Exploratory Data Analysis**
   * Discover patterns in customer behavior
   * Identify correlations between features
   * Generate insights to inform model development
   * Visualize key relationships in the data

3. **Notebook 4: Feature Engineering**
   * Create new features that capture behavior patterns
   * Develop time-based features to capture trends
   * Implement the integration strategy outlined above
   * Prepare features specifically for recommendation models
    """)

# Print summary
print_summary()


SUMMARY AND NEXT STEPS

## What We've Accomplished

In this notebook, we've:

1. 📂 Created a structured project organization with 9 directories
2. 📊 Configured 4 datasets containing valuable customer behavior data
3. 🔄 Implemented a robust data loading framework with caching and validation
4. 📋 Loaded 0 data files containing 0 total rows
5. 🔍 Explored the key characteristics of each dataset
6. 🚩 Identified data quality issues to address in preprocessing
7. 🗺️ Outlined a strategy for integrating these datasets

## Next Steps

1. **Notebook 2: Data Cleaning and Preprocessing**
   * Address the identified data quality issues
   * Handle missing values and outliers
   * Standardize formats and units across datasets
   * Create a clean, consistent foundation for analysis

2. **Notebook 3: Exploratory Data Analysis**
   * Discover patterns in customer behavior
   * Identify correlations between features
   * Generate insights to inform model development
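   * Visualize key relationships in the data

3. **Notebook 4: Feature Engineering**
   * Create new features that capture behavior patterns
   * Develop time-based features to capture trends
   * Implement the integration strategy outlined above
   * Prepare features specifically for recommendation models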

## Environment Information

For reproducibility, let's save information about the environment where this notebook was executed:

In [11]:
def save_environment_info():
    """
    Save information about the execution environment for reproducibility.
    """
    import platform
    
    env_info = {
        "python_version": platform.python_version(),
        "platform": platform.platform(),
        "pandas_version": pd.__version__,
        "numpy_version": np.__version__,
        "execution_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    
    # Save to JSON; build the path with pathlib so separators are consistent on every OS
    env_path = Path(project_dirs['logs']) / "environment_info.json"
    with open(env_path, 'w', encoding='utf-8') as f:
        json.dump(env_info, f, indent=4)
    
    print("\nEnvironment Information:")
    for key, value in env_info.items():
        print(f"  - {key}: {value}")
    
    print(f"\nEnvironment information saved to {env_path}")

# Save environment information
save_environment_info()


Environment Information:
  - python_version: 3.11.5
  - platform: Windows-10-10.0.26100-SP0
  - pandas_version: 2.1.1
  - numpy_version: 1.26.0
  - execution_date: 2025-03-26 12:35:10

Environment information saved to C:\_Arash\github\ecom-reco-predictor\full_recom_prediction_engine\ecommerce_recommendation_project\logs/environment_info.json
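
The function above records only the pandas and NumPy versions. One possible extension, sketched here under assumptions, is to look up versions for an arbitrary list of packages with the standard library's `importlib.metadata`. The helper name `collect_environment_info` and the package list are hypothetical, and the sketch writes to a temporary directory rather than the project's logs directory.

```python
import json
import platform
import tempfile
from datetime import datetime
from importlib import metadata
from pathlib import Path


def collect_environment_info(packages):
    """Collect interpreter, platform, and package versions.

    `packages` is a list of distribution names; packages that are not
    installed are recorded as None instead of raising.
    """
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return {
        "python_version": platform.python_version(),
        "platform": platform.platform(),
        "packages": versions,
        "execution_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }


# The last name is deliberately fake to show how missing packages are recorded
env_info = collect_environment_info(["pandas", "numpy", "not-a-real-package-xyz"])

# Write to a temporary directory here; the notebook would use its logs directory
out_path = Path(tempfile.mkdtemp()) / "environment_info.json"
out_path.write_text(json.dumps(env_info, indent=4), encoding="utf-8")
print(out_path.read_text(encoding="utf-8"))
```

Recording `None` for missing packages keeps the snapshot informative even when the notebook is run in a slimmer environment.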


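This completes our Data Acquisition and Integration notebook. The project structure, dataset configuration, and loading framework are now in place for our E-commerce Customer Behavior Prediction and Recommendation Engine project. Note that the run recorded above loaded 0 data files, so the exploration and quality assessment cells should be re-run once the raw data is available. In the next notebook, we'll focus on cleaning and preprocessing the data to address the quality issues identified.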