1. Get data
2. EDA
3. Prepare data (feature engineering)
4. Select, train, and fine-tune model

In [None]:
import os
import pandas as pd
import pymongo
from pymongo import MongoClient
from typing import Dict, List, Any, Optional
import json
from datetime import datetime

# Configuration
# Connection string; the MONGO_URI environment variable overrides the local default.
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
# Default database and collection read by DigikalaDataReader.
DB_NAME = "digikala"
PRODUCTS_COLLECTION = "products"


class DigikalaDataReader:
    """
    Read product documents from a MongoDB collection into a pandas DataFrame.

    The reader always uses the ``PRODUCTS_COLLECTION`` collection of the
    selected database. It can be used as a context manager, in which case the
    connection is closed when the ``with`` block exits.
    """

    # Scalar document fields copied into each row unchanged, in column order.
    _BASIC_FIELDS = (
        "_id",
        "title_en",
        "title_fa",
        "brand",
        "category",
        "price",
        "rate",
        "count_raters",
        "popularity",
        "num_questions",
        "num_comments",
    )

    # Columns coerced to a numeric dtype once the DataFrame is built.
    _NUMERIC_COLUMNS = (
        "price",
        "rate",
        "count_raters",
        "popularity",
        "num_questions",
        "num_comments",
    )

    def __init__(self, mongo_uri: str = MONGO_URI, db_name: str = DB_NAME):
        """
        Initialize the MongoDB connection.

        Args:
            mongo_uri: MongoDB connection string
            db_name: Database name
        """
        self.client = MongoClient(mongo_uri)
        self.db = self.client[db_name]
        self.products_collection = self.db[PRODUCTS_COLLECTION]

    def __enter__(self) -> "DigikalaDataReader":
        """Return the reader itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection on leaving the ``with`` block; exceptions propagate."""
        self.close_connection()

    def get_collection_info(self) -> Dict[str, Any]:
        """
        Get basic information about the products collection.

        Returns:
            Dictionary with the keys ``total_documents``, ``sample_document``,
            ``collection_name`` and ``database_name``. An empty dictionary is
            returned (and the error printed) if any query fails.
        """
        try:
            total_docs = self.products_collection.count_documents({})
            sample_doc = self.products_collection.find_one()

            return {
                "total_documents": total_docs,
                "sample_document": sample_doc,
                # Report the names this reader is actually bound to, so a
                # reader created with a non-default db_name is described
                # correctly instead of echoing the module defaults.
                "collection_name": self.products_collection.name,
                "database_name": self.db.name,
            }
        except Exception as e:
            print(f"Error getting collection info: {e}")
            return {}

    @staticmethod
    def _join_values(values: Any) -> str:
        """Render a list-like field as a comma-separated string; '' when empty."""
        if not values:
            return ""
        if isinstance(values, (list, tuple)):
            # str() each item so one non-string element cannot raise TypeError.
            return ", ".join(str(item) for item in values)
        # A bare scalar (e.g. a single string) is kept whole rather than
        # being iterated character by character.
        return str(values)

    def flatten_specifications(self, specifications: Dict) -> Dict[str, str]:
        """
        Flatten the nested specifications object into a flat dictionary.

        Nested groups produce ``"<group>_<attribute>"`` keys; list values are
        joined with ", " and every other value is converted with ``str``.
        Top-level values that are not dictionaries are stored under the group
        name itself.

        Args:
            specifications: Nested specifications dictionary

        Returns:
            Flattened dictionary with specification keys and values. Empty or
            non-dictionary input yields an empty dictionary.
        """
        flattened: Dict[str, str] = {}
        if not specifications or not isinstance(specifications, dict):
            return flattened

        for group_name, group_data in specifications.items():
            if not isinstance(group_data, dict):
                flattened[group_name] = str(group_data)
                continue

            for attr_name, attr_values in group_data.items():
                key = f"{group_name}_{attr_name}"
                if isinstance(attr_values, list):
                    flattened[key] = ", ".join(str(v) for v in attr_values)
                else:
                    flattened[key] = str(attr_values)

        return flattened

    def process_colors(self, colors: List[str]) -> str:
        """
        Process colors list into a comma-separated string.

        Args:
            colors: List of color strings

        Returns:
            Comma-separated string of colors ("" for empty or missing input)
        """
        return self._join_values(colors)

    def process_images(self, images: List[str]) -> str:
        """
        Process images list into a comma-separated string.

        Args:
            images: List of image URLs

        Returns:
            Comma-separated string of image URLs ("" for empty or missing input)
        """
        return self._join_values(images)

    def process_suggestions(self, suggestions: Dict) -> str:
        """
        Process suggestions dictionary into a string representation.

        Args:
            suggestions: Suggestions dictionary

        Returns:
            JSON text of the suggestions with non-ASCII characters kept as-is,
            or "" for empty or missing input
        """
        if not suggestions:
            return ""
        # default=str is a deliberate fallback: values json cannot encode
        # natively (e.g. datetimes) are stringified instead of raising and
        # aborting the whole export.
        return json.dumps(suggestions, ensure_ascii=False, default=str)

    def process_comments_overview(self, comments_overview: Dict) -> Dict[str, Any]:
        """
        Process comments overview into separate fields.

        Args:
            comments_overview: Comments overview dictionary

        Returns:
            Dictionary with the keys ``overview``, ``advantages`` and
            ``disadvantages``. Missing or null entries become ""; other
            values are passed through unchanged.
        """
        fields = ("overview", "advantages", "disadvantages")
        if not comments_overview or not isinstance(comments_overview, dict):
            return {field: "" for field in fields}

        result = {}
        for field in fields:
            value = comments_overview.get(field)
            # A stored null is treated like a missing key.
            result[field] = "" if value is None else value
        return result

    def _process_document(self, doc: Dict[str, Any], include_specifications: bool) -> Dict[str, Any]:
        """Convert one MongoDB document into a flat row dictionary."""
        row = {field: doc.get(field) for field in self._BASIC_FIELDS}

        row["colors"] = self.process_colors(doc.get("colors"))
        row["images"] = self.process_images(doc.get("images"))
        row["suggestions"] = self.process_suggestions(doc.get("suggestions"))
        row.update(self.process_comments_overview(doc.get("comments_overview")))

        if include_specifications:
            # NOTE: a flattened specification key equal to an existing column
            # name replaces that column's value for this row.
            row.update(self.flatten_specifications(doc.get("specifications")))

        return row

    def read_products_to_dataframe(self,
                                 limit: Optional[int] = None,
                                 filter_query: Optional[Dict] = None,
                                 include_specifications: bool = True) -> pd.DataFrame:
        """
        Read product data from MongoDB and convert to pandas DataFrame.

        Args:
            limit: Maximum number of documents to retrieve (None for all)
            filter_query: MongoDB filter query to apply
            include_specifications: Whether to flatten and include specifications

        Returns:
            pandas DataFrame containing product data. An empty DataFrame is
            returned when nothing matches or when an error occurs; errors are
            printed, not raised.
        """
        try:
            cursor = self.products_collection.find(filter_query or {})
            if limit:
                cursor = cursor.limit(limit)

            # Build the rows while iterating the cursor, so the raw documents
            # are not held in a second list alongside the processed rows.
            processed_docs = [
                self._process_document(doc, include_specifications)
                for doc in cursor
            ]

            if not processed_docs:
                print("No documents found matching the criteria.")
                return pd.DataFrame()

            print(f"Retrieved {len(processed_docs)} documents from MongoDB.")

            df = pd.DataFrame(processed_docs)

            # Unparseable values become NaN instead of raising.
            for col in self._NUMERIC_COLUMNS:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce')

            print(f"Created DataFrame with shape: {df.shape}")
            print(f"Columns: {list(df.columns)}")

            return df

        except Exception as e:
            # Best-effort contract: report the problem and hand back an empty
            # frame so callers can test ``df.empty``.
            print(f"Error reading data from MongoDB: {e}")
            return pd.DataFrame()

    def get_data_summary(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Get summary statistics of the DataFrame.

        Args:
            df: pandas DataFrame

        Returns:
            Dictionary with the keys ``shape``, ``columns``,
            ``missing_values``, ``data_types``, ``numeric_summary`` and
            ``categorical_summary``; ``{"error": "DataFrame is empty"}`` when
            ``df`` is empty.
        """
        if df.empty:
            return {"error": "DataFrame is empty"}

        has_numeric = not df.select_dtypes(include=['number']).empty

        summary = {
            "shape": df.shape,
            "columns": list(df.columns),
            "missing_values": df.isnull().sum().to_dict(),
            "data_types": df.dtypes.to_dict(),
            "numeric_summary": df.describe().to_dict() if has_numeric else {},
            "categorical_summary": {}
        }

        # Per-column cardinality and top values for object-dtype columns.
        for col in df.select_dtypes(include=['object']).columns:
            summary["categorical_summary"][col] = {
                "unique_values": df[col].nunique(),
                "most_common": df[col].value_counts().head().to_dict()
            }

        return summary

    def close_connection(self):
        """Close MongoDB connection."""
        # Compare with None explicitly instead of truth-testing the client object.
        if self.client is not None:
            self.client.close()
            print("MongoDB connection closed.")


def main():
    """
    Demonstrate DigikalaDataReader end to end.

    Prints the collection statistics, loads up to 1000 products into a
    DataFrame, prints a preview and the columns that have missing values,
    and writes the DataFrame to a timestamped CSV file (relative path).
    Any exception is printed rather than raised, and the MongoDB connection
    is closed on every path.
    """
    reader = DigikalaDataReader()

    try:
        # --- Collection overview ---
        print("=== Collection Information ===")
        info = reader.get_collection_info()
        print(f"Total documents: {info.get('total_documents', 'Unknown')}")
        print(f"Database: {info.get('database_name', 'Unknown')}")
        print(f"Collection: {info.get('collection_name', 'Unknown')}")
        print()

        # --- Load the products (capped at 1000 documents for testing) ---
        print("=== Reading Product Data ===")
        df = reader.read_products_to_dataframe(limit=1000)

        # Nothing to show or save; the finally clause still closes the client.
        if df.empty:
            print("No data retrieved.")
            return

        print(f"DataFrame shape: {df.shape}")
        print(f"Columns: {list(df.columns)}")
        print()

        # --- Preview ---
        print("=== First 5 Rows ===")
        print(df.head())
        print()

        # --- Missing-value report: only columns with at least one gap ---
        print("=== Data Summary ===")
        summary = reader.get_data_summary(df)
        print("Missing values per column:")
        columns_with_gaps = {
            col: missing
            for col, missing in summary["missing_values"].items()
            if missing > 0
        }
        for col, missing in columns_with_gaps.items():
            print(f"  {col}: {missing}")

        # --- Export; the timestamp in the name gives each run its own file ---
        output_file = f"digikala_products_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        df.to_csv(output_file, index=False, encoding='utf-8')
        print(f"Data saved to: {output_file}")

    except Exception as e:
        print(f"Error: {e}")

    finally:
        reader.close_connection()


# Run the demo only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()