# Lipstick Recommendation Data Generation

This notebook processes the clustered lipstick product data and generates a structured, frontend-ready recommendation dataset.

## Process Overview:
1. Load and preprocess lipstick product data
2. Process URLs and clean data
3. Assign each product to a color cluster using the pre-trained clustering model
4. Generate recommendation scores
5. Export frontend-ready dataset

In [1]:
import pandas as pd
import numpy as np
import pickle
import re
from typing import Dict
from sklearn.preprocessing import MinMaxScaler
from sklearn.cluster import KMeans
from datetime import datetime
from PIL import Image
from io import BytesIO
import base64

In [2]:
# Pipeline configuration

# Display label for each color cluster id (ids 0 through 5, in order).
CLUSTER_NAMES = dict(enumerate([
    'Warm Brown',
    'Soft Pink',
    'Nude',
    'Classic Red',
    'Deep Burgundy',
    'Coral Pink',
]))

# Relative weight of the rating term and the review-count term in the
# recommendation score.
RATING_WEIGHT, REVIEW_WEIGHT = 0.7, 0.3

# Locations of the input data, the pickled clustering model, and the CSV output.
DATA_PATH = "processed_data/metadata_with_assets_feedback.pkl"
MODEL_PATH = "trained_models/sephora_lipstick_clustering_model.pkl"
OUTPUT_PATH = "dataset/lipstick_recommendation_dataset.csv"

# Number of top-scoring products kept per color cluster.
TOP_N = 50

In [3]:
class LipstickDataProcessor:
    """Load lipstick product data and prepare it for color analysis.

    The loaded frame is expected to provide the columns this class reads:
    ``targetUrl``, ``skuID``, ``avg_rgb`` and
    ``ContextDataValues.skinTone.ValueLabel``.
    """

    # Matches the SKU query parameter inside a product URL.
    _SKU_PARAM_RE = re.compile(r'skuId=\d+')
    _BASE_URL = 'https://www.sephora.com'
    _SKIN_TONE_COL = 'ContextDataValues.skinTone.ValueLabel'

    def __init__(self, file_path: str):
        """Load the pickled product DataFrame from ``file_path``."""
        self.df = pd.read_pickle(file_path)
        self.processed_df = None

    def process_urls(self) -> None:
        """Add a ``full_url`` column pointing at each row's own SKU.

        Every ``skuId=<digits>`` occurrence in ``targetUrl`` is replaced with
        the row's ``skuID`` and the result is prefixed with the site base URL.
        """
        sku_param_re = self._SKU_PARAM_RE

        def with_sku(url: str, sku) -> str:
            replacement = f'skuId={sku}'
            # A callable replacement inserts the text literally, so a SKU
            # containing a backslash is not read as a regex escape.
            return sku_param_re.sub(lambda _match: replacement, url)

        # Iterating the two columns directly (instead of DataFrame.apply with
        # axis=1) also works on an empty frame and avoids a temporary column.
        self.df['full_url'] = [
            self._BASE_URL + with_sku(url, sku)
            for url, sku in zip(self.df['targetUrl'], self.df['skuID'])
        ]

    def clean_data(self) -> None:
        """Drop unusable rows and standardize skin tone labels.

        Removes rows whose skin tone is missing or ``'notSure'`` and rows
        whose ``avg_rgb`` is missing or exactly ``[0, 0, 0]``, then maps
        ``'Ebony'`` to ``'Rich'`` and ``'Olive'`` to ``'Tan'``.
        """
        skin_col = self._SKIN_TONE_COL

        # Drop rows missing either the skin tone or the color value.
        cleaned = self.df.dropna(subset=[skin_col, 'avg_rgb'])

        # Remove 'notSure' skin tone responses.
        cleaned = cleaned[cleaned[skin_col] != 'notSure']

        # Drop rows with an all-zero RGB value. astype(bool) keeps the mask
        # boolean even when the frame is empty.
        is_black = cleaned['avg_rgb'].apply(
            lambda rgb: np.array_equal(rgb, [0, 0, 0])
        ).astype(bool)

        # Copy before writing so the label update does not assign into a
        # filtered slice of the original frame.
        cleaned = cleaned[~is_black].copy()

        # Standardize skin tone labels.
        cleaned[skin_col] = cleaned[skin_col].replace({
            'Ebony': 'Rich',
            'Olive': 'Tan',
        })

        self.df = cleaned

