In [3]:
import pandas as pd
import numpy as np
import heapq
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.utils import resample
from scipy.sparse import issparse
import joblib

# ====================== CUSTOM MODELS ======================

class DecisionTree:
    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree_ = None

    def fit(self, X, y):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, (pd.Series, pd.DataFrame)):
            y = y.values
            
        self.tree_ = self._build_tree(X, y, depth=0)
    
    def _build_tree(self, X, y, depth):
        num_samples = X.shape[0]
        
        # Stopping conditions
        if (self.max_depth is not None and depth >= self.max_depth) or \
           num_samples < self.min_samples_split or \
           len(np.unique(y)) == 1:
            return np.mean(y)

        best_split = self._find_best_split(X, y)
        if best_split is None:
            return np.mean(y)
        
        left_indices = X[:, best_split['feature']] <= best_split['value']
        right_indices = ~left_indices
        
        if np.sum(left_indices) == 0 or np.sum(right_indices) == 0:
            return np.mean(y)
        
        left_tree = self._build_tree(X[left_indices], y[left_indices], depth + 1)
        right_tree = self._build_tree(X[right_indices], y[right_indices], depth + 1)
        
        return {
            'feature': best_split['feature'],
            'value': best_split['value'],
            'left': left_tree,
            'right': right_tree
        }
    
    def _find_best_split(self, X, y):
        best_split = None
        best_mse = float('inf')
        num_features = X.shape[1]

        for feature in range(num_features):
            unique_values = np.unique(X[:, feature])
            split_points = np.percentile(unique_values, [25, 50, 75]) if len(unique_values) > 10 else unique_values
            
            for value in split_points:
                left_indices = X[:, feature] <= value
                right_indices = ~left_indices
                
                if np.sum(left_indices) < 2 or np.sum(right_indices) < 2:
                    continue
                
                left_y = y[left_indices]
                right_y = y[right_indices]
                
                mse = (np.var(left_y) * len(left_y) + np.var(right_y) * len(right_y)) / len(y)
                
                if mse < best_mse:
                    best_split = {'feature': feature, 'value': value}
                    best_mse = mse
        
        return best_split

    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if issparse(X):
            X = X.toarray()
            
        return np.array([self._predict(sample, self.tree_) for sample in X])
    
    def _predict(self, sample, tree):
        if not isinstance(tree, dict):
            return tree
        
        if sample[tree['feature']] <= tree['value']:
            return self._predict(sample, tree['left'])
        return self._predict(sample, tree['right'])


class RandomForest:
    def __init__(self, n_estimators=100, max_depth=None, max_features='sqrt'):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.max_features = max_features
        self.trees = []
        self.feature_indices = []

    def fit(self, X, y):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, (pd.Series, pd.DataFrame)):
            y = y.values
            
        n_features = X.shape[1]
        max_feats = int(np.sqrt(n_features)) if self.max_features == 'sqrt' else self.max_features
        
        for _ in range(self.n_estimators):
            X_sample, y_sample = resample(X, y)
            feature_idx = np.random.choice(n_features, max_feats, replace=False)
            X_sub = X_sample[:, feature_idx]
            
            tree = DecisionTree(max_depth=self.max_depth)
            tree.fit(X_sub, y_sample)
            
            self.trees.append(tree)
            self.feature_indices.append(feature_idx)
    
    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if issparse(X):
            X = X.toarray()
            
        all_preds = np.zeros((self.n_estimators, X.shape[0]))
        
        for i, (tree, feat_idx) in enumerate(zip(self.trees, self.feature_indices)):
            X_sub = X[:, feat_idx]
            all_preds[i] = tree.predict(X_sub)
            
        return np.mean(all_preds, axis=0)


class CustomKNN:
    def __init__(self, k=5, metric='cosine'):
        self.k = k
        self.metric = metric
        self.X_train = None
        self.y_train = None
        
    def _cosine_similarity(self, a, b):
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def _euclidean_distance(self, a, b):
        return np.sqrt(np.sum((a - b) ** 2))
    
    def fit(self, X, y):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if issparse(X):
            X = X.toarray()
        self.X_train = X
        # Convert y to numpy array if it's a pandas Series
        self.y_train = y.values if isinstance(y, pd.Series) else y
    
    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if issparse(X):
            X = X.toarray()
            
        predictions = []
        for sample in X:
            distances = []
            if self.metric == 'cosine':
                distances = [self._cosine_similarity(sample, x) for x in self.X_train]
                # Get indices of k largest cosine similarities
                neighbors = np.argpartition(distances, -self.k)[-self.k:]
            else:
                distances = [self._euclidean_distance(sample, x) for x in self.X_train]
                # Get indices of k smallest distances
                neighbors = np.argpartition(distances, self.k)[:self.k]
            
            # Get the average of the neighbors' values
            prediction = np.mean(self.y_train[neighbors])
            predictions.append(prediction)
            
        return np.array(predictions)


