In [1]:
from pymongo import MongoClient
import pandas as pd
import os
from dotenv import load_dotenv
from urllib.parse import quote_plus
import urllib.parse
# Load environment variables from .env file
load_dotenv()


True

In [2]:
# Get MongoDB connection details from environment variables
username = os.getenv("MONGODB_USERNAME")
password = os.getenv("MONGODB_PASSWORD")
cluster = os.getenv("MONGODB_CLUSTER")
database = os.getenv("MONGODB_DATABASE")
escaped_username = urllib.parse.quote_plus(username)
escaped_password = urllib.parse.quote_plus(password)

In [None]:
# Import necessary libraries
import pandas as pd
from pymongo import MongoClient, errors # Import errors for specific exception handling
import os # Optional: Good practice for handling paths

# --- Configuration ---
# 1. MongoDB Connection Details (Replace with your actual details)
#    Get this from MongoDB Atlas (Connect -> Drivers) or your local setup
#    Example format: "mongodb+srv://<username>:<password>@your_cluster_address/?retryWrites=true&w=majority"
MONGO_CONNECTION_STRING = "mongodb+srv://retail:retail123@retailcluster.ipxcovl.mongodb.net/"
DATABASE_NAME = "detaildb"  # Replace with your database name (e.g., "retail_db")
COLLECTION_NAME = "retail_app_data" # Replace with your collection name (e.g., "sales_data")

# 2. CSV File Path (Corrected Formatting)
#    Using a raw string (r"...") is recommended on Windows to handle backslashes correctly.
csv_file_path = r"D:\DATA AND AI\ML WORKSHOP\Session 2 Data Processing-20250316T042803Z-001\Session 2 Data Processing\Data and code\retail_app_data.csv"
#    Alternatively, you could use forward slashes "/":
# csv_file_path = "D:/DATA AND AI/ML WORKSHOP/Session 2 Data Processing-20250316T042803Z-001/Session 2 Data Processing/Data and code/retail_app_data.csv"


# --- Main Script Logic ---
client = None # Initialize client to None to ensure it can be used in the finally block
try:
    # --- Establish MongoDB Connection ---
    print(f"Attempting to connect to MongoDB...")
    # Set a server selection timeout to handle connection errors faster (e.g., 5 seconds)
    client = MongoClient(MONGO_CONNECTION_STRING, serverSelectionTimeoutMS=5000)

    # The ismaster command is cheap and does not require auth. It forces a connection check.
    client.admin.command('ismaster')
    print("MongoDB connection successful!")

    # Select the database
    db = client[DATABASE_NAME]
    print(f"Selected database: '{DATABASE_NAME}'")

    # Select the collection
    collection = db[COLLECTION_NAME]
    print(f"Selected collection: '{COLLECTION_NAME}'")

    # --- Read CSV File ---
    print(f"Reading CSV file from: {csv_file_path}")
    # Check if the file exists before attempting to read
    if not os.path.exists(csv_file_path):
        raise FileNotFoundError(f"Error: CSV file not found at the specified path: {csv_file_path}")

    df = pd.read_csv(csv_file_path)
    print(f"Successfully read {len(df)} rows from the CSV file.")

    # --- Prepare Data for MongoDB ---
    # Convert DataFrame to a list of dictionaries (each dictionary is a MongoDB document)
    # Ensure CSV column names are valid for MongoDB keys (e.g., no '.' or '$' at the start)
    if df.empty:
        print("CSV file is empty. No records to insert.")
        records = []
    else:
        # Handle potential NaN values which are not directly supported by BSON/MongoDB
        # Replace pandas NaN with None, which MongoDB handles correctly
        df_filled = df.fillna(value=float('nan')).replace({float('nan'): None})
        records = df_filled.to_dict("records")
        print(f"Converted DataFrame to {len(records)} records (NaN values replaced with None).")

    # --- Insert Data into MongoDB ---
    if records: # Only proceed if there are records to insert
        print(f"Inserting {len(records)} records into MongoDB collection '{COLLECTION_NAME}'...")
        # insert_many inserts multiple documents from the list
        result = collection.insert_many(records)
        print(f"Successfully inserted {len(result.inserted_ids)} documents.")

        # --- Verify Insertion Count ---
        # Count total documents in the collection after insertion
        total_docs = collection.count_documents({})
        print(f"Collection '{COLLECTION_NAME}' now contains {total_docs} documents.")
    else:
        print("Skipping insertion as there are no records.")

# --- Error Handling ---
except FileNotFoundError as fnf_error:
    # Handle the specific case where the CSV file isn't found
    print(f"File Error: {fnf_error}")
except errors.ConfigurationError as config_error:
    # Handle errors related to MongoDB connection string format or options
     print(f"MongoDB Configuration Error: {config_error}")
except errors.ConnectionFailure as conn_error:
    # Handle errors failing to connect to MongoDB server
    print(f"MongoDB Connection Error: Could not connect to the server. Check connection string, network access, and server status. Details: {conn_error}")