In [4]:
class ColorAnalyzer:
    """State holder for a color-clustering analysis.

    Only the attribute layout is defined here. Elsewhere in this notebook a
    previously saved instance is loaded from a pickle file; presumably this
    definition exists so that pickle can resolve the class (TODO confirm).
    """

    def __init__(self, data: pd.DataFrame):
        """Store ``data`` and reset all clustering state to its defaults."""
        self.data = data
        # Both cluster-count settings start at 10.
        self.n_clusters = self.optimal_k = 10
        # Nothing is fitted or evaluated yet.
        self.kmeans_model = self.cluster_labels = self.cv_results = None

def load_analyzer(file_path: str = 'trained_models/sephora_lipstick_clustering_model.pkl') -> ColorAnalyzer:
    """Unpickle and return a previously saved color analyzer.

    Args:
        file_path: Location of the pickle file to read.

    Returns:
        The object stored in the pickle file.

    Note:
        Unpickling can execute code embedded in the file, so only load
        files from a trusted source.
    """
    with open(file_path, 'rb') as model_file:
        analyzer = pickle.load(model_file)
    return analyzer

In [5]:
def is_valid_base64_image(base64_str: str) -> bool:
    """Check whether a string is a base64-encoded image Pillow can read.

    Args:
        base64_str: Base64 encoded string. Any non-string value (for example
            ``None`` or NaN) is treated as invalid.

    Returns:
        bool: True if the decoded bytes open as an image and pass Pillow's
        integrity check, False otherwise.
    """
    if not isinstance(base64_str, str):
        return False
    try:
        img_data = base64.b64decode(base64_str)
        # Image.open is lazy and only identifies the file from its header;
        # verify() additionally checks the file for corruption. The context
        # manager releases the image once the check is done.
        with Image.open(BytesIO(img_data)) as img:
            img.verify()
        return True
    except Exception:
        # Deliberately broad: malformed base64 and malformed image data
        # surface as several unrelated exception types, and any of them
        # simply means "not a valid image" here.
        return False

