In [None]:
import os

# Location of the phone-price dataset (parquet).
# The default is the original author's local file; set the PHONE_PRICE_DATA
# environment variable to point at a different copy without editing the code.
path = os.environ.get(
    "PHONE_PRICE_DATA",
    "/home/tengo/Documents/code/phoner/notebooks/phone-price.parquet",
)

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_absolute_error, r2_score
import xgboost as xgb
import joblib


class PhonePricePredictor:
    """Train one XGBoost price regressor per price segment and predict with them.

    The data is split into the bands listed in ``price_segments`` using the
    target column itself, and a separately tuned pipeline is fitted per band.
    At prediction time the band is not known, so ``predict_price`` picks a
    model with a simple spec-based heuristic.

    Note: because the bands are derived from the target, the per-segment test
    metrics printed by ``segment_and_train`` are measured on rows that are
    already known to lie in the band; they do not account for the heuristic
    in ``predict_price`` choosing the wrong band.
    """

    # Column holding the price to predict.
    TARGET_COLUMN = "Prices"

    # A segment is only trained when it has MORE rows than this.
    MIN_SEGMENT_SIZE = 10

    # Brand groupings used to derive the ``brand_tier`` feature; any brand
    # not listed here is treated as "budget".
    PREMIUM_BRANDS = ("Apple", "Samsung", "Google", "Huawei", "Sony")
    MID_TIER_BRANDS = ("Xiaomi", "OnePlus", "Oppo", "Vivo", "Motorola", "LG")

    # Candidate model inputs; only those present in the dataset are used.
    CORE_FEATURES = (
        "RAM_GB",
        "ROM_GB",
        "NFC",
        "camera_mp_float",
        "CPU_manufacturer",
        "brand",
        "OS",
    )
    ENGINEERED_FEATURES = ("brand_tier", "RAM_ROM_ratio")

    # Order in which to look for a trained model when the preferred segment
    # has none (e.g. it had too few rows). The ordering is a judgement call.
    SEGMENT_FALLBACKS = {
        "premium": ("premium", "mid_range", "budget"),
        "mid_range": ("mid_range", "budget", "premium"),
        "budget": ("budget", "mid_range", "premium"),
    }

    def __init__(self, random_state=42):
        """Create an untrained predictor.

        Args:
            random_state: Seed passed to the train/test split and to XGBoost.
        """
        self.random_state = random_state
        self.models = {}

        # Price bands as (lower bound inclusive, upper bound exclusive).
        self.price_segments = {
            "budget": (0, 300),
            "mid_range": (300, 700),
            "premium": (700, float("inf")),
        }

        # Populated by load_and_preprocess(); initialised here so that
        # predict_price() can be called on externally supplied models.
        self.target = self.TARGET_COLUMN
        self.features = []
        self.final_features = []
        self.feature_defaults = {}
        self.data = None

    @staticmethod
    def _cpu_manufacturer(cpu):
        """Return the first word of a CPU description, or "Unknown"."""
        if isinstance(cpu, str):
            words = cpu.split()
            # A blank / whitespace-only string has no first word.
            if words:
                return words[0]
        return "Unknown"

    @staticmethod
    def _safe_ratio(numerator, denominator):
        """Divide two Series, turning division-by-zero infinities into NaN.

        NaN is handled by the median imputer in the pipeline, whereas
        infinities would make imputation/scaling fail.
        """
        ratio = numerator / denominator
        return ratio.replace([np.inf, -np.inf], np.nan)

    def _brand_tier(self, brand):
        """Map a brand name to "premium", "mid_tier" or "budget"."""
        if brand in self.PREMIUM_BRANDS:
            return "premium"
        if brand in self.MID_TIER_BRANDS:
            return "mid_tier"
        return "budget"

    def load_and_preprocess(self, file_path=None):
        """Load the parquet dataset and derive the model features.

        Args:
            file_path: Parquet file to read. When omitted, the module-level
                ``path`` is used; it is looked up at call time so that
                re-assigning ``path`` after the class was defined takes effect.

        Returns:
            The preprocessed DataFrame (also stored on ``self.data``).

        Raises:
            KeyError: If the dataset has no target (price) column.
        """
        if file_path is None:
            file_path = path

        self.target = self.TARGET_COLUMN

        df = pd.read_parquet(file_path)

        # Clean column names
        df.columns = df.columns.str.strip()

        if self.target not in df.columns:
            raise KeyError(
                f"Target column '{self.target}' not found in {file_path}; "
                f"available columns: {list(df.columns)}"
            )

        # Keep only the most recent row per product when an id is available.
        if "product_id" in df.columns:
            df = df.drop_duplicates(subset=["product_id"], keep="last")

        # Rows without a price cannot be used for training. Copy so the
        # column assignments below operate on an independent frame.
        df = df.dropna(subset=[self.target]).copy()

        # Fall back to the first word of the product name as the brand.
        if "brand" not in df.columns and "product_name" in df.columns:
            df["brand"] = df["product_name"].str.split().str[0]

        # -1 marks "NFC support unknown".
        if "NFC" in df.columns:
            df["NFC"] = df["NFC"].fillna(-1)

        if "CPU" in df.columns:
            df["CPU_manufacturer"] = df["CPU"].apply(self._cpu_manufacturer)

        # Convert RAM and ROM to GB if given in MB
        if "RAM_MB" in df.columns:
            df["RAM_GB"] = df["RAM_MB"] / 1024
        if "ROM_MB" in df.columns:
            df["ROM_GB"] = df["ROM_MB"] / 1024

        # Feature engineering to better differentiate premium phones
        if "brand" in df.columns:
            df["brand_tier"] = df["brand"].apply(self._brand_tier)

        if "RAM_GB" in df.columns and "ROM_GB" in df.columns:
            df["RAM_ROM_ratio"] = self._safe_ratio(df["RAM_GB"], df["ROM_GB"])

        # Add price category for segmentation
        df["price_category"] = df[self.target].apply(self._get_price_category)

        # Use only the candidate features that actually exist in the dataset.
        candidates = self.CORE_FEATURES + self.ENGINEERED_FEATURES
        self.features = [name for name in candidates if name in df.columns]
        self.final_features = self.features.copy()

        print(f"Using features: {self.features}")

        # Fallback value per feature, used by predict_price() for specs the
        # caller did not supply: the median for numeric columns, "Unknown"
        # for everything else.
        self.feature_defaults = {}
        for feature in self.features:
            column = df[feature]
            if pd.api.types.is_numeric_dtype(column):
                self.feature_defaults[feature] = column.median()
            else:
                self.feature_defaults[feature] = "Unknown"

        self.data = df
        return df

    def _get_price_category(self, price):
        """Return the segment name ("budget", "mid_range", "premium") for a price.

        Upper bounds are exclusive, so a price equal to a band's upper bound
        belongs to the next band.
        """
        if price < self.price_segments["budget"][1]:
            return "budget"
        if price < self.price_segments["mid_range"][1]:
            return "mid_range"
        return "premium"

    def _build_pipeline(self, X):
        """Build the preprocessing + XGBoost pipeline for the columns of X."""
        # Every column must land in exactly one branch: ColumnTransformer
        # drops unlisted columns by default, so matching only int64/float64
        # and object would silently discard e.g. int32, float32 or bool data.
        numerical_features = X.select_dtypes(
            include=["number", "bool"]
        ).columns.tolist()
        categorical_features = [
            column for column in X.columns if column not in numerical_features
        ]

        numeric_steps = Pipeline(
            [
                ("imputer", SimpleImputer(strategy="median")),
                ("scaler", StandardScaler()),
            ]
        )
        categorical_steps = Pipeline(
            [
                ("imputer", SimpleImputer(strategy="constant", fill_value="Unknown")),
                # Categories unseen during fitting are encoded as all zeros.
                ("onehot", OneHotEncoder(handle_unknown="ignore")),
            ]
        )
        preprocessor = ColumnTransformer(
            transformers=[
                ("num", numeric_steps, numerical_features),
                ("cat", categorical_steps, categorical_features),
            ]
        )

        return Pipeline(
            [
                ("preprocessor", preprocessor),
                ("regressor", xgb.XGBRegressor(random_state=self.random_state)),
            ]
        )

    @staticmethod
    def _param_grid(category):
        """Return the hyper-parameter grid to search for a segment."""
        if category == "premium":
            # More extensive tuning for premium segment
            return {
                "regressor__n_estimators": [100, 200, 300],
                "regressor__max_depth": [4, 6, 8],
                "regressor__learning_rate": [0.01, 0.05, 0.1, 0.2],
                "regressor__min_child_weight": [1, 3, 5],
                "regressor__gamma": [0, 0.1, 0.2],
                "regressor__subsample": [0.8, 0.9, 1.0],
            }
        if category == "mid_range":
            # The mid-range model performed worst, so give it more tuning options
            return {
                "regressor__n_estimators": [50, 100, 200],
                "regressor__max_depth": [3, 6, 9],
                "regressor__learning_rate": [0.05, 0.1, 0.15],
                "regressor__min_child_weight": [1, 3],
                "regressor__subsample": [0.8, 1.0],
            }
        # Budget model already performs well
        return {
            "regressor__n_estimators": [50, 100],
            "regressor__max_depth": [4, 6],
            "regressor__learning_rate": [0.05, 0.1],
        }

    def _train_segment(self, category, segment_df):
        """Tune, fit and evaluate the model for one segment.

        Returns:
            dict with the fitted pipeline ("model"), whether its target was
            log-transformed ("log_transform") and the input columns
            ("features").
        """
        X = segment_df[self.features]
        y = segment_df[self.target]

        # The premium band is open-ended, so its target is modelled as
        # log(1 + price) and converted back with expm1 afterwards.
        log_transform = category == "premium"
        if log_transform:
            y = np.log1p(y)

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=self.random_state
        )

        grid_search = GridSearchCV(
            self._build_pipeline(X),
            self._param_grid(category),
            # 2 folds for small segments, growing to at most 5.
            cv=min(5, len(X_train) // 100 + 2),
            scoring="neg_mean_absolute_error",
            n_jobs=-1,
        )

        print(f"\nTraining {category} model with {len(X_train)} samples...")
        grid_search.fit(X_train, y_train)

        best_model = grid_search.best_estimator_
        y_pred = best_model.predict(X_test)

        # Report metrics in price units, not log units.
        if log_transform:
            y_true = np.expm1(y_test)
            y_pred = np.expm1(y_pred)
        else:
            y_true = y_test
        mae = mean_absolute_error(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)

        print(f"\n{category.title()} Model:")
        print(f"- Number of phones: {len(segment_df)}")
        print(f"- Best parameters: {grid_search.best_params_}")
        print(f"- Mean Absolute Error: ${mae:.2f}")
        print(f"- R² Score: {r2:.4f}")

        return {
            "model": best_model,
            "log_transform": log_transform,
            # Copy so later changes to self.features cannot alter what this
            # model believes it was trained on.
            "features": list(self.features),
        }

    def segment_and_train(self):
        """Train one model per price segment that has enough data.

        Requires ``load_and_preprocess`` to have been called first.

        Returns:
            ``self.models``: segment name -> model info dict.
        """
        for category in self.price_segments:
            segment_df = self.data[self.data["price_category"] == category]

            if len(segment_df) <= self.MIN_SEGMENT_SIZE:
                print(
                    f"\nNot enough data for {category} segment ({len(segment_df)} phones)"
                )
                continue

            self.models[category] = self._train_segment(category, segment_df)

        return self.models

    def save_models(self, prefix="phone_price_model"):
        """Write each trained model info dict to ``<prefix>_<segment>.pkl``."""
        for segment_name, model_info in self.models.items():
            model_path = f"{prefix}_{segment_name}.pkl"
            joblib.dump(model_info, model_path)
            print(f"Saved {segment_name} model to {model_path}")

    @staticmethod
    def _select_segment(phone_specs):
        """Guess the price segment of a phone from its specs."""
        brand = phone_specs.get("brand")
        ram = phone_specs.get("RAM_GB")
        rom = phone_specs.get("ROM_GB")

        if brand in ("Apple", "Samsung") and ram is not None and ram >= 8:
            return "premium"
        if ram is not None and rom is not None and ram <= 4 and rom <= 64:
            return "budget"
        return "mid_range"

    def _resolve_segment(self, preferred):
        """Return the trained segment closest to the preferred one."""
        for candidate in self.SEGMENT_FALLBACKS.get(preferred, (preferred,)):
            if candidate in self.models:
                return candidate
        # Models stored under names outside the fallback table.
        return next(iter(self.models))

    def _complete_specs(self, phone_specs, features):
        """Return a value for every model feature, filling gaps in the specs.

        Caller-supplied values always win. Missing engineered features are
        derived the same way as during training; anything else still missing
        falls back to ``feature_defaults`` (NaN when no default is known).
        """
        specs = dict(phone_specs)

        if specs.get("brand_tier") is None:
            specs["brand_tier"] = self._brand_tier(specs.get("brand"))

        ram = specs.get("RAM_GB")
        rom = specs.get("ROM_GB")
        # `rom` must be non-zero to avoid a ZeroDivisionError.
        if specs.get("RAM_ROM_ratio") is None and ram is not None and rom:
            specs["RAM_ROM_ratio"] = ram / rom

        completed = {}
        for feature in features:
            value = specs.get(feature)
            if value is None:
                value = self.feature_defaults.get(feature, np.nan)
            completed[feature] = value
        return completed

    def predict_price(self, phone_specs):
        """Predict the price of a single phone.

        Args:
            phone_specs: Mapping of feature name -> value. Features the model
                needs but the mapping lacks are derived or defaulted.

        Returns:
            The predicted price as a float, or None when no model is trained.
        """
        if not self.models:
            return None

        segment = self._resolve_segment(self._select_segment(phone_specs))
        model_info = self.models[segment]

        features = list(model_info.get("features") or phone_specs)
        row = self._complete_specs(phone_specs, features)
        frame = pd.DataFrame([row], columns=features)

        pred = model_info["model"].predict(frame)[0]
        if model_info.get("log_transform"):
            pred = np.expm1(pred)  # undo the log1p applied during training
        return float(pred)

    def run_pipeline(self, file_path=None):
        """Load the data, train all segment models and save them to disk.

        Args:
            file_path: Parquet file to read; defaults to the module-level
                ``path`` (see ``load_and_preprocess``).

        Returns:
            ``self.models``: segment name -> model info dict.
        """
        self.load_and_preprocess(file_path)
        self.segment_and_train()
        self.save_models()
        return self.models


# Example usage for real phones in Azerbaijan
def main():
    """Train the segment models and compare predictions with real prices.

    Runs the full pipeline on the dataset at the module-level ``path``, then
    prints a table of predicted vs. actual USD prices for a handful of phones
    sold in Azerbaijan.
    """
    # Initialize the predictor
    predictor = PhonePricePredictor()

    # Run the pipeline with your dataset path
    predictor.run_pipeline(path)  # Uses the global path variable

    # Real phones from Azerbaijan. "brand_tier" and "RAM_ROM_ratio" must match
    # what load_and_preprocess() derives from "brand", "RAM_GB" and "ROM_GB",
    # otherwise the model is asked about a combination it never saw in training.
    real_phones = [
        {
            "name": "iPhone 15",
            "price_azn": 1453,
            "price_usd": 855,
            "specs": {
                "RAM_GB": 6,
                "ROM_GB": 128,
                "NFC": 1,
                "camera_mp_float": 48,
                "CPU_manufacturer": "Apple",
                "brand": "Apple",
                "OS": "iOS",
                "brand_tier": "premium",
                "RAM_ROM_ratio": 6 / 128,
            },
        },
        {
            "name": "Samsung Galaxy S24 Ultra",
            "price_azn": 1716,
            "price_usd": 1009,
            "specs": {
                "RAM_GB": 12,
                "ROM_GB": 256,
                "NFC": 1,
                "camera_mp_float": 200,
                "CPU_manufacturer": "Qualcomm",
                "brand": "Samsung",
                "OS": "Android",
                "brand_tier": "premium",
                "RAM_ROM_ratio": 12 / 256,
            },
        },
        {
            "name": "Xiaomi 14 Pro",
            "price_azn": 1199,
            "price_usd": 705,
            "specs": {
                "RAM_GB": 12,
                "ROM_GB": 256,
                "NFC": 1,
                "camera_mp_float": 50,
                "CPU_manufacturer": "Qualcomm",
                "brand": "Xiaomi",
                "OS": "Android",
                "brand_tier": "mid_tier",
                "RAM_ROM_ratio": 12 / 256,
            },
        },
        {
            "name": "Xiaomi Redmi 10",
            "price_azn": 133,
            "price_usd": 78,
            "specs": {
                "RAM_GB": 4,
                "ROM_GB": 64,
                "NFC": 0,
                "camera_mp_float": 50,
                "CPU_manufacturer": "MediaTek",
                "brand": "Xiaomi",
                "OS": "Android",
                # Training labels every Xiaomi phone "mid_tier" (the tier is
                # a property of the brand, not of the individual model).
                "brand_tier": "mid_tier",
                "RAM_ROM_ratio": 4 / 64,
            },
        },
        {
            "name": "iPhone 13 Pro",
            "price_azn": 767,
            "price_usd": 451,
            "specs": {
                "RAM_GB": 6,
                "ROM_GB": 128,
                "NFC": 1,
                "camera_mp_float": 12,
                "CPU_manufacturer": "Apple",
                "brand": "Apple",
                "OS": "iOS",
                "brand_tier": "premium",
                "RAM_ROM_ratio": 6 / 128,
            },
        },
    ]

    # Test the prediction against real prices
    print("\n=== Testing with Real Phones from Azerbaijan ===")
    print(
        f"{'Phone Model':<25} {'Actual Price ($)':<15} {'Predicted ($)':<15} {'Error ($)':<10} {'Error %':<10}"
    )
    print("-" * 75)

    for phone in real_phones:
        name = phone["name"]
        actual_price = phone["price_usd"]
        predicted_price = predictor.predict_price(phone["specs"])

        # Compare against None explicitly: a prediction of exactly 0 is still
        # a prediction and must not be reported as a failure.
        if predicted_price is not None:
            error = predicted_price - actual_price
            error_percent = (error / actual_price) * 100

            print(
                f"{name:<25} ${actual_price:<14.2f} ${predicted_price:<14.2f} ${error:<9.2f} {error_percent:<9.1f}%"
            )
        else:
            print(f"{name:<25} ${actual_price:<14.2f} Prediction failed")

    print(
        "\nNote: This comparison helps assess how well the model generalizes to real market prices in Azerbaijan."
    )


# Only train and evaluate when executed directly (notebook cell or script),
# not when this module is imported for its PhonePricePredictor class.
if __name__ == "__main__":
    main()