except Exception as e:
    # Catch any other unexpected errors during the process
    print(f"An unexpected error occurred: {e}")
    # Consider logging the full traceback here for debugging complex issues
    # import traceback
    # print(traceback.format_exc())
finally:
    # Ensure the MongoDB connection is closed even if errors occurred
    if client:
        print("Closing MongoDB connection.")
        client.close()


Attempting to connect to MongoDB...
MongoDB Configuration Error: The DNS query name does not exist: _mongodb._tcp.retailcluster.0mqnqed.mongodb.net.


In [4]:
df.head()


NameError: name 'df' is not defined

In [None]:
# Import required libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()

In [None]:
df.info()
df_processed = df.copy()    
df_processed.head()


In [None]:
# 2. Time-based Processing
# Convert datetime columns
df_processed['first_visit_date'] = pd.to_datetime(df_processed['first_visit_date'])
df_processed['purchase_date'] = pd.to_datetime(df_processed['purchase_date'])

# Calculate time difference and create target
df_processed['time_to_purchase'] = (df_processed['purchase_date'] - 
                                  df_processed['first_visit_date']).dt.total_seconds() / 3600

# Create 24-hour purchase target
df_processed['purchase_24h'] = np.where(df_processed['time_to_purchase'] <= 24, 1, 0)

# Extract time features
df_processed['hour'] = df_processed['first_visit_date'].dt.hour
df_processed['dayofweek'] = df_processed['first_visit_date'].dt.dayofweek
df_processed['is_weekend'] = df_processed['dayofweek'].isin([5,6]).astype(int)


In [None]:
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 2000)


In [None]:
df_processed.head()

In [None]:
df_processed.info()

In [None]:
# 3. Screen List Processing
# Add comma for consistent processing
df_processed['screen_list'] = df_processed['screen_list'].astype(str) + ','

# Define screen categories
shopping_screens = ['ProductList', 'ProductDetail', 'CategoryBrowse', 'Search']
cart_screens = ['ShoppingCart', 'Checkout', 'PaymentMethods', 'DeliveryOptions']
engagement_screens = ['WishList', 'Reviews', 'Promotions']
account_screens = ['Account', 'AddressBook', 'OrderTracking']

# Create binary indicators for each screen
for screen in (shopping_screens + cart_screens + engagement_screens + account_screens):
    df_processed[screen.lower()] = df_processed['screen_list'].str.contains(screen).astype(int)

# Create count features for each category
df_processed['shopping_count'] = df_processed[[s.lower() for s in shopping_screens]].sum(axis=1)
df_processed['cart_count'] = df_processed[[s.lower() for s in cart_screens]].sum(axis=1)
df_processed['engagement_count'] = df_processed[[s.lower() for s in engagement_screens]].sum(axis=1)
df_processed['account_count'] = df_processed[[s.lower() for s in account_screens]].sum(axis=1)

# Create Other category
all_tracked_screens = shopping_screens + cart_screens + engagement_screens + account_screens
df_processed['other_screens'] = df_processed['screen_list'].apply(
    lambda x: len([s for s in x.split(',') if s and s not in all_tracked_screens])
)

In [None]:
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 200000)
df_processed

In [None]:
print(df_processed.columns)

In [None]:
df_processed[['user_id',	'first_visit_date',	'age','added_to_wishlist', 'made_purchase', 'purchase_date', 'shoppingcart', 'checkout', 'paymentmethods', 'deliveryoptions', 'wishlist', 'reviews', 'promotions', 'account']]

In [None]:
# 4. Feature Engineering
# Create engagement score
df_processed['engagement_score'] = (
    df_processed['session_count'] * 0.3 +
    df_processed['used_search_feature'] * 0.2 +
    df_processed['wrote_review'] * 0.15 +
    df_processed['added_to_wishlist'] * 0.15 +
    df_processed['total_screens_viewed'] * 0.2
)

# Create screen diversity score
df_processed['screen_diversity'] = (
    df_processed[['shopping_count', 'cart_count', 
                 'engagement_count', 'account_count']].gt(0).sum(axis=1)
)

# Create purchase intent score
df_processed['purchase_intent'] = (
    df_processed['cart_count'] * 0.4 +
    df_processed['shopping_count'] * 0.3 +
    df_processed['engagement_count'] * 0.2 +
    df_processed['added_to_wishlist'] * 0.1
)


In [None]:
df.info()

In [None]:
# 5. Categorical Feature Processing
# Platform encoding (keeping existing)
df_processed['platform'] = df_processed['platform'].map({'iOS': 1, 'Android': 0})

# Process new categorical columns
# Region encoding
region_dummies = pd.get_dummies(df_processed['region'], prefix='region')
df_processed = pd.concat([df_processed, region_dummies], axis=1)

