In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_percentage_error, r2_score
import matplotlib.pyplot as plt
import seaborn as sns

# Load the dataset
df = pd.read_csv('../data/Initial Engagement v1 - _WITH_videos_AS_get_the_videos_.csv')

# Display basic information
print(f"Dataset shape: {df.shape}")
print(df.columns.tolist())


Dataset shape: (18468, 18)
['video_id', 'video_published_timestamp', 'channel_id', 'channel_name', 'creator_type_id', 'video_length', 'views_final', 'impressions_final', 'like_final', 'comment_final', 'views3s_1k', 'impressions_1k', 'like_1k', 'comments_1k', 'views3s_5k', 'impressions_5k', 'like_5k', 'comments_5k']


In [None]:
# Feature engineering
def engineer_features(df):
    # Create a copy to avoid modifying the original dataframe
    df_features = df.copy()

    # 1. Engagement rates at 1K impressions
    df_features['ctr_1k'] = df_features['views3s_1k'] / df_features['impressions_1k']
    df_features['like_per_view_1k'] = df_features['like_1k'] / df_features['views3s_1k'].replace(0, np.nan)
    df_features['comment_per_view_1k'] = df_features['comments_1k'] / df_features['views3s_1k'].replace(0, np.nan)

    # 2. Engagement rates at 5K impressions (if available)
    df_features['has_5k_data'] = (~df_features['views3s_5k'].isna()).astype(int)

    # Only calculate these features for videos that have 5K impression data
    mask_5k = df_features['has_5k_data'] == 1
    df_features.loc[mask_5k, 'ctr_5k'] = df_features.loc[mask_5k, 'views3s_5k'] / df_features.loc[mask_5k, 'impressions_5k']
    df_features.loc[mask_5k, 'like_per_view_5k'] = df_features.loc[mask_5k, 'like_5k'] / df_features.loc[mask_5k, 'views3s_5k'].replace(0, np.nan)
    df_features.loc[mask_5k, 'comment_per_view_5k'] = df_features.loc[mask_5k, 'comments_5k'] / df_features.loc[mask_5k, 'views3s_5k'].replace(0, np.nan)

    # 3. Growth metrics between 1K and 5K impressions (for videos with both datapoints)
    df_features.loc[mask_5k, 'view_growth_rate'] = (df_features.loc[mask_5k, 'views3s_5k'] - df_features.loc[mask_5k, 'views3s_1k']) / df_features.loc[mask_5k, 'views3s_1k']
    df_features.loc[mask_5k, 'impression_growth_rate'] = (df_features.loc[mask_5k, 'impressions_5k'] - df_features.loc[mask_5k, 'impressions_1k']) / df_features.loc[mask_5k, 'impressions_1k']

    # 4. Categorical features encoding (if available)
    if 'creator_type_id' in df_features.columns:
        df_features = pd.get_dummies(df_features, columns=['creator_type_id'], drop_first=True)

    # 5. Video length features
    if 'video_length' in df_features.columns:
        # Bin video length into categories
        df_features['video_length_bin'] = pd.cut(df_features['video_length'],
                                                bins=[0, 60, 180, 300, 600, np.inf],
                                                labels=['very_short', 'short', 'medium', 'long', 'very_long'])
        df_features = pd.get_dummies(df_features, columns=['video_length_bin'], drop_first=True)

    # Fill NaN values with appropriate values
    df_features = df_features.fillna(0)

    return df_features