In [6]:
class RecommendationDataGenerator:
    """Process and generate recommendation data for frontend consumption.

    Intended call order: ``process_color_data`` -> ``filter_valid_images``
    -> ``prepare_frontend_data`` -> ``export_data``.
    """

    def __init__(self, data_processor: LipstickDataProcessor, model_path: str):
        """Initialize data generator with processor and model.

        Args:
            data_processor: Processor whose ``df`` holds the cleaned products.
            model_path: Path of the pickled analyzer passed to
                ``load_analyzer``.
        """
        # NOTE: the frame is shared with the processor, not copied, so columns
        # added by process_color_data also appear on data_processor.df (until
        # filter_valid_images rebinds self.df to a filtered copy).
        self.df = data_processor.df
        self.processor = data_processor
        self.analyzer = load_analyzer(model_path)
        self.frontend_df = None

    def process_color_data(self) -> None:
        """Add normalized ``R``/``G``/``B``, ``color_cluster`` and ``cluster_name`` columns.

        ``avg_rgb`` is split into three channels and divided by 255, the
        loaded model predicts a cluster id per row, and each id is mapped to
        its display name.
        """
        if self.df.empty:
            # Nothing to predict on; still create the columns that the later
            # pipeline steps select.
            for col, dtype in (
                ('R', 'float64'),
                ('G', 'float64'),
                ('B', 'float64'),
                ('color_cluster', 'int64'),
                ('cluster_name', 'object'),
            ):
                self.df[col] = pd.Series(dtype=dtype)
            return

        # Normalize RGB values
        self.df[['R', 'G', 'B']] = pd.DataFrame(
            self.df['avg_rgb'].tolist(),
            index=self.df.index,
        ) / 255.0

        # Add cluster predictions
        features = self.df[['R', 'G', 'B']].values
        self.df['color_cluster'] = self.analyzer.kmeans_model.predict(features)

        # Use the same fallback label as the padding step so that cluster ids
        # missing from CLUSTER_NAMES do not end up with a NaN name.
        self.df['cluster_name'] = self.df['color_cluster'].map(
            lambda cluster_id: CLUSTER_NAMES.get(cluster_id, f"Cluster {cluster_id}")
        )

    def _max_review_count(self) -> int:
        """Return the largest review count in ``self.df``, or 0 if there is none."""
        max_reviews = pd.to_numeric(self.df['reviews'], errors='coerce').max()
        return int(max_reviews) if np.isfinite(max_reviews) else 0

    def calculate_recommendation_score(self, row: pd.Series, max_reviews=None) -> float:
        """Calculate product recommendation score.

        The score is ``RATING_WEIGHT * rating / 5`` plus ``REVIEW_WEIGHT``
        times the log-scaled review count relative to ``max_reviews``,
        multiplied by 100 and rounded to two decimals.

        Args:
            row: Product row providing ``Rating`` and ``reviews``.
            max_reviews: Largest review count used for scaling. When omitted
                it is computed from ``self.df``; pass it explicitly when
                scoring many rows so it is not recomputed for each one.

        Returns:
            The score, or 0.0 when the inputs cannot be converted to numbers
            or the result is not a finite number.
        """
        try:
            rating = float(row['Rating'])
            reviews = int(row['reviews'])
        except (KeyError, TypeError, ValueError, OverflowError) as exc:
            # .get() keeps the diagnostics from raising on a missing key.
            print("\nError calculating score for product:")
            print(f"SKU ID: {row.get('skuID')}")
            print(f"Rating: {row.get('Rating')} ({type(row.get('Rating'))})")
            print(f"Reviews: {row.get('reviews')} ({type(row.get('reviews'))})")
            print(f"Error: {str(exc)}")
            return 0.0

        if max_reviews is None:
            max_reviews = self._max_review_count()

        if max_reviews > 0:
            review_score = np.log1p(np.float64(reviews)) / np.log1p(np.float64(max_reviews))
        else:
            # log1p(0) / log1p(0) would be NaN; without any reviews the
            # review term contributes nothing.
            review_score = 0.0

        final_score = RATING_WEIGHT * (rating / 5.0) + REVIEW_WEIGHT * review_score

        # A NaN rating (or another non-finite intermediate) must not leak a
        # NaN score into ranking and padding.
        if not np.isfinite(final_score):
            return 0.0
        return round(float(final_score) * 100, 2)

    def filter_valid_images(self) -> None:
        """Filter out records with invalid image data."""
        # A row is kept only if both images validate. The validator already
        # rejects missing (non-string) values.
        valid_images = pd.Series(True, index=self.df.index)
        for image_col in ('cover_image_base64', 'lipstick_image_base64'):
            valid_images &= self.df[image_col].apply(is_valid_base64_image).astype(bool)

        self.df = self.df[valid_images].copy()
        print(f"Filtered to {len(self.df)} records with valid images")

    def prepare_frontend_data(self, top_n: int = 50, min_products_per_cluster: int = 50) -> None:
        """Prepare and structure data for frontend use.

        Selects the frontend columns, de-duplicates by ``skuID``, adds
        ``rgb_value`` and ``recommendation_score``, keeps the ``top_n``
        best-scoring products per cluster, and finally pads clusters that
        have fewer than ``min_products_per_cluster`` products.

        Args:
            top_n: Maximum number of products kept per cluster.
            min_products_per_cluster: Minimum number of products each cluster
                should have after padding.
        """
        # Select relevant columns
        frontend_cols = [
            'skuID',
            'brandName',
            'displayName',
            'color_description',
            'color_cluster',
            'cluster_name',
            'avg_rgb',
            'Rating',
            'reviews',
            'currentSku.listPrice',
            'full_url',
            'cover_image_base64',
            'lipstick_image_base64',
        ]

        # Remove duplicates based on 'skuID'
        frontend = self.df[frontend_cols].drop_duplicates(subset=['skuID']).copy()

        # Add RGB string format
        frontend['rgb_value'] = [
            f"rgb({int(rgb[0])},{int(rgb[1])},{int(rgb[2])})"
            for rgb in frontend['avg_rgb']
        ]

        # Add recommendation scores. The maximum review count is computed
        # once rather than once per row.
        max_reviews = self._max_review_count()
        frontend['recommendation_score'] = pd.Series(
            [
                self.calculate_recommendation_score(row, max_reviews=max_reviews)
                for _, row in frontend.iterrows()
            ],
            index=frontend.index,
            dtype='float64',
        )

        # Get top N recommendations per cluster: order by cluster, then by
        # descending score, and take the head of each group. This avoids
        # groupby.apply, which depends on the grouping column being passed
        # through to the applied function.
        self.frontend_df = (
            frontend
            .sort_values(
                ['color_cluster', 'recommendation_score'],
                ascending=[True, False],
                kind='stable',
            )
            .groupby('color_cluster')
            .head(top_n)
            .reset_index(drop=True)
        )

        # Ensure each cluster has at least min_products_per_cluster products
        self._ensure_minimum_products_per_cluster(min_products_per_cluster)

    def _ensure_minimum_products_per_cluster(self, min_products: int) -> None:
        """Pad clusters that have fewer than ``min_products`` products.

        Missing slots are filled with the best-scoring products of the other
        clusters. A borrowed product is relabelled with the target cluster's
        id and name and given a score just below that cluster's current
        minimum, so it ranks after the cluster's own products. The product
        also remains in its original cluster, so the same ``skuID`` can
        appear under more than one cluster afterwards.
        """
        if self.frontend_df.empty:
            # No clusters to adjust (and pd.concat rejects an empty list).
            return

        adjusted_clusters = []

        for cluster_id, group in self.frontend_df.groupby('color_cluster'):
            cluster_size = len(group)
            print(f"Cluster {cluster_id} has {cluster_size} products.")

            if cluster_size >= min_products:
                # If the cluster has enough products, keep it as is
                adjusted_clusters.append(group)
                continue

            num_to_add = min_products - cluster_size
            print(f"Cluster {cluster_id} needs {num_to_add} more products.")

            # Candidates: products of other clusters not already in this one
            other_clusters = self.frontend_df[self.frontend_df['color_cluster'] != cluster_id]
            available_products = other_clusters[~other_clusters['skuID'].isin(group['skuID'])]

            # Best-scoring candidates first; stable so ties are deterministic
            products_to_add = (
                available_products
                .sort_values('recommendation_score', ascending=False, kind='stable')
                .head(num_to_add)
                .copy()
            )

            # Rank borrowed products just below the cluster's own minimum;
            # rounding keeps the two-decimal format of computed scores.
            min_score = group['recommendation_score'].min()
            products_to_add['recommendation_score'] = round(float(min_score) - 0.01, 2)

            # Update the cluster_id and cluster_name to the current cluster
            products_to_add['color_cluster'] = cluster_id
            products_to_add['cluster_name'] = CLUSTER_NAMES.get(cluster_id, f"Cluster {cluster_id}")

            adjusted_clusters.append(
                pd.concat([group, products_to_add], ignore_index=True)
            )

        # Combine all clusters back into frontend_df
        self.frontend_df = pd.concat(adjusted_clusters, ignore_index=True)

    def export_data(self, output_path: str) -> None:
        """Export processed data to CSV and print a per-cluster summary.

        Args:
            output_path: Destination CSV path. Missing parent directories
                are created.
        """
        from pathlib import Path

        # Columns written to the CSV, in order
        required_cols = [
            'skuID',
            'brandName',
            'displayName',
            'color_description',
            'color_cluster',
            'cluster_name',
            'rgb_value',
            'Rating',
            'reviews',
            'currentSku.listPrice',
            'full_url',
            'cover_image_base64',
            'lipstick_image_base64',
            'recommendation_score',
        ]

        export_df = self.frontend_df[required_cols]
        cluster_counts = export_df['cluster_name'].value_counts()
        total_products = len(export_df)

        # Metadata lives on the in-memory frame only; to_csv does not write
        # DataFrame.attrs to the file.
        export_df.attrs['generated_date'] = datetime.now().isoformat()
        export_df.attrs['total_products'] = total_products
        export_df.attrs['cluster_counts'] = cluster_counts.to_dict()

        # Export to CSV, creating the output directory if needed
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        export_df.to_csv(output_path, index=False)

        # Print summary
        print("Data Export Summary:")
        print(f"Total products processed: {total_products:,}")
        print("\nCluster Distribution:")
        for name, count in cluster_counts.items():
            print(f"{name}: {count:,} products ({count/total_products*100:.1f}%)")

In [None]:

# Initialize Processor and Generator
# Load the pickled product data, rewrite each product URL to point at the
# row's own SKU, then drop rows without a usable skin tone or color value.
processor = LipstickDataProcessor(DATA_PATH)
processor.process_urls()
processor.clean_data()

# Process pipeline
# The generator works on processor.df and loads the analyzer from MODEL_PATH.
# The steps below must run in this order: each one relies on columns or
# filtering produced by the previous step.
generator = RecommendationDataGenerator(processor, MODEL_PATH)

# Adds the R/G/B, color_cluster and cluster_name columns.
print("Processing data...")
generator.process_color_data()

# Keeps only rows whose cover and lipstick images both validate.
print("Filtering valid images...")
generator.filter_valid_images()

# Scores products and keeps the top TOP_N per cluster;
# min_products_per_cluster is left at its default.
print("Preparing frontend data...")
generator.prepare_frontend_data(top_n=TOP_N)

# Writes the CSV to OUTPUT_PATH and prints a per-cluster summary.
print("Exporting data...")
generator.export_data(OUTPUT_PATH)