# ====================== DATA PIPELINE ======================

print("Loading and preprocessing data...")
df = pd.read_csv('laptop_data.csv')
df.drop(columns=["Unnamed: 0"], inplace=True)

# Data cleaning
df["Ram"] = df["Ram"].str.replace("GB", "").astype("int")
df["Weight"] = df["Weight"].str.replace("kg", "").astype("float")

# Feature engineering
df["Touchscreen"] = df["ScreenResolution"].apply(lambda x: 1 if "Touchscreen" in x else 0)
df["Ips"] = df["ScreenResolution"].apply(lambda x: 1 if "IPS" in x else 0)

# Process resolution
temp = df["ScreenResolution"].str.split("x", n=1, expand=True)
df["X_res"] = temp[0].str.replace(',', '').str.findall(r'(\d+\.?\d+)').apply(lambda x: x[0]).astype(int)
df["Y_res"] = temp[1].astype(int)
df['ppi'] = (((df['X_res']**2) + (df['Y_res']**2))**0.5/df['Inches']).astype('float')
df.drop(columns=["ScreenResolution", "X_res", "Y_res", "Inches"], inplace=True)

# Process CPU
df['Cpu Name'] = df['Cpu'].apply(lambda x: " ".join(x.split()[0:3]))
def fetch_processor(text):
    if text in ['Intel Core i7', 'Intel Core i5', 'Intel Core i3']:
        return text
    elif text.split()[0] == 'Intel':
        return 'Other Intel Processor'
    else:
        return 'AMD Processor'
df['Cpu brand'] = df['Cpu Name'].apply(fetch_processor)
df.drop(columns=['Cpu', 'Cpu Name'], inplace=True)

# Process Memory
df['Memory'] = df['Memory'].astype(str).replace(r'\.0', '', regex=True)
df["Memory"] = df["Memory"].str.replace('GB', '').str.replace('TB', '000')
new = df["Memory"].str.split("+", n=1, expand=True)
df["first"] = new[0].str.strip().str.replace(r'\D', '', regex=True).astype(int)
df["second"] = new[1].fillna("0").str.replace(r'\D', '', regex=True).astype(int)
df["HDD"] = (df["first"] * df["first"].apply(lambda x: 1 if "HDD" in str(x) else 0)) + \
            (df["second"] * df["second"].apply(lambda x: 1 if "HDD" in str(x) else 0))
df["SSD"] = (df["first"] * df["first"].apply(lambda x: 1 if "SSD" in str(x) else 0)) + \
            (df["second"] * df["second"].apply(lambda x: 1 if "SSD" in str(x) else 0))
df.drop(columns=['first', 'second', 'Memory'], inplace=True)

# Process GPU
df['Gpu brand'] = df['Gpu'].apply(lambda x: x.split()[0])
df = df[df['Gpu brand'] != 'ARM']
df.drop(columns=['Gpu'], inplace=True)

# Process OS
def cat_os(inp):
    if inp in ['Windows 10', 'Windows 7', 'Windows 10 S']:
        return 'Windows'
    elif inp in ['macOS', 'Mac OS X']:
        return 'Mac'
    else:
        return 'Others/No OS/Linux'
df['os'] = df['OpSys'].apply(cat_os)
df.drop(columns=['OpSys'], inplace=True)

# Features and target
X = df.drop(columns=['Price'])
y = np.log(df['Price'])

# Preprocessing pipeline
cat_cols = ['Company', 'TypeName', 'Cpu brand', 'Gpu brand', 'os']
num_cols = ['Ram', 'Weight', 'Touchscreen', 'Ips', 'ppi', 'HDD', 'SSD']

preprocessor = ColumnTransformer([
    ('num', StandardScaler(), num_cols),
    ('cat', OneHotEncoder(handle_unknown='ignore'), cat_cols)
])

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Transform
X_train_transformed = preprocessor.fit_transform(X_train)
X_test_transformed = preprocessor.transform(X_test)

if issparse(X_train_transformed):
    X_train_transformed = X_train_transformed.toarray()
if issparse(X_test_transformed):
    X_test_transformed = X_test_transformed.toarray()