In [None]:
def estimate_lifetime_class(df_features):
    """
    Estimate the lifetime class of videos based on engagement patterns.
    This is an adaptation of the α-lifespan concept from LARM.
    """
    # For videos with both 1K and 5K data, use growth patterns
    mask_5k = df_features['has_5k_data'] == 1

    # Initialize lifetime class column
    df_features['lifetime_class'] = 0

    # For videos with both datapoints, classify based on view growth rate
    df_features.loc[mask_5k, 'lifetime_class'] = pd.qcut(
        df_features.loc[mask_5k, 'view_growth_rate'],
        q=4,
        labels=[1, 2, 3, 4]
    ).astype(int)

    # For videos with only 1K data, use engagement metrics to estimate
    mask_1k_only = df_features['has_5k_data'] == 0

    # Use a simple model to predict lifetime class for videos with only 1K data
    if mask_1k_only.sum() > 0:
        X_train = df_features.loc[mask_5k, ['ctr_1k', 'like_per_view_1k', 'comment_per_view_1k']]
        y_train = df_features.loc[mask_5k, 'lifetime_class']

        # Train a simple model
        from sklearn.ensemble import RandomForestClassifier
        clf = RandomForestClassifier(n_estimators=50, random_state=42)
        clf.fit(X_train, y_train)

        # Predict lifetime class for videos with only 1K data
        X_pred = df_features.loc[mask_1k_only, ['ctr_1k', 'like_per_view_1k', 'comment_per_view_1k']]
        df_features.loc[mask_1k_only, 'lifetime_class'] = clf.predict(X_pred)

    return df_features


In [None]:
def train_specialized_models(df_features):
    """
    Train specialized models for different lifetime classes.
    """
    # Split data into training and testing sets
    X = df_features.drop(['video_id', 'views_final', 'lifetime_class'], axis=1)
    y = df_features['views_final']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Get lifetime classes in the training data
    lifetime_classes = df_features.loc[X_train.index, 'lifetime_class'].unique()

    # Train specialized models for each lifetime class
    specialized_models = {}
    for lc in lifetime_classes:
        # Get indices for this lifetime class
        indices = df_features.loc[X_train.index, 'lifetime_class'] == lc

        # Train a model on this subset
        model = GradientBoostingRegressor(n_estimators=100, random_state=42)
        model.fit(X_train[indices], y_train[indices])

        # Store the model
        specialized_models[lc] = model

    # Train a fallback model on all data
    fallback_model = GradientBoostingRegressor(n_estimators=100, random_state=42)
    fallback_model.fit(X_train, y_train)
    specialized_models['fallback'] = fallback_model

    return specialized_models, X_test, y_test


In [None]:
def predict_final_views(df_new, specialized_models):
    """
    Predict final views for new videos using the appropriate specialized model.
    """
    # Engineer features for the new data
    df_features = engineer_features(df_new)

    # Estimate lifetime class
    df_features = estimate_lifetime_class(df_features)

    # Prepare features for prediction
    X = df_features.drop(['video_id', 'lifetime_class'], axis=1, errors='ignore')

    # Initialize predictions array
    predictions = np.zeros(len(df_features))

    # Use specialized models based on lifetime class
    for i, row in df_features.iterrows():
        lc = int(row['lifetime_class'])

        # Use the appropriate model or fallback to the general model
        if lc in specialized_models:
            model = specialized_models[lc]
        else:
            model = specialized_models['fallback']

        # Make prediction
        predictions[i] = model.predict([X.iloc[i]])[0]

    return predictions


In [None]:
def evaluate_model(specialized_models, X_test, y_test, df_features):
    """
    Evaluate the performance of our specialized models.
    """
    # Get lifetime classes for test data
    test_indices = X_test.index
    lifetime_classes = df_features.loc[test_indices, 'lifetime_class']

    # Initialize predictions array
    predictions = np.zeros(len(X_test))

    # Use specialized models based on lifetime class
    for i, (idx, row) in enumerate(X_test.iterrows()):
        lc = int(lifetime_classes.loc[idx])

        # Use the appropriate model or fallback to the general model
        if lc in specialized_models:
            model = specialized_models[lc]
        else:
            model = specialized_models['fallback']

        # Make prediction
        predictions[i] = model.predict([row])[0]

    # Calculate metrics
    mape = mean_absolute_percentage_error(y_test, predictions)
    r2 = r2_score(y_test, predictions)

    print(f"Mean Absolute Percentage Error (MAPE): {mape:.4f}")
    print(f"R² Score: {r2:.4f}")

    # Plot actual vs predicted
    plt.figure(figsize=(10, 6))
    plt.scatter(y_test, predictions, alpha=0.5)
    plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--')
    plt.xlabel('Actual Views')
    plt.ylabel('Predicted Views')
    plt.title('Actual vs Predicted Views')
    plt.show()

    return mape, r2


