In [48]:
import pandas as pd
import numpy as np
import pickle
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Dict, Tuple, List, Optional, Any
from pathlib import Path

from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge, LogisticRegression
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.ensemble import HistGradientBoostingRegressor, HistGradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import (mean_squared_error, r2_score, accuracy_score,
                              precision_score, recall_score, f1_score,
                              confusion_matrix, roc_auc_score)
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.combine import SMOTETomek

In [49]:
# Configuration for the pipeline
@dataclass
class Config:
    """Tunable settings shared by the data split, the model factory and the pipeline."""
    # Fraction of rows held out as the test split (forwarded to split_dataset).
    test_size: float = 0.20
    # Seed handed to the splitter, the resamplers and every estimator.
    random_state: int = 42
    # Quantile of `shares` passed to create_targets; rows strictly above it are labelled 1.
    classification_threshold: float = 0.75
    # NOTE(review): not referenced by any code visible in this file.
    cv_folds: int = 5

    # Model hyperparameters
    # rf_* values are used for the RandomForestClassifier instances.
    rf_n_estimators: int = 100
    rf_max_depth: int = 15
    rf_min_samples_leaf: int = 5
    # gb_* values are used for the HistGradientBoostingClassifier.
    gb_learning_rate: float = 0.05
    gb_max_depth: int = 5
    gb_max_iter: int = 200

