<a href="https://colab.research.google.com/github/Nikhild2710/Game-revenue-predictions/blob/main/Games_Revenue_Predictor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Paid PC Games — Revenue Prediction

This notebook trains a machine learning model to predict the revenue of paid PC games based on publicly available metadata.  


**Key points:**
- Input: Game metadata such as price, reviews, review score, peak concurrent players, and publisher class.  
- Output: Estimated revenue per game.  
- The dataset used for training is private; you can run the notebook with your own dataset following the same column schema.

**Expected Input Columns**:
- `price`
- `total_reviews`
- `total_positive`
- `total_negative`
- `reviewScore`
- `peak_all_time`
- `publisherClass`
- `revenue` (training target; not needed when predicting on a new file)

**Optional Columns**:
- `firstReleaseDate` (used to compute release year if included)
- `positive_ratio` (computed automatically if not provided)

**Excluded**:
- `copiesSold` — excluded to avoid data leakage.



## 1) Setup

In [None]:
!pip -q install xgboost

In [None]:

# Install Excel reader
!pip -q install openpyxl



## 2) (Optional) Mount Google Drive

If your file lives in Drive, keep this ON and update the `DRIVE_DATA_PATH` below.
If not, we'll auto-fallback to a manual file upload in the next step.


In [None]:

MOUNT_DRIVE = True  # Set to False if you DON'T use Drive
DRIVE_DATA_PATH = "/content/drive/MyDrive/private/GamesDataSet.xlsx"  # <-- change if needed

if MOUNT_DRIVE:
    from google.colab import drive
    drive.mount('/content/drive')



## 3) Load your **private** Excel file (sheet: `paid pc games`)

- If `DRIVE_DATA_PATH` exists, we'll use it.
- Otherwise, you'll be prompted to **upload** your Excel file from your computer.


In [None]:

import pandas as pd

SHEET_NAME = "paid pc games"  # do not change

def load_data():
    # Try Drive path first
    if MOUNT_DRIVE:
        try:
            df = pd.read_excel(DRIVE_DATA_PATH, sheet_name=SHEET_NAME)
            print(f"Loaded from Drive: {DRIVE_DATA_PATH}")
            return df
        except Exception as e:
            print(f"Drive read failed ({e}). Falling back to file upload...")
    # Fallback: upload
    from google.colab import files
    print("Please upload your Excel file now...")
    uploaded = files.upload()
    assert len(uploaded) >= 1, "No file uploaded."
    file_name = list(uploaded.keys())[0]
    df = pd.read_excel(file_name, sheet_name=SHEET_NAME)
    print(f"Loaded uploaded file: {file_name}")
    return df

df_raw = load_data()
df_raw.head(5)  # preview



## 4) Configure training

- We **drop** `copiesSold`.
- We compute `positive_ratio` = `total_positive` / `total_reviews`.
- `release_year` is **OFF** by default (you can enable it later).
- To make training fast, we cap rows at `MAX_ROWS` (set `0` to use all rows).
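
To illustrate the rules above, here is a small sketch on made-up numbers. The toy table and the `cap_rows` helper are assumptions for illustration only; the notebook's own settings and preprocessing live in the cells that follow.

```python
import numpy as np
import pandas as pd

# Made-up rows; the second game has no reviews yet.
toy = pd.DataFrame({
    "total_positive": [80, 0, 45],
    "total_reviews": [100, 0, 50],
    "copiesSold": [1000, 10, 500],
})

# Drop the leakage column, then compute the ratio with 0 reviews mapped to NaN.
toy = toy.drop(columns=["copiesSold"])
reviews = toy["total_reviews"].astype(float).replace(0, np.nan)
toy["positive_ratio"] = toy["total_positive"] / reviews

def cap_rows(frame, max_rows, seed=42):
    """Randomly keep at most `max_rows` rows; 0 means keep everything."""
    if max_rows and len(frame) > max_rows:
        return frame.sample(n=max_rows, random_state=seed)
    return frame

print(toy["positive_ratio"].tolist())                 # [0.8, nan, 0.9]
print(len(cap_rows(toy, 2)), len(cap_rows(toy, 0)))   # 2 3
```

Mapping zero reviews to NaN leaves the decision to the imputer in the training pipeline instead of producing a division error or an infinite ratio.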


In [None]:

USE_RELEASE_YEAR = False   # Set True if you want to include release year
MAX_ROWS = 20000           # Training row cap for speed (set 0 to use all rows)

TARGET = "revenue"
NUMERIC_BASE = [
    "price", "total_reviews", "total_positive", "total_negative",
    "reviewScore", "peak_all_time"
]
CATEGORICAL = ["publisherClass"]

DROP_COLUMNS = ["copiesSold"]  # explicitly remove
DATE_COLUMN = "firstReleaseDate"



## 5) Preprocess & Feature Engineering


In [None]:

import numpy as np

def safe_ratio(a, b):
    a = a.astype(float)
    b = b.astype(float).replace(0, np.nan)
    return (a / b).replace([np.inf, -np.inf], np.nan)

df = df_raw.copy()

# Drop unwanted columns
for col in DROP_COLUMNS:
    if col in df.columns:
        df = df.drop(columns=[col])

# Positive ratio
if "positive_ratio" not in df.columns:
    if ("total_positive" in df.columns) and ("total_reviews" in df.columns):
        df["positive_ratio"] = safe_ratio(df["total_positive"], df["total_reviews"])
    else:
        df["positive_ratio"] = np.nan

# Release year (optional)
if USE_RELEASE_YEAR:
    if DATE_COLUMN in df.columns:
        df[DATE_COLUMN] = pd.to_datetime(df[DATE_COLUMN], errors="coerce")
        df["release_year"] = df[DATE_COLUMN].dt.year
    else:
        df["release_year"] = np.nan

# Build feature list
feature_cols = NUMERIC_BASE + ["positive_ratio"] + CATEGORICAL
if USE_RELEASE_YEAR:
    feature_cols += ["release_year"]

# Keep only available columns
feature_cols = [c for c in feature_cols if c in df.columns]

# Drop rows with missing target
df = df[~df[TARGET].isna()].copy()

# Downsample for speed if needed
if MAX_ROWS and MAX_ROWS > 0 and len(df) > MAX_ROWS:
    df = df.sample(n=MAX_ROWS, random_state=42)

print("Rows used for training:", len(df))
print("Features:", feature_cols)
df[feature_cols + [TARGET]].head(5)



## 6) Train & Evaluate (RandomForest)
- Train/test split (80/20)
- Metrics: R², MAE, RMSE, MAPE (MAPE is computed only on test games with revenue of at least 1,000)
- Top feature importances
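
For reference, the four metrics can be written out directly with NumPy. This is a sketch on made-up numbers, not the notebook's results. The function name `regression_metrics` is hypothetical, and the `eps` floor in the MAPE denominator mirrors the `safe_mape` helper used in later cells.

```python
import numpy as np

def regression_metrics(y_true, y_pred, eps=1000):
    """Return R2, MAE, RMSE and a floored MAPE (in percent) for two arrays."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    err = y_true - y_pred
    ss_res = np.sum(err ** 2)                         # residual sum of squares
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)    # total sum of squares
    return {
        "R2": 1.0 - ss_res / ss_tot,
        "MAE": np.mean(np.abs(err)),
        "RMSE": np.sqrt(np.mean(err ** 2)),
        # Denominator is floored at eps so tiny revenues do not dominate
        "MAPE%": np.mean(np.abs(err) / np.maximum(np.abs(y_true), eps)) * 100,
    }

# Made-up revenues and predictions
m = regression_metrics([1000, 2000, 4000], [1500, 2000, 3000])
print(m)  # MAE is 500.0 and MAPE% is 25.0 for these numbers
```

RMSE penalises the single 1,000 miss more heavily than MAE does, which is why the two are reported side by side.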


In [None]:

from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import matplotlib.pyplot as plt

X = df[feature_cols].copy()
y = df[TARGET].astype(float).copy()

numeric_cols = [c for c in feature_cols if c not in CATEGORICAL]
categorical_cols = [c for c in feature_cols if c in CATEGORICAL]

preprocessor = ColumnTransformer(
    transformers=[
        ("num", SimpleImputer(strategy="median"), numeric_cols),
        ("cat", Pipeline(steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("onehot", OneHotEncoder(handle_unknown="ignore"))
        ]), categorical_cols)
    ]
)

rf = RandomForestRegressor(
    n_estimators=150, n_jobs=-1, random_state=42
)

pipe = Pipeline(steps=[("prep", preprocessor), ("model", rf)])

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
pipe.fit(X_train, y_train)
preds = pipe.predict(X_test)

mae = mean_absolute_error(y_test, preds)
# Taking np.sqrt of the MSE works on every scikit-learn version; the `squared=`
# argument was deprecated in scikit-learn 1.4 and later removed.
rmse = np.sqrt(mean_squared_error(y_test, preds))
r2 = r2_score(y_test, preds)

REVENUE_THRESHOLD = 1000

# Mask for reasonable values
mask = y_test >= REVENUE_THRESHOLD

# Filter y_test and preds
y_test_filtered = y_test[mask]
preds_filtered = preds[mask]

# Calculate safe MAPE
EPSILON = REVENUE_THRESHOLD
mape = np.mean(
    np.abs((y_test_filtered - preds_filtered) / np.maximum(np.abs(y_test_filtered), EPSILON))
) * 100


print(f"R2  : {r2:.3f}")
print(f"MAE : {mae:,.0f}")
print(f"RMSE: {rmse:,.0f}")
print(f"MAPE: {mape:.2f}%")

# Feature importances
# Get processed feature names
ohe = pipe.named_steps["prep"].named_transformers_["cat"].named_steps["onehot"] if categorical_cols else None
cat_feature_names = ohe.get_feature_names_out(categorical_cols).tolist() if ohe is not None else []
feature_names = numeric_cols + cat_feature_names

importances = pipe.named_steps["model"].feature_importances_
imp_series = pd.Series(importances, index=feature_names).sort_values(ascending=False).head(15)

plt.figure(figsize=(8,5))
imp_series[::-1].plot.barh()
plt.title("Top Feature Importances (RandomForest)")
plt.xlabel("Importance")
plt.tight_layout()
plt.show()



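## 7) Compare models and save the trained model (`revenue_model.pkl`)

The cells in this section first compare the RandomForest baseline with XGBoost and with log-target variants of both. The last cell then saves the RandomForest pipeline (`pipe`):

- Saves locally to `/content/revenue_model.pkl`.
- If Drive is mounted, also saves to `/content/drive/MyDrive/revenue_model.pkl`.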
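
As a sanity check on the saving step, a saved pipeline can be reloaded and compared against the in-memory one. The sketch below uses a tiny stand-in pipeline on synthetic data and a temporary directory, both assumptions for illustration, instead of the notebook's `pipe` and `/content` paths.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline

# Synthetic stand-in data (not the private dataset)
rng = np.random.default_rng(0)
X_demo = rng.random((50, 3))
y_demo = X_demo @ np.array([3.0, 1.0, 0.5])

demo_pipe = Pipeline(steps=[
    ("impute", SimpleImputer(strategy="median")),
    ("model", RandomForestRegressor(n_estimators=10, random_state=42)),
])
demo_pipe.fit(X_demo, y_demo)

# Save to a temporary folder, reload, and confirm predictions are unchanged
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "revenue_model.pkl")
    joblib.dump(demo_pipe, demo_path)
    reloaded = joblib.load(demo_path)

same = np.allclose(demo_pipe.predict(X_demo), reloaded.predict(X_demo))
print("Round trip OK:", same)
```

A pickled pipeline is tied to the library versions that wrote it, so loading it with the same scikit-learn version is the safe choice.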


In [None]:
from xgboost import XGBRegressor
from sklearn.pipeline import Pipeline

xgb_raw = XGBRegressor(
    n_estimators=600,
    max_depth=8,
    learning_rate=0.05,
    subsample=0.8,
    colsample_bytree=0.8,
    reg_lambda=1.0,
    random_state=42,
    n_jobs=-1,
    tree_method="hist",
    eval_metric="rmse",
)

xgb_raw_pipe = Pipeline(steps=[("prep", preprocessor), ("model", xgb_raw)])

# Early stopping is left out here. A Pipeline forwards `model__eval_set` to
# XGBoost without applying the "prep" step, so the raw X_test (with string
# categories) would typically be rejected, and stopping on the test set would
# leak it into model selection.
xgb_raw_pipe.fit(X_train, y_train)

xgb_raw_preds = xgb_raw_pipe.predict(X_test)


In [None]:
def safe_mape(y_true, y_pred, eps=1000):
    """MAPE with the denominator floored at eps to avoid blowups on tiny revenues."""
    return np.mean(np.abs((y_true - y_pred) / np.maximum(np.abs(y_true), eps))) * 100

def metrics_of(y_true, y_pred, label):
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))  # works on every sklearn version
    r2 = r2_score(y_true, y_pred)
    mape = safe_mape(y_true, y_pred, eps=1000)
    return {"Model": label, "R2": round(r2, 3), "MAE": int(mae), "RMSE": int(rmse), "MAPE%": round(mape, 2)}

rows = []
# RandomForest (from earlier cell)
rows.append(metrics_of(y_test, preds, "RandomForest"))
# XGBoost on the raw target (predictions from the previous cell)
rows.append(metrics_of(y_test, xgb_raw_preds, "XGBoost"))

pd.DataFrame(rows)


In [None]:
# === XGBoost (LOG-target) — distinct variable names so we don't overwrite RAW ===
!pip -q install xgboost  # ensures xgboost is present

from xgboost import XGBRegressor
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

def safe_mape(y_true, y_pred, eps=1000):
    return np.mean(np.abs((y_true - y_pred) / np.maximum(np.abs(y_true), eps))) * 100

# 1) Log-transform target
y_train_log = np.log1p(np.clip(y_train, a_min=0, a_max=None))
y_test_log  = np.log1p(np.clip(y_test,  a_min=0, a_max=None))

# 2) Model (LOG) — separate object
xgb_log = XGBRegressor(
    n_estimators=1200,
    max_depth=6,
    learning_rate=0.03,
    subsample=0.8,
    colsample_bytree=0.8,
    reg_lambda=1.0,
    random_state=42,
    n_jobs=-1,
    tree_method="hist",
    eval_metric="rmse",   # evaluated on LOG scale
)

xgb_log_pipe = Pipeline(steps=[("prep", preprocessor), ("model", xgb_log)])

# 3) Fit without early stopping. A Pipeline forwards `model__eval_set` to XGBoost
# without applying the "prep" step, so the raw X_test would typically be
# rejected, and stopping on the test set would leak it into model selection.
xgb_log_pipe.fit(X_train, y_train_log)

# 4) Predict back on $ scale — keep separate name
preds_log = xgb_log_pipe.predict(X_test)
xgb_log_preds = np.expm1(preds_log)

# 5) Metrics (on $ scale)
mae = mean_absolute_error(y_test, xgb_log_preds)
try:
    rmse = mean_squared_error(y_test, xgb_log_preds, squared=False)
except TypeError:
    rmse = np.sqrt(mean_squared_error(y_test, xgb_log_preds))
r2  = r2_score(y_test, xgb_log_preds)

mape = safe_mape(y_test, xgb_log_preds, eps=1000)
mask = y_test >= 1000
mape_filtered = safe_mape(y_test[mask], xgb_log_preds[mask], eps=1000)

print(f"[XGBoost LOG-target]  R2: {r2:.3f}  MAE: {mae:,.0f}  RMSE: {rmse:,.0f}  MAPE*: {mape:.2f}%  MAPE>=1k: {mape_filtered:.2f}%")
print("* MAPE uses eps=1000 to avoid tiny-denominator blowups")

# 6) Store results for later table
xgb_log_results = {
    "Model": "XGBoost (log)",
    "R2": round(r2, 3),
    "MAE": int(mae),
    "RMSE": int(rmse),
    "MAPE%": round(mape, 2),
}


In [None]:
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

def safe_mape(y_true, y_pred, eps=1000):
    """MAPE with floor eps to avoid division blowups on tiny values."""
    return np.mean(np.abs((y_true - y_pred) / np.maximum(np.abs(y_true), eps))) * 100

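# === Log transform target (negatives clipped to 0, as in the XGBoost log cell) ===
y_train_log = np.log1p(np.clip(y_train, a_min=0, a_max=None))   # log(1+y) handles 0 safely
y_test_log  = np.log1p(np.clip(y_test,  a_min=0, a_max=None))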

rf_log = RandomForestRegressor(
    n_estimators=150,
    random_state=42,
    n_jobs=-1
)

rf_log_pipe = Pipeline(steps=[("prep", preprocessor), ("model", rf_log)])

# Fit on log targets
rf_log_pipe.fit(X_train, y_train_log)

# Predict (log scale), then convert back
log_preds = rf_log_pipe.predict(X_test)
rf_log_preds = np.expm1(log_preds)  # inverse of log1p

# Evaluate
mae  = mean_absolute_error(y_test, rf_log_preds)
try:
    rmse = mean_squared_error(y_test, rf_log_preds, squared=False)
except TypeError:
    rmse = np.sqrt(mean_squared_error(y_test, rf_log_preds))
r2   = r2_score(y_test, rf_log_preds)
mape = safe_mape(y_test, rf_log_preds, eps=1000)

print(f"[RF-log]  R2: {r2:.3f}  MAE: {mae:,.0f}  RMSE: {rmse:,.0f}  MAPE*: {mape:.2f}%")
print("* MAPE uses eps=1000 to avoid tiny-denominator blowups")

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

def safe_mape(y_true, y_pred, eps=1000):
    return np.mean(np.abs((y_true - y_pred) / np.maximum(np.abs(y_true), eps))) * 100

def rmse_of(y_true, y_pred):
    try:
        return mean_squared_error(y_true, y_pred, squared=False)
    except TypeError:
        return np.sqrt(mean_squared_error(y_true, y_pred))

def metrics_of(y_true, y_pred, label):
    return {
        "Model": label,
        "R2": round(r2_score(y_true, y_pred), 3),
        "MAE": int(mean_absolute_error(y_true, y_pred)),
        "RMSE": int(rmse_of(y_true, y_pred)),
        "MAPE%": round(safe_mape(y_true, y_pred, eps=1000), 2),
    }

rows = []
rows.append(metrics_of(y_test, preds,          "RandomForest (raw)"))
rows.append(metrics_of(y_test, rf_log_preds,   "RandomForest (log)"))
rows.append(metrics_of(y_test, xgb_raw_preds,  "XGBoost (raw)"))
rows.append(metrics_of(y_test, xgb_log_preds,  "XGBoost (log)"))

pd.DataFrame(rows)


In [None]:

import joblib, os

MODEL_LOCAL_PATH = "/content/revenue_model.pkl"
joblib.dump(pipe, MODEL_LOCAL_PATH)
print("Saved:", MODEL_LOCAL_PATH)

if MOUNT_DRIVE:
    MODEL_DRIVE_PATH = "/content/drive/MyDrive/revenue_model.pkl"
    joblib.dump(pipe, MODEL_DRIVE_PATH)
    print("Saved:", MODEL_DRIVE_PATH)

# Optional: download to your computer
try:
    from google.colab import files
    files.download(MODEL_LOCAL_PATH)
except Exception as e:
    pass



## 8) Use the model to predict on a **new** file

- Provide a path to another Excel/CSV with the same columns.
- Output will be saved as `predictions.csv` with a `predicted_revenue` column.
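
Before predicting, it helps to confirm that the new file carries every column the model was trained on. The sketch below shows one way to do that with pandas. The `align_features` helper, the toy frame, and the placeholder predictions are hypothetical and stand in for the real model call.

```python
import io

import pandas as pd

def align_features(frame, trained_cols):
    """Return `frame` restricted to `trained_cols`, in training order.

    Raises ValueError naming any column the new file lacks.
    """
    missing = [c for c in trained_cols if c not in frame.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    return frame[trained_cols]

# Made-up new data with an extra column the model never saw
new_games = pd.DataFrame({
    "name": ["A", "B"],
    "price": [9.99, 19.99],
    "total_reviews": [120, 40],
})
X_new = align_features(new_games, ["price", "total_reviews"])

# Placeholder numbers standing in for model.predict(X_new)
out = new_games.copy()
out["predicted_revenue"] = [1000.0, 2500.0]

# Write to an in-memory buffer here; the notebook writes predictions.csv instead
buffer = io.StringIO()
out.to_csv(buffer, index=False)
print(buffer.getvalue().splitlines()[0])  # name,price,total_reviews,predicted_revenue
```

Keeping the original columns in the output makes each prediction easy to trace back to its game.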


In [None]:

import pandas as pd
import numpy as np
import joblib

# === Set your input path here ===
PREDICT_INPUT_PATH = None  # e.g. "/content/drive/MyDrive/my_new_games.xlsx"

def load_any(path):
    if path.lower().endswith(".xlsx") or path.lower().endswith(".xls"):
        return pd.read_excel(path, sheet_name=SHEET_NAME if SHEET_NAME in pd.ExcelFile(path).sheet_names else 0)
    else:
        return pd.read_csv(path)

if PREDICT_INPUT_PATH is None:
    print("Set PREDICT_INPUT_PATH to your new file path and re-run this cell.")
else:
    model = joblib.load(MODEL_LOCAL_PATH)
    dfp = load_any(PREDICT_INPUT_PATH).copy()

    # --- minimal preprocessing to align columns ---
    DROP_COLUMNS = ["copiesSold"]
    for col in DROP_COLUMNS:
        if col in dfp.columns:
            dfp = dfp.drop(columns=[col])

    def safe_ratio(a, b):
        a = a.astype(float)
        b = b.astype(float).replace(0, np.nan)
        return (a / b).replace([np.inf, -np.inf], np.nan)

    if "positive_ratio" not in dfp.columns:
        if ("total_positive" in dfp.columns) and ("total_reviews" in dfp.columns):
            dfp["positive_ratio"] = safe_ratio(dfp["total_positive"], dfp["total_reviews"])
        else:
            dfp["positive_ratio"] = np.nan

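    # Columns the model was trained with (numeric block first, then categorical)
    prep = model.named_steps["prep"]
    trained_cols = list(prep.transformers_[0][2]) + list(prep.transformers_[1][2])

    # Rebuild release_year if the model was trained with it
    if "release_year" in trained_cols and "release_year" not in dfp.columns:
        if DATE_COLUMN in dfp.columns:
            dfp["release_year"] = pd.to_datetime(dfp[DATE_COLUMN], errors="coerce").dt.year
        else:
            dfp["release_year"] = np.nan

    # Fail early with a clear message instead of an opaque ColumnTransformer error
    missing = [c for c in trained_cols if c not in dfp.columns]
    if missing:
        raise ValueError(f"Input file is missing columns the model needs: {missing}")

    # Use a separate name so the test-set `preds` from training is not overwritten
    new_preds = model.predict(dfp[trained_cols])
    out = dfp.copy()
    out["predicted_revenue"] = new_preds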

    out_path = "/content/predictions.csv"
    out.to_csv(out_path, index=False)
    print("Saved predictions:", out_path)

    try:
        from google.colab import files
        files.download(out_path)
    except Exception as e:
        pass