# ====================== MODEL TRAINING ======================

print("\nTraining custom Random Forest...")
rf_model = RandomForest(n_estimators=100, max_depth=10, max_features='sqrt')
rf_model.fit(X_train_transformed, y_train)

# Evaluate Random Forest
y_pred_rf = rf_model.predict(X_test_transformed)
mse_rf = mean_squared_error(y_test, y_pred_rf)
mae_rf = mean_absolute_error(y_test, y_pred_rf)
r2_rf = r2_score(y_test, y_pred_rf)

print("\nRandom Forest Performance:")
print(f"MSE: {mse_rf:.4f}")
print(f"MAE: {mae_rf:.4f}")
print(f"R² Score: {r2_rf:.4f}")

print("\nTraining custom KNN...")
knn_model = CustomKNN(k=5, metric='cosine')
knn_model.fit(X_train_transformed, y_train)

# Evaluate KNN
y_pred_knn = knn_model.predict(X_test_transformed)
mse_knn = mean_squared_error(y_test, y_pred_knn)
mae_knn = mean_absolute_error(y_test, y_pred_knn)
r2_knn = r2_score(y_test, y_pred_knn)

print("\nKNN Performance:")
print(f"MSE: {mse_knn:.4f}")
print(f"MAE: {mae_knn:.4f}")
print(f"R² Score: {r2_knn:.4f}")

# ====================== SAVE MODELS ======================

print("\nSaving models...")
joblib.dump({
    'df': df,
    'preprocessor': preprocessor,
    'random_forest': rf_model,
    'knn': knn_model
}, 'laptop_models_full_custom.pkl')

print("Saved successfully to laptop_models_full_custom.pkl ✅")

Loading and preprocessing data...

Training custom Random Forest...

Random Forest Performance:
MSE: 0.1576
MAE: 0.3223
R² Score: 0.5910

Training custom KNN...

KNN Performance:
MSE: 0.0704
MAE: 0.1964
R² Score: 0.8174

Saving models...
Saved successfully to laptop_models_full_custom.pkl ✅


In [4]:
import pandas as pd
import numpy as np
import heapq
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.utils import resample
from scipy.sparse import issparse
import joblib

# ====================== CUSTOM MODELS ======================

class DecisionTree:
    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree_ = None

    def fit(self, X, y):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, (pd.Series, pd.DataFrame)):
            y = y.values
            
        self.tree_ = self._build_tree(X, y, depth=0)
    
    def _build_tree(self, X, y, depth):
        num_samples = X.shape[0]
        
        # Stopping conditions
        if (self.max_depth is not None and depth >= self.max_depth) or \
           num_samples < self.min_samples_split or \
           len(np.unique(y)) == 1:
            return np.mean(y)

        best_split = self._find_best_split(X, y)
        if best_split is None:
            return np.mean(y)
        
        left_indices = X[:, best_split['feature']] <= best_split['value']
        right_indices = ~left_indices
        
        if np.sum(left_indices) == 0 or np.sum(right_indices) == 0:
            return np.mean(y)
        
        left_tree = self._build_tree(X[left_indices], y[left_indices], depth + 1)
        right_tree = self._build_tree(X[right_indices], y[right_indices], depth + 1)
        
        return {
            'feature': best_split['feature'],
            'value': best_split['value'],
            'left': left_tree,
            'right': right_tree
        }
    
    def _find_best_split(self, X, y):
        best_split = None
        best_mse = float('inf')
        num_features = X.shape[1]

        for feature in range(num_features):
            unique_values = np.unique(X[:, feature])
            split_points = np.percentile(unique_values, [25, 50, 75]) if len(unique_values) > 10 else unique_values
            
            for value in split_points:
                left_indices = X[:, feature] <= value
                right_indices = ~left_indices
                
                if np.sum(left_indices) < 2 or np.sum(right_indices) < 2:
                    continue
                
                left_y = y[left_indices]
                right_y = y[right_indices]
                
                mse = (np.var(left_y) * len(left_y) + np.var(right_y) * len(right_y)) / len(y)
                
                if mse < best_mse:
                    best_split = {'feature': feature, 'value': value}
                    best_mse = mse
        
        return best_split

    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if issparse(X):
            X = X.toarray()
            
        return np.array([self._predict(sample, self.tree_) for sample in X])
    
    def _predict(self, sample, tree):
        if not isinstance(tree, dict):
            return tree
        
        if sample[tree['feature']] <= tree['value']:
            return self._predict(sample, tree['left'])
        return self._predict(sample, tree['right'])