In [50]:
# Data processing
def prepare_features(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
    """Split a raw frame into a cleaned feature matrix and the ``shares`` target.

    Processing order:
      1. Copy the ``shares`` column as the target and drop ``shares``/``url``
         from the features.
      2. Convert +/-inf to NaN, then impute every NaN with its column median.
      3. Drop columns with at most one distinct observed value.
      4. One-hot encode int64/float64 columns with fewer than 10 distinct
         values (first level dropped) and cast the frame to float.

    Args:
        df: Raw data; must contain a ``shares`` column.

    Returns:
        ``(X, y)``: the processed feature frame and a copy of ``df['shares']``.

    Raises:
        KeyError: If ``df`` has no ``shares`` column.
    """
    y = df['shares'].copy()
    X = df.drop(columns=['shares', 'url'], errors='ignore').copy()

    # Infinities must become NaN *before* the medians are taken; otherwise the
    # fill value is computed from data that still contains inf and can be
    # skewed (or be inf itself).
    X = X.replace([np.inf, -np.inf], np.nan)
    if X.isnull().values.any():
        X = X.fillna(X.median())

    # A column with zero or one distinct observed value carries no signal.
    distinct = X.nunique()
    uninformative = distinct[distinct <= 1].index.tolist()
    if uninformative:
        X = X.drop(columns=uninformative)

    # Low-cardinality numeric columns are treated as categorical codes.
    low_cardinality = [
        col for col in X.columns
        if distinct[col] < 10 and X[col].dtype in ['int64', 'float64']
    ]

    if low_cardinality:
        X = pd.get_dummies(X, columns=low_cardinality, drop_first=True)
        X = X.astype(float)

    return X, y


def remove_correlated_features(X: pd.DataFrame, threshold: float = 0.95) -> pd.DataFrame:
    """Drop every column whose absolute correlation with an earlier column exceeds ``threshold``.

    Frames with 100 or more columns are returned untouched (same object).

    Args:
        X: Feature frame.
        threshold: Absolute correlation above which the later column of a pair is dropped.

    Returns:
        ``X`` without the redundant columns.
    """
    n_features = X.shape[1]
    if n_features >= 100:
        return X

    abs_corr = X.corr().abs()
    # Keep only the strictly-upper triangle so each pair is inspected once and
    # a column is only ever compared against the columns that precede it.
    above_diagonal = np.triu(np.ones(abs_corr.shape), k=1).astype(bool)
    pairwise = abs_corr.where(above_diagonal)

    is_redundant = (pairwise > threshold).any(axis=0).to_numpy()
    redundant_cols = pairwise.columns[is_redundant].tolist()

    return X.drop(columns=redundant_cols)


def create_targets(y: pd.Series, threshold_percentile: float = 0.75) -> Tuple[pd.Series, pd.Series]:
    """Derive a regression target and a binary target from raw values.

    Args:
        y: Raw target values.
        threshold_percentile: Quantile of ``y`` used as the class cut-off.

    Returns:
        ``(y_reg, y_class)``: ``log1p(y)`` and a 0/1 series that is 1 where
        ``y`` is strictly greater than the cut-off quantile.
    """
    cutoff = y.quantile(threshold_percentile)
    above_cutoff = y.gt(cutoff)
    return np.log1p(y), above_cutoff.astype(int)


def safe_qcut_bins(y_series: pd.Series,
                   q_candidates: Tuple[int, ...] = (10, 8, 6, 5, 4),
                   min_bin_size: int = 30) -> Optional[pd.Series]:
    """Quantile-bin ``y_series`` using the first candidate bin count that works.

    Candidates are tried in the given order. A binning is accepted when it has
    at least two bins and every bin holds at least ``min_bin_size`` rows.

    Args:
        y_series: Values to bin (cast to float).
        q_candidates: Quantile counts to try, in priority order.
        min_bin_size: Minimum number of rows required in each bin.

    Returns:
        The binned series, or ``None`` when no candidate is acceptable.
    """
    values = pd.Series(y_series).astype(float)

    for n_quantiles in q_candidates:
        try:
            binned = pd.qcut(values, q=n_quantiles, duplicates='drop')
            bin_sizes = binned.value_counts(dropna=False)
            too_small = not (bin_sizes.min() >= min_bin_size)
            too_few = not (bin_sizes.size >= 2)
            if too_small or too_few:
                continue
            return binned
        except ValueError:
            # qcut could not build this binning; fall through to the next candidate.
            continue

    return None


def split_dataset(X: pd.DataFrame, y_reg: pd.Series, y_class: pd.Series,
                  test_size: float, random_state: int) -> Tuple:
    """Split features and both targets into train/test parts.

    The split is stratified on quantile bins of ``y_reg`` when
    ``safe_qcut_bins`` can build them, and is unstratified otherwise.

    Returns:
        The sequence produced by ``train_test_split`` for ``(X, y_reg, y_class)``:
        a train part followed by a test part for each of the three inputs.
    """
    # safe_qcut_bins returns None when no binning qualifies; None is passed
    # straight through as the stratify argument.
    strata = safe_qcut_bins(y_reg)

    parts = train_test_split(
        X, y_reg, y_class,
        stratify=strata,
        random_state=random_state,
        test_size=test_size,
    )
    return parts

In [51]:
# Model building
class ModelFactory:
    """Builds fresh, unfitted classifiers from the hyperparameters in a ``Config``."""

    def __init__(self, config: Config):
        self.config = config

    def create_classification_models(self, class_weight: Optional[Dict] = None) -> Dict[str, Any]:
        """Return a new set of unfitted classifiers keyed by display name.

        Args:
            class_weight: Optional ``{label: weight}`` mapping. When given it is
                applied to all three models. When omitted, logistic regression
                and gradient boosting are unweighted and the random forest
                falls back to ``'balanced'``.

        Returns:
            Dict with the keys ``'Logistic Regression'``, ``'Random Forest'``
            and ``'Gradient Boosting'``.
        """
        seed = self.config.random_state

        gb_params = dict(
            learning_rate=self.config.gb_learning_rate,
            max_depth=self.config.gb_max_depth,
            max_bins=255,
            l2_regularization=1.0,
            max_iter=self.config.gb_max_iter,
            early_stopping=True,
            validation_fraction=0.1,
            n_iter_no_change=20,
            random_state=seed,
        )
        # Previously the weights were silently ignored for gradient boosting,
        # so its "weighted" variant trained identically to the baseline.
        # HistGradientBoostingClassifier accepts class_weight from
        # scikit-learn 1.2; it is forwarded only when weights were requested
        # so the unweighted path keeps working on older releases.
        if class_weight is not None:
            gb_params['class_weight'] = class_weight

        return {
            'Logistic Regression': Pipeline([
                ('scaler', StandardScaler()),
                ('lr', LogisticRegression(
                    C=1.0,
                    max_iter=1000,
                    class_weight=class_weight,
                    random_state=seed
                ))
            ]),
            'Random Forest': RandomForestClassifier(
                n_estimators=self.config.rf_n_estimators,
                max_depth=self.config.rf_max_depth,
                min_samples_leaf=self.config.rf_min_samples_leaf,
                min_samples_split=10,
                max_features='sqrt',
                class_weight=class_weight or 'balanced',
                n_jobs=-1,
                random_state=seed
            ),
            'Gradient Boosting': HistGradientBoostingClassifier(**gb_params)
        }


def train_models(models: Dict[str, Any], X_train, y_train) -> Dict[str, Any]:
    """Fit every model in ``models`` on the same training data.

    Args:
        models: Mapping of display name to an object exposing ``fit(X, y)``.
        X_train: Training features.
        y_train: Training labels.

    Returns:
        A new dict with the same keys, in the same order, holding the fitted
        objects (the same instances that were passed in).
    """
    fitted = {}
    for label in models:
        print(f"Training {label}...")
        estimator = models[label]
        estimator.fit(X_train, y_train)
        fitted[label] = estimator
    return fitted

In [52]:
# Evaluation
def evaluate_classification(model, X_test, y_test) -> Dict[str, float]:
    """Score a fitted binary classifier on held-out data.

    Args:
        model: Fitted estimator exposing ``predict`` and ``predict_proba``.
        X_test: Evaluation features.
        y_test: True 0/1 labels.

    Returns:
        Dict with ``accuracy``, ``precision``, ``recall``, ``f1`` and ``auc``.
        ``auc`` is ``None`` when ROC AUC cannot be computed for ``y_test``.
    """
    y_pred = model.predict(X_test)
    # Column 1 holds the probability of the positive class.
    y_pred_proba = model.predict_proba(X_test)[:, 1]

    results = {
        'accuracy': accuracy_score(y_test, y_pred),
        'precision': precision_score(y_test, y_pred, zero_division=0),
        'recall': recall_score(y_test, y_pred, zero_division=0),
        'f1': f1_score(y_test, y_pred, zero_division=0),
    }

    # roc_auc_score raises ValueError when the score is undefined (e.g. only
    # one class present in y_test). Only that case is tolerated; a bare
    # `except:` would also swallow KeyboardInterrupt and SystemExit.
    try:
        results['auc'] = roc_auc_score(y_test, y_pred_proba)
    except ValueError:
        results['auc'] = None

    return results


def find_best_threshold(model, X_test, y_test,
                        thresholds: Tuple[float, ...] = (0.3, 0.35, 0.4, 0.45, 0.5)) -> Tuple[float, Dict]:
    """Pick the probability cut-off with the highest F1 among ``thresholds``.

    Args:
        model: Fitted estimator exposing ``predict_proba``.
        X_test: Features the thresholds are evaluated on.
        y_test: True 0/1 labels for ``X_test``.
        thresholds: Candidate cut-offs (any iterable of floats); the first one
            reaching the best F1 wins.

    Returns:
        ``(best_threshold, metrics)`` where ``metrics`` has the keys
        ``accuracy``, ``precision``, ``recall``, ``f1`` and ``threshold``.
        If no candidate achieves a positive F1 (or ``thresholds`` is empty),
        the threshold is 0.5 and ``metrics`` describes that cut-off, so the
        returned dict is never empty.
    """
    y_pred_proba = model.predict_proba(X_test)[:, 1]

    def _metrics_at(cutoff: float) -> Dict:
        """Compute the metric dict for predictions thresholded at ``cutoff``."""
        y_pred = (y_pred_proba >= cutoff).astype(int)
        # zero_division=0 matches evaluate_classification and avoids warnings
        # when a cut-off yields no positive predictions.
        return {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred, zero_division=0),
            'recall': recall_score(y_test, y_pred, zero_division=0),
            'f1': f1_score(y_test, y_pred, zero_division=0),
            'threshold': cutoff
        }

    best_f1 = 0
    best_threshold = 0.5
    best_results: Dict = {}

    for threshold in thresholds:
        candidate = _metrics_at(threshold)
        if candidate['f1'] > best_f1:
            best_f1 = candidate['f1']
            best_threshold = threshold
            best_results = candidate

    # Callers index the result by 'f1'/'recall'/...; never hand back an empty dict.
    if not best_results:
        best_results = _metrics_at(best_threshold)

    return best_threshold, best_results