# Acquisition channel encoding
channel_dummies = pd.get_dummies(df_processed['acquisition_channel'], prefix='channel')
df_processed = pd.concat([df_processed, channel_dummies], axis=1)

# User segment processing
# Extract age group and user type separately for more granular analysis
df_processed['age_group'] = df_processed['user_segment'].apply(lambda x: x.split()[0])
df_processed['user_type'] = df_processed['user_segment'].apply(lambda x: ' '.join(x.split()[1:]))

age_group_dummies = pd.get_dummies(df_processed['age_group'], prefix='age_group')
user_type_dummies = pd.get_dummies(df_processed['user_type'], prefix='user_type')
df_processed = pd.concat([df_processed, age_group_dummies, user_type_dummies], axis=1)

# App version processing
# Extract major version for simplified analysis
df_processed['app_major_version'] = df_processed['app_version'].apply(lambda x: int(x.split('.')[0]))

# Create version recency score (higher = newer version)
df_processed['version_score'] = df_processed['app_version'].apply(
    lambda x: sum(float(i)/(10**n) for n, i in enumerate(x.split('.')))
)

# 6. Clean up and prepare final dataset
# Drop original columns that have been processed
columns_to_drop = [
    'screen_list', 'purchase_date', 'first_visit_date', 
    'time_to_purchase', 'made_purchase', 'region', 
    'acquisition_channel', 'user_segment', 'app_version',
    'age_group', 'user_type'
]
df_processed = df_processed.drop(columns=columns_to_drop)

# Ensure all column names are lowercase
df_processed.columns = df_processed.columns.str.lower()

In [None]:
# 7. Quality Checks
print("Data Quality Report")
print("-" * 50)
print(f"Shape: {df_processed.shape}")
print(f"\nNull values:\n{df_processed.isnull().sum()[df_processed.isnull().sum() > 0]}")
print(f"\nPurchase rate (24h): {df_processed['purchase_24h'].mean():.2%}")


In [None]:
# 8. Feature Correlations
correlation_matrix = df_processed.corr()['purchase_24h'].sort_values(ascending=False)
print("\nTop 10 Features by Correlation with Purchase:")
print(correlation_matrix[:10])

In [None]:
# %%
# --- Configuration for writing back to MongoDB ---
DATABASE_NAME_WRITE = "detaildb"
COLLECTION_NAME_WRITE = "processed_retail_app_data"  # Use a new collection name for processed data

try:
    # --- Reuse existing MongoDB connection ---
    if client:
        # Select the database and collection for writing
        db_write = client[DATABASE_NAME_WRITE]
        collection_write = db_write[COLLECTION_NAME_WRITE]

        # --- Prepare Processed Data for MongoDB ---
        if not df_processed.empty:
            df_processed_filled = df_processed.fillna(value=float('nan')).replace({float('nan'): None})
            records_to_insert = df_processed_filled.to_dict("records")

            # --- Insert Processed Data into MongoDB ---
            if records_to_insert:
                result_insert = collection_write.insert_many(records_to_insert)
                print(f"Successfully inserted processed data into '{COLLECTION_NAME_WRITE}'.")
            else:
                print("No processed records to insert.")
        else:
            print("Processed DataFrame is empty. No data to write.")
    else:
        print("Error: MongoDB client connection was not established.")

except Exception as e:
    print(f"An unexpected error occurred during writing to MongoDB: {e}")

In [None]:
def save_to_mongodb(df):
    """
    Save processed dataframe to MongoDB.
    
    Parameters:
    -----------
    df : pandas.DataFrame
        Processed dataframe to save
    """
    print("\nSaving processed data to MongoDB...")
    
    # Load environment variables from .env file
    load_dotenv()
    
    # Get MongoDB connection details from environment variables
    username = os.getenv("MONGODB_USERNAME")
    password = os.getenv("MONGODB_PASSWORD")
    cluster = os.getenv("MONGODB_CLUSTER")
    database = os.getenv("MONGODB_DATABASE")
    
    # Create connection string
    connection_string = f"mongodb+srv://{username}:{password}@{cluster}/"
    
    try:
        # Create a client connection
        client = MongoClient(connection_string)
        
        # Connect to the database
        db = client.get_database(database)
        collection = db.processed_data  # Collection for processed data
        
        # Convert DataFrame to dictionary records
        records = df.to_dict("records")
        
        # Clear existing records and insert new ones
        collection.delete_many({})
        result = collection.insert_many(records)
        
        print(f"Successfully saved {len(result.inserted_ids)} processed records to MongoDB")
        print(f"Database: {database}, Collection: processed_data")
        
        # Display sample of the saved data
        print("\nSample of saved processed data (first 3 records):")
        for doc in collection.find().limit(3):
            print(f"User ID: {doc['user_id']}, Purchase 24h: {doc['purchase_24h']}, Engagement Score: {doc.get('engagement_score', 'N/A')}")
        
        # Close the connection
        client.close()
        
    except Exception as e:
        print(f"Error saving data to MongoDB: {e}")