class RandomForest:
    def __init__(self, n_estimators=100, max_depth=None, max_features='sqrt'):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.max_features = max_features
        self.trees = []
        self.feature_indices = []

    def fit(self, X, y):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, (pd.Series, pd.DataFrame)):
            y = y.values
            
        n_features = X.shape[1]
        max_feats = int(np.sqrt(n_features)) if self.max_features == 'sqrt' else self.max_features
        
        for _ in range(self.n_estimators):
            X_sample, y_sample = resample(X, y)
            feature_idx = np.random.choice(n_features, max_feats, replace=False)
            X_sub = X_sample[:, feature_idx]
            
            tree = DecisionTree(max_depth=self.max_depth)
            tree.fit(X_sub, y_sample)
            
            self.trees.append(tree)
            self.feature_indices.append(feature_idx)
    
    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if issparse(X):
            X = X.toarray()
            
        all_preds = np.zeros((self.n_estimators, X.shape[0]))
        
        for i, (tree, feat_idx) in enumerate(zip(self.trees, self.feature_indices)):
            X_sub = X[:, feat_idx]
            all_preds[i] = tree.predict(X_sub)
            
        return np.mean(all_preds, axis=0)


# ====================== UPDATED KNN WITH PRICE PRIORITY ======================

class CustomKNN:
    def __init__(self, k=5, metric='cosine', price_weight=2.0):
        self.k = k
        self.metric = metric
        self.price_weight = price_weight
        self.X_train = None
        self.y_train = None
        
    def _cosine_similarity(self, a, b):
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def _euclidean_distance(self, a, b):
        return np.sqrt(np.sum((a - b) ** 2))
    
    def fit(self, X, y):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if issparse(X):
            X = X.toarray()
        
        prices = np.array(y).reshape(-1, 1)
        self.X_train = np.hstack((X, prices * self.price_weight))
        self.y_train = np.array(y)
    
    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if issparse(X):
            X = X.toarray()
        
        predictions = []
        avg_price = np.mean(self.y_train)
        
        for sample in X:
            sample_with_price = np.hstack((sample, avg_price * self.price_weight))
            
            if self.metric == 'cosine':
                distances = [self._cosine_similarity(sample_with_price, x) for x in self.X_train]
                neighbors = np.argpartition(distances, -self.k)[-self.k:]
            else:
                distances = [self._euclidean_distance(sample_with_price, x) for x in self.X_train]
                neighbors = np.argpartition(distances, self.k)[:self.k]
            
            prediction = np.mean(self.y_train[neighbors])
            predictions.append(prediction)
        
        return np.array(predictions)
    
    def recommend(self, sample, n_recommendations=5):
        avg_price = np.mean(self.y_train)
        sample_with_price = np.hstack((sample, avg_price * self.price_weight))
        
        if self.metric == 'cosine':
            similarities = [self._cosine_similarity(sample_with_price, x) for x in self.X_train]
            top_indices = np.argsort(similarities)[::-1][:n_recommendations]
        else:
            distances = [self._euclidean_distance(sample_with_price, x) for x in self.X_train]
            top_indices = np.argsort(distances)[:n_recommendations]
        
        return top_indices


# ====================== DATA PIPELINE ======================

print("Loading and preprocessing data...")
df = pd.read_csv('laptop_data.csv')
df.drop(columns=["Unnamed: 0"], inplace=True)

df["Ram"] = df["Ram"].str.replace("GB", "").astype("int")
df["Weight"] = df["Weight"].str.replace("kg", "").astype("float")

df["Touchscreen"] = df["ScreenResolution"].apply(lambda x: 1 if "Touchscreen" in x else 0)
df["Ips"] = df["ScreenResolution"].apply(lambda x: 1 if "IPS" in x else 0)

temp = df["ScreenResolution"].str.split("x", n=1, expand=True)
df["X_res"] = temp[0].str.replace(',', '').str.findall(r'(\d+\.?\d+)').apply(lambda x: x[0]).astype(int)
df["Y_res"] = temp[1].astype(int)
df['ppi'] = (((df['X_res']**2) + (df['Y_res']**2))**0.5/df['Inches']).astype('float')
df.drop(columns=["ScreenResolution", "X_res", "Y_res", "Inches"], inplace=True)

df['Cpu Name'] = df['Cpu'].apply(lambda x: " ".join(x.split()[0:3]))
def fetch_processor(text):
    if text in ['Intel Core i7', 'Intel Core i5', 'Intel Core i3']:
        return text
    elif text.split()[0] == 'Intel':
        return 'Other Intel Processor'
    else:
        return 'AMD Processor'