In [53]:
# Imbalance handling
def apply_smote(X_train, y_train, random_state: int) -> Tuple:
    """Resample the training data with ``SMOTE`` and return its ``fit_resample`` output."""
    return SMOTE(random_state=random_state).fit_resample(X_train, y_train)


def apply_undersampling(X_train, y_train, random_state: int) -> Tuple:
    """Resample the training data with ``RandomUnderSampler`` and return its ``fit_resample`` output."""
    return RandomUnderSampler(random_state=random_state).fit_resample(X_train, y_train)


def calculate_class_weights(y_train) -> Dict[int, float]:
    """Compute inverse-frequency weights for a binary 0/1 label vector.

    Each class receives ``n_samples / (2 * n_samples_in_class)``.

    Args:
        y_train: Non-negative integer labels containing both 0 and 1.

    Returns:
        ``{0: weight_for_class_0, 1: weight_for_class_1}``.

    Raises:
        ValueError: If class 0 or class 1 does not occur in ``y_train``.
    """
    # minlength=2 guarantees both counts exist even when class 1 is absent.
    class_counts = np.bincount(y_train, minlength=2)
    if class_counts[0] == 0 or class_counts[1] == 0:
        raise ValueError(
            "calculate_class_weights requires at least one sample of class 0 and class 1; "
            f"got counts {class_counts[:2].tolist()}"
        )

    total = len(y_train)
    return {label: float(total / (2 * class_counts[label])) for label in (0, 1)}