In [None]:
def analyze_feature_importance(specialized_models, X):
    """
    Analyze feature importance across all specialized models.
    """
    # Get feature names
    feature_names = X.columns

    # Initialize feature importance dictionary
    feature_importance = {feature: 0 for feature in feature_names}

    # Sum feature importance across all models (excluding fallback)
    for lc, model in specialized_models.items():
        if lc != 'fallback':
            for feature, importance in zip(feature_names, model.feature_importances_):
                feature_importance[feature] += importance

    # Normalize by number of models
    num_models = len(specialized_models) - 1  # Exclude fallback
    feature_importance = {feature: importance / num_models for feature, importance in feature_importance.items()}

    # Sort features by importance
    sorted_features = sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)

    # Plot feature importance
    plt.figure(figsize=(12, 8))
    features, importances = zip(*sorted_features[:15])  # Top 15 features
    plt.barh(features, importances)
    plt.xlabel('Average Feature Importance')
    plt.title('Top 15 Features by Importance')
    plt.tight_layout()
    plt.show()

    return sorted_features


In [None]:
def run_larm_pipeline(df):
    """
    Run the complete LARM pipeline.
    """
    # Engineer features
    print("Engineering features...")
    df_features = engineer_features(df)

    # Estimate lifetime class
    print("Estimating lifetime classes...")
    df_features = estimate_lifetime_class(df_features)

    # Train specialized models
    print("Training specialized models...")
    specialized_models, X_test, y_test = train_specialized_models(df_features)

    # Evaluate models
    print("Evaluating models...")
    mape, r2 = evaluate_model(specialized_models, X_test, y_test, df_features)

    # Analyze feature importance
    print("Analyzing feature importance...")
    feature_importance = analyze_feature_importance(specialized_models, X_test)

    return specialized_models, df_features, mape, r2, feature_importance


In [None]:
from sklearn.model_selection import GridSearchCV

def optimize_models(df_features):
    """
    Optimize the hyperparameters for each specialized model.
    """
    # Split data into training and testing sets
    X = df_features.drop(['video_id', 'views_final', 'lifetime_class'], axis=1)
    y = df_features['views_final']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Get lifetime classes in the training data
    lifetime_classes = df_features.loc[X_train.index, 'lifetime_class'].unique()

    # Define parameter grid
    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.05, 0.1]
    }

    # Train optimized models for each lifetime class
    optimized_models = {}
    for lc in lifetime_classes:
        # Get indices for this lifetime class
        indices = df_features.loc[X_train.index, 'lifetime_class'] == lc

        # Create base model
        base_model = GradientBoostingRegressor(random_state=42)

        # Perform grid search
        grid_search = GridSearchCV(
            estimator=base_model,
            param_grid=param_grid,
            cv=3,
            n_jobs=-1,
            scoring='neg_mean_absolute_percentage_error'
        )

        # Fit grid search
        grid_search.fit(X_train[indices], y_train[indices])

        # Get best model
        best_model = grid_search.best_estimator_

        # Store the model
        optimized_models[lc] = best_model

        print(f"Best parameters for lifetime class {lc}: {grid_search.best_params_}")

    # Train a fallback model on all data
    base_model = GradientBoostingRegressor(random_state=42)
    grid_search = GridSearchCV(
        estimator=base_model,
        param_grid=param_grid,
        cv=3,
        n_jobs=-1,
        scoring='neg_mean_absolute_percentage_error'
    )
    grid_search.fit(X_train, y_train)
    optimized_models['fallback'] = grid_search.best_estimator_

    print(f"Best parameters for fallback model: {grid_search.best_params_}")

    return optimized_models, X_test, y_test