df['Cpu brand'] = df['Cpu Name'].apply(fetch_processor)
df.drop(columns=['Cpu', 'Cpu Name'], inplace=True)

df['Memory'] = df['Memory'].astype(str).replace(r'\.0', '', regex=True)
df["Memory"] = df["Memory"].str.replace('GB', '').str.replace('TB', '000')
new = df["Memory"].str.split("+", n=1, expand=True)
df["first"] = new[0].str.strip().str.replace(r'\D', '', regex=True).astype(int)
df["second"] = new[1].fillna("0").str.replace(r'\D', '', regex=True).astype(int)
df["HDD"] = (df["first"] * df["first"].apply(lambda x: 1 if "HDD" in str(x) else 0)) + \
            (df["second"] * df["second"].apply(lambda x: 1 if "HDD" in str(x) else 0))
df["SSD"] = (df["first"] * df["first"].apply(lambda x: 1 if "SSD" in str(x) else 0)) + \
            (df["second"] * df["second"].apply(lambda x: 1 if "SSD" in str(x) else 0))
df.drop(columns=['first', 'second', 'Memory'], inplace=True)

df['Gpu brand'] = df['Gpu'].apply(lambda x: x.split()[0])
df = df[df['Gpu brand'] != 'ARM']
df.drop(columns=['Gpu'], inplace=True)

def cat_os(inp):
    if inp in ['Windows 10', 'Windows 7', 'Windows 10 S']:
        return 'Windows'
    elif inp in ['macOS', 'Mac OS X']:
        return 'Mac'
    else:
        return 'Others/No OS/Linux'
df['os'] = df['OpSys'].apply(cat_os)
df.drop(columns=['OpSys'], inplace=True)

X = df.drop(columns=['Price'])
y = np.log(df['Price'])

cat_cols = ['Company', 'TypeName', 'Cpu brand', 'Gpu brand', 'os']
num_cols = ['Ram', 'Weight', 'Touchscreen', 'Ips', 'ppi', 'HDD', 'SSD']

preprocessor = ColumnTransformer([
    ('num', StandardScaler(), num_cols),
    ('cat', OneHotEncoder(handle_unknown='ignore'), cat_cols)
])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

X_train_transformed = preprocessor.fit_transform(X_train)
X_test_transformed = preprocessor.transform(X_test)

if issparse(X_train_transformed):
    X_train_transformed = X_train_transformed.toarray()
if issparse(X_test_transformed):
    X_test_transformed = X_test_transformed.toarray()

# ====================== MODEL TRAINING ======================

print("\nTraining custom Random Forest...")
rf_model = RandomForest(n_estimators=100, max_depth=10, max_features='sqrt')
rf_model.fit(X_train_transformed, y_train)

y_pred_rf = rf_model.predict(X_test_transformed)
print("\nRandom Forest Performance:")
print(f"MSE: {mean_squared_error(y_test, y_pred_rf):.4f}")
print(f"MAE: {mean_absolute_error(y_test, y_pred_rf):.4f}")
print(f"R² Score: {r2_score(y_test, y_pred_rf):.4f}")

print("\nTraining custom KNN (Price Priority)...")
knn_model = CustomKNN(k=5, metric='cosine', price_weight=3.0)
knn_model.fit(X_train_transformed, y_train)

y_pred_knn = knn_model.predict(X_test_transformed)
print("\nKNN Performance:")
print(f"MSE: {mean_squared_error(y_test, y_pred_knn):.4f}")
print(f"MAE: {mean_absolute_error(y_test, y_pred_knn):.4f}")
print(f"R² Score: {r2_score(y_test, y_pred_knn):.4f}")

# ====================== SAVE MODELS ======================

print("\nSaving models...")
joblib.dump({
    'df': df,
    'preprocessor': preprocessor,
    'random_forest': rf_model,
    'knn': knn_model
}, 'laptop_models_full_custom_2.pkl')

print("Saved successfully to laptop_models_full_custom_2.pkl ✅")


Loading and preprocessing data...

Training custom Random Forest...

Random Forest Performance:
MSE: 0.1512
MAE: 0.3163
R² Score: 0.6076

Training custom KNN (Price Priority)...

KNN Performance:
MSE: 0.0800
MAE: 0.2106
R² Score: 0.7925

Saving models...
Saved successfully to laptop_models_full_custom_2.pkl ✅