In [54]:
# Model persistence
def save_model_to_downloads(model, feature_names: List[str], metadata: Dict,
                            output_dir: Optional[str] = None):
    """Pickle a model and its metadata, then offer them for download in Colab.

    Two files are written: ``online_news_model.pkl`` (the model) and
    ``model_metadata.pkl`` (a dict with ``feature_names``, ``metadata`` and an
    ISO ``saved_at`` timestamp). When ``google.colab`` is importable the files
    are also sent through ``files.download``; otherwise their locations are
    printed.

    Args:
        model: Any picklable object.
        feature_names: Names of the columns the model was trained on.
        metadata: Extra information stored next to the feature names.
        output_dir: Directory to write into (created if missing). ``None``
            keeps the previous behaviour of writing to the working directory.

    Returns:
        ``(model_path, metadata_path)`` as strings. With ``output_dir=None``
        these are the bare file names, as before.
    """
    base_dir = Path(output_dir) if output_dir is not None else Path()
    if output_dir is not None:
        base_dir.mkdir(parents=True, exist_ok=True)

    # File names
    model_filename = str(base_dir / 'online_news_model.pkl')
    metadata_filename = str(base_dir / 'model_metadata.pkl')

    # Save model
    print(f"\nSaving model to {model_filename}...")
    with open(model_filename, 'wb') as f:
        pickle.dump(model, f)

    # Save metadata with feature names
    full_metadata = {
        'feature_names': feature_names,
        'metadata': metadata,
        'saved_at': datetime.now().isoformat()
    }

    print(f"Saving metadata to {metadata_filename}...")
    with open(metadata_filename, 'wb') as f:
        pickle.dump(full_metadata, f)

    print("\n" + "=" * 60)
    print("Model Saved Successfully")
    print("=" * 60)

    # The Colab helper only exists inside Colab, so the import doubles as the
    # environment check and has to stay local to this function.
    try:
        from google.colab import files
        print("\ndownloading files to your computer...")
        files.download(model_filename)
        files.download(metadata_filename)
        print("Files downloaded to your Downloads folder!")
    except ImportError:
        # Not in Colab - just save locally. Joining onto cwd leaves an
        # absolute output_dir untouched and anchors a relative one.
        print(f"Model saved to: {Path.cwd() / model_filename}")
        print(f"Metadata saved to: {Path.cwd() / metadata_filename}")

    return model_filename, metadata_filename