In [None]:
def predict_with_1k_data_only(df_new, specialized_models):
    """
    Predict final views for videos that only have 1K impression data.
    """
    # Engineer basic features
    df_features = df_new.copy()

    # Calculate engagement rates
    df_features['ctr_1k'] = df_features['views3s_1k'] / df_features['impressions_1k']
    df_features['like_per_view_1k'] = df_features['like_1k'] / df_features['views3s_1k'].replace(0, np.nan)
    df_features['comment_per_view_1k'] = df_features['comments_1k'] / df_features['views3s_1k'].replace(0, np.nan)

    # Fill NaN values
    df_features = df_features.fillna(0)

    # Use a simplified lifetime estimation
    # This could be based on the engagement metrics at 1K impressions
    df_features['lifetime_class'] = pd.qcut(
        df_features['ctr_1k'],
        q=4,
        labels=[1, 2, 3, 4]
    ).astype(int)

    # Prepare features for prediction
    X = df_features[['ctr_1k', 'like_per_view_1k', 'comment_per_view_1k']]

    # Use specialized models based on lifetime class
    predictions = np.zeros(len(df_features))
    for i, row in df_features.iterrows():
        lc = int(row['lifetime_class'])

        # Use the appropriate model or fallback to the general model
        if lc in specialized_models:
            model = specialized_models[lc]
        else:
            model = specialized_models['fallback']

        # Make prediction
        predictions[i] = model.predict([X.iloc[i]])[0]

    return predictions

def predict_with_5k_data(df_new, specialized_models):
    """
    Predict final views for videos that have both 1K and 5K impression data.
    """
    # Engineer features
    df_features = engineer_features(df_new)

    # Estimate lifetime class
    df_features = estimate_lifetime_class(df_features)

    # Prepare features for prediction
    X = df_features.drop(['video_id', 'lifetime_class'], axis=1, errors='ignore')

    # Use specialized models based on lifetime class
    predictions = np.zeros(len(df_features))
    for i, row in df_features.iterrows():
        lc = int(row['lifetime_class'])

        # Use the appropriate model or fallback to the general model
        if lc in specialized_models:
            model = specialized_models[lc]
        else:
            model = specialized_models['fallback']

        # Make prediction
        predictions[i] = model.predict([X.iloc[i]])[0]

    return predictions

def predict_final_views_adaptive(df_new, specialized_models):
    """
    Adaptively predict final views based on available data.
    """
    # Separate videos with only 1K data and those with 5K data
    mask_1k_only = df_new['views3s_5k'].isna()
    df_1k = df_new[mask_1k_only]
    df_5k = df_new[~mask_1k_only]

    # Predict for each group
    predictions_1k = predict_with_1k_data_only(df_1k, specialized_models) if len(df_1k) > 0 else []
    predictions_5k = predict_with_5k_data(df_5k, specialized_models) if len(df_5k) > 0 else []

    # Combine predictions
    predictions = np.concatenate([predictions_1k, predictions_5k])

    return predictions


In [12]:
import joblib

def save_models(specialized_models, filename):
    joblib.dump(specialized_models, filename)

def load_models(filename):
    return joblib.load(filename)

# Save models
save_models(specialized_models, 'larm_models.joblib')

# Load models
loaded_models = load_models('larm_models.joblib')


NameError: name 'specialized_models' is not defined

In [14]:
!pip install flask

zsh:1: /Users/adviti/code/advitis/video-performance-predictor/.venv/bin/pip: bad interpreter: /Users/adviti/code/advitis/larm-video-performance-predictor/.venv/bin/python: no such file or directory


In [15]:
from flask import Flask, request, jsonify
import pandas as pd

app = Flask(__name__)

# Load models
specialized_models = load_models('larm_models.joblib')

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    df = pd.DataFrame(data)
    predictions = predict_final_views_adaptive(df, specialized_models)
    return jsonify({'predictions': predictions.tolist()})

if __name__ == '__main__':
    app.run(debug=True)


ModuleNotFoundError: No module named 'flask'

In [None]:
def monitor_model_performance(true_values, predictions, threshold=0.1):
    mape = mean_absolute_percentage_error(true_values, predictions)
    if mape > threshold:
        print(f"Model performance degraded. Current MAPE: {mape:.4f}")
        return False
    return True

def update_models(new_data):
    # Retrain models with new data
    new_specialized_models, _, _ = train_specialized_models(new_data)

    # Save updated models
    save_models(new_specialized_models, 'larm_models_updated.joblib')

    print("Models updated and saved.")

# In production, periodically:
new_data = fetch_new_data()  # Implement this function to get new data
true_values = new_data['views_final']
predictions = predict_final_views_adaptive(new_data, specialized_models)

if not monitor_model_performance(true_values, predictions):
    update_models(new_data)


NameError: name 'fetch_new_data' is not defined