In [55]:
# Main pipeline
class MLPipeline:
    """End-to-end training run: prepare data, train baselines, try imbalance remedies, save the winner.

    NOTE(review): threshold tuning and final model selection are both scored on
    the test split, so the reported metrics of the selected model are optimistic.
    """

    def __init__(self, config: Config):
        self.config = config
        self.factory = ModelFactory(config)
        self.best_model = None
        self.best_model_name = None
        self.feature_names = None
        self.metadata = {}

    def run_baseline(self, df: pd.DataFrame) -> Dict:
        """Prepare the data, train the factory's default models and score them.

        Args:
            df: Raw data frame containing the ``shares`` target.

        Returns:
            Dict with the train/test features and class labels, per-model
            metrics (``clf_results``), the fitted models, the name and
            instance of the best model by F1, and the feature names.
        """
        print("=" * 60)
        print("PREPARING DATA")
        print("=" * 60)

        X, y = prepare_features(df)
        X = remove_correlated_features(X)
        y_reg, y_class = create_targets(y, self.config.classification_threshold)

        X_train, X_test, y_reg_train, y_reg_test, y_class_train, y_class_test = split_dataset(
            X, y_reg, y_class, self.config.test_size, self.config.random_state
        )

        print(f"Training set: {len(X_train)} samples")
        print(f"Test set: {len(X_test)} samples")
        print(f"Features: {X.shape[1]}")

        print("\n" + "=" * 60)
        print("TRAINING BASELINE MODELS")
        print("=" * 60)

        clf_models = self.factory.create_classification_models()
        trained_clf = train_models(clf_models, X_train, y_class_train)

        clf_results = {}
        for name, model in trained_clf.items():
            clf_results[name] = evaluate_classification(model, X_test, y_class_test)

        print("\n" + "=" * 60)
        print("BASELINE RESULTS")
        print("=" * 60)
        for name, metrics in sorted(clf_results.items(), key=lambda x: x[1]['f1'], reverse=True):
            print(f"\n{name}:")
            print(f"  F1: {metrics['f1']:.4f} | Recall: {metrics['recall']:.4f} | Precision: {metrics['precision']:.4f}")

        best_name = max(clf_results.items(), key=lambda x: x[1]['f1'])[0]
        self.feature_names = X.columns.tolist()

        return {
            'X_train': X_train, 'X_test': X_test,
            'y_class_train': y_class_train, 'y_class_test': y_class_test,
            'clf_results': clf_results,
            'best_model_name': best_name,
            'best_model': trained_clf[best_name],
            'trained_models': trained_clf,
            'feature_names': self.feature_names
        }

    def _fit_and_score(self, tag: str, X_fit, y_fit, X_test, y_test,
                       class_weight: Optional[Dict] = None) -> Tuple[Dict, Dict]:
        """Train a fresh model set on ``(X_fit, y_fit)`` and score it on the test data.

        Returns ``(results, models)``, both keyed by ``'<model name> (<tag>)'``.
        """
        fresh_models = self.factory.create_classification_models(class_weight)
        trained = train_models(fresh_models, X_fit, y_fit)

        results, models = {}, {}
        for name, model in trained.items():
            result_name = f'{name} ({tag})'
            results[result_name] = evaluate_classification(model, X_test, y_test)
            models[result_name] = model
        return results, models

    def run_with_imbalance_handling(self, X_train, y_train, X_test, y_test) -> Dict:
        """Train and score the models under four class-imbalance remedies.

        The remedies are SMOTE oversampling, random undersampling, explicit
        class weights, and probability-threshold tuning of a random forest.

        Returns:
            ``{'results': {name: metrics}, 'models': {name: fitted model}}``.
        """
        print("\n" + "=" * 60)
        print("TRAINING WITH IMBALANCE HANDLING")
        print("=" * 60)

        all_results = {}
        all_models = {}

        # SMOTE
        print("\n[1/4] SMOTE...")
        X_smote, y_smote = apply_smote(X_train, y_train, self.config.random_state)
        results, models = self._fit_and_score('SMOTE', X_smote, y_smote, X_test, y_test)
        all_results.update(results)
        all_models.update(models)

        # Undersampling
        print("\n[2/4] Undersampling...")
        X_under, y_under = apply_undersampling(X_train, y_train, self.config.random_state)
        results, models = self._fit_and_score('Undersample', X_under, y_under, X_test, y_test)
        all_results.update(results)
        all_models.update(models)

        # Custom weights
        print("\n[3/4] Custom Weights...")
        class_weights = calculate_class_weights(y_train)
        results, models = self._fit_and_score(
            'Weighted', X_train, y_train, X_test, y_test, class_weight=class_weights
        )
        all_results.update(results)
        all_models.update(models)

        # Threshold tuning
        print("\n[4/4] Threshold Tuning...")
        rf_model = RandomForestClassifier(
            n_estimators=self.config.rf_n_estimators,
            max_depth=self.config.rf_max_depth,
            class_weight='balanced',
            random_state=self.config.random_state,
            n_jobs=-1
        )
        rf_model.fit(X_train, y_train)
        best_threshold, threshold_results = find_best_threshold(rf_model, X_test, y_test)
        result_name = 'Random Forest (Threshold Tuned)'
        # An empty metrics dict (no threshold reached a positive F1) would raise
        # KeyError in the sort below and in the final model selection.
        if threshold_results:
            all_results[result_name] = threshold_results
            all_models[result_name] = rf_model
        else:
            print("  No candidate threshold produced a positive F1; skipping threshold-tuned model.")

        print("\n" + "=" * 60)
        print("IMBALANCE HANDLING RESULTS")
        print("=" * 60)
        for name, metrics in sorted(all_results.items(), key=lambda x: x[1]['f1'], reverse=True):
            print(f"\n{name}:")
            print(f"  F1: {metrics['f1']:.4f} | Recall: {metrics['recall']:.4f} | Precision: {metrics['precision']:.4f}")

        return {'results': all_results, 'models': all_models}

    def train_and_save(self, data_path: str):
        """Run the whole pipeline on a CSV file and persist the best model by F1.

        Args:
            data_path: Path of the CSV file to read.

        Returns:
            ``(best_model, metadata)``.
        """
        print("=" * 60)
        print("ML PIPELINE - ONLINE NEWS POPULARITY")
        print("=" * 60)

        # Load data
        df = pd.read_csv(data_path)
        print(f"\nDataset: {df.shape[0]} rows, {df.shape[1]} columns")

        # Run baseline
        baseline = self.run_baseline(df)

        # Run with imbalance handling
        improved = self.run_with_imbalance_handling(
            baseline['X_train'],
            baseline['y_class_train'],
            baseline['X_test'],
            baseline['y_class_test']
        )

        # Select best model
        print("\n" + "=" * 60)
        print("FINAL MODEL SELECTION")
        print("=" * 60)

        baseline_f1 = baseline['clf_results'][baseline['best_model_name']]['f1']
        best_improved_name = max(improved['results'].items(), key=lambda x: x[1]['f1'])[0]
        improved_f1 = improved['results'][best_improved_name]['f1']

        print(f"\nBaseline: {baseline['best_model_name']} - F1: {baseline_f1:.4f}")
        print(f"Improved: {best_improved_name} - F1: {improved_f1:.4f}")

        if improved_f1 > baseline_f1:
            self.best_model = improved['models'][best_improved_name]
            self.best_model_name = best_improved_name
            best_metrics = improved['results'][best_improved_name]
            print(f"\nUsing improved model: {best_improved_name}")
        else:
            self.best_model = baseline['best_model']
            self.best_model_name = baseline['best_model_name']
            best_metrics = baseline['clf_results'][baseline['best_model_name']]
            print(f"\nUsing baseline model: {baseline['best_model_name']}")

        # Prepare metadata
        self.metadata = {
            'model_name': self.best_model_name,
            'f1_score': best_metrics['f1'],
            'recall': best_metrics['recall'],
            'precision': best_metrics['precision'],
            'accuracy': best_metrics['accuracy'],
            # Quantile of `shares` that defines the positive class.
            'classification_threshold': self.config.classification_threshold,
            # Probability cut-off the metrics above were computed with. Only
            # the threshold-tuned entry carries one; every other model was
            # scored through predict(). Without this a saved threshold-tuned
            # model cannot reproduce its reported metrics.
            'decision_threshold': best_metrics.get('threshold', 0.5),
            'feature_count': len(self.feature_names),
            'training_samples': len(baseline['X_train']),
            'test_samples': len(baseline['X_test']),
            'config': asdict(self.config)
        }

        # Save to Downloads folder
        save_model_to_downloads(
            self.best_model,
            self.feature_names,
            self.metadata
        )

        print("\n" + "=" * 60)
        print("Training complete")
        print("=" * 60)
        print(f"\nBest Model: {self.best_model_name}")
        print(f"F1-Score: {best_metrics['f1']:.4f}")
        print(f"Recall: {best_metrics['recall']:.4f}")
        print(f"Precision: {best_metrics['precision']:.4f}")
        print(f"Accuracy: {best_metrics['accuracy']:.4f}")

        return self.best_model, self.metadata

In [56]:
# Main execution
def main():
    """Run the pipeline with the default ``Config`` on 'online_news_original.csv'.

    Returns:
        ``(model, metadata)`` as produced by ``MLPipeline.train_and_save``.
    """
    pipeline = MLPipeline(Config())
    trained_model, run_metadata = pipeline.train_and_save('online_news_original.csv')
    return trained_model, run_metadata


if __name__ == "__main__":
    main()

ML PIPELINE - ONLINE NEWS POPULARITY

Dataset: 39644 rows, 61 columns
PREPARING DATA
Training set: 31715 samples
Test set: 7929 samples
Features: 57

TRAINING BASELINE MODELS
Training Logistic Regression...
Training Random Forest...
Training Gradient Boosting...

BASELINE RESULTS

Random Forest:
  F1: 0.4358 | Recall: 0.4220 | Precision: 0.4506

Gradient Boosting:
  F1: 0.2282 | Recall: 0.1420 | Precision: 0.5796

Logistic Regression:
  F1: 0.1653 | Recall: 0.0978 | Precision: 0.5326

TRAINING WITH IMBALANCE HANDLING

[1/4] SMOTE...
Training Logistic Regression...
Training Random Forest...
Training Gradient Boosting...

[2/4] Undersampling...
Training Logistic Regression...
Training Random Forest...
Training Gradient Boosting...

[3/4] Custom Weights...
Training Logistic Regression...
Training Random Forest...
Training Gradient Boosting...

[4/4] Threshold Tuning...

IMBALANCE HANDLING RESULTS

Random Forest (Undersample):
  F1: 0.4826 | Recall: 0.6764 | Precision: 0.3751

Gradient Boo

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Files downloaded to your Downloads folder!

Training complete

Best Model: Random Forest (Undersample)
F1-Score: 0.4826
Recall: 0.6764
Precision: 0.3751
Accuracy: 0.6484
