# Gabriel Bertasius & Jaden Ford#

# Predicting Game Success: A Regression Analysis on the Steam Games Dataset #

In [None]:
import numpy as np
import pandas as pd
# Show every column when a DataFrame is displayed (None removes the column limit).
pd.set_option('display.max_columns', None)

## Downloading and loading data

In [None]:
# load the data into a dataframe for easy handling
import os
from datetime import datetime
import pickle
import gzip
# Directory (with trailing slash) and file name of the locally cached dataset.
DATASET_DIR = './data/'
DATASET_FILENAME = 'steamgames.parquet'
DATASET_PATH = DATASET_DIR+DATASET_FILENAME
DATASET_COMPRESSION = 'zstd'  # Very fast and compresses as well as gzip
# Directory (with trailing slash) where pickled models and their logs are written.
MODELS_DIR = './models/'
MODELS_FILENAME = 'model-'
# NOTE(review): not referenced anywhere in the visible notebook - confirm it is still needed.
download_data = 1


def check_file_exists(path: str) -> bool:
    """Return True when ``path`` exists on disk (file or directory)."""
    exists = os.path.exists(path)
    return exists


def check_data_dir_exists() -> bool:
    """Return True when the dataset directory ``DATASET_DIR`` exists."""
    data_dir_present = os.path.exists(DATASET_DIR)
    return data_dir_present

def check_models_dir_exists() -> bool:
    """Return True when the models directory ``MODELS_DIR`` exists."""
    models_dir_present = os.path.exists(MODELS_DIR)
    return models_dir_present

def _create_dir(directory_name: str) -> None:
    """Create ``directory_name`` and print the outcome instead of raising.

    Only the final path component is created (``os.mkdir``); every failure is
    reported on stdout so a notebook run is never interrupted by it.
    """
    try:
        os.mkdir(directory_name)
    except FileExistsError:
        print(f"Directory '{directory_name}' already exists.")
    except PermissionError:
        print(f"Permission denied: Unable to create '{directory_name}'.")
    except Exception as e:
        # Best effort by design: report anything else (e.g. a missing parent).
        print(f"An error occurred: {e}")
    else:
        print(f"Directory '{directory_name}' created successfully.")


def create_data_dir():
    """Create the dataset directory ``DATASET_DIR`` if possible."""
    _create_dir(DATASET_DIR)


def create_models_dir():
    """Create the models directory ``MODELS_DIR`` if possible."""
    _create_dir(MODELS_DIR)

def download_steamgames_dataset() -> pd.DataFrame:
    """Read the Steam games parquet file from its ``hf://`` URL into a DataFrame."""
    remote_parquet = (
        "hf://datasets/FronkonGames/steam-games-dataset/data/"
        "train-00000-of-00001-e2ed184370a06932.parquet"
    )
    return pd.read_parquet(remote_parquet)


def write_dataset_pqt(df: pd.DataFrame, filename: str = DATASET_FILENAME, overwrite: bool = False) -> bool:
    """Write ``df`` as a parquet file inside ``DATASET_DIR``.

    Args:
        df: DataFrame to store.
        filename: File name inside ``DATASET_DIR``.
        overwrite: Replace an existing file when True.

    Returns:
        True when the file was written, False when an existing file was kept.
    """
    path = DATASET_DIR + filename
    if not check_data_dir_exists():
        create_data_dir()
    if check_file_exists(path) and not overwrite:
        print("File exists. Pass 'overwrite' to replace.")
        return False
    # Use the module-level setting so the codec is configured in one place.
    df.to_parquet(path, compression=DATASET_COMPRESSION)
    return True


def read_dataset_pqt(filename: str = DATASET_FILENAME):
    """Load ``filename`` from ``DATASET_DIR``; return None when it is missing."""
    path = DATASET_DIR+filename
    if not check_file_exists(path):
        print("Parquet file not found.")
        return None
    print("Loading dataset from local storage...")
    dataset = pd.read_parquet(path)
    print("✅ Local dataset loaded.")
    return dataset

def datestamp():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"

def write_model_log(path:str, config: dict, **kwargs):
    """Append a timestamped entry to the log file ``<path>.txt``.

    Args:
        path: Log path without the ``.txt`` extension.
        config: Configuration to record (written via its ``str`` form).
        **kwargs: Extra named values; each is written as ``name: value``.
    """
    lines = [f"[{datestamp()}]\n", f"{config}\n"]
    # Iterating a dict yields only its keys, so write key and value together;
    # otherwise the extra data itself would be lost from the log.
    lines.extend(f"{key}: {value}\n" for key, value in kwargs.items())
    with open(path + ".txt", "a", encoding="utf-8") as file:
        file.writelines(lines)

def pickle_model(filename: str, model, params_dict:dict, param_grid:dict = None,overwrite: bool=False, **extra_data):
    """Pickle ``model`` into a gzip archive under ``MODELS_DIR`` and log its settings.

    The archive is named ``<filename>-<v1>-<v2>...pkl.gz`` where the ``v`` are the
    values of ``params_dict`` in order; a sibling ``.txt`` log is appended to.

    Args:
        filename: Base name of the archive.
        model: Any picklable object.
        params_dict: Parameters identifying the model; values become the suffix.
        param_grid: When given, logged instead of ``params_dict``.
        overwrite: Replace an existing archive when True.
        **extra_data: Additional named values forwarded to the log entry.

    Returns:
        True when the archive was written, False when an existing one was kept.
    """
    base_path = MODELS_DIR + filename + "".join(f"-{value}" for value in params_dict.values())
    archive_path = base_path + ".pkl.gz"
    if not check_models_dir_exists():
        create_models_dir()
    # The guard must look at the file that is actually written (with its
    # extension); checking the bare base path never detected an existing archive.
    if check_file_exists(archive_path) and not overwrite:
        print("File exists. Pass 'overwrite' to replace.")
        return False
    log_config = param_grid if param_grid is not None else params_dict
    write_model_log(base_path, log_config, **extra_data)
    level = 7   # Good balance between speed and compression
    with gzip.open(archive_path, "wb", compresslevel=level) as file:
        pickle.dump(model, file, protocol=5)
    return True

def unpickle_model(filename):
    """Load and return the object stored in ``MODELS_DIR/<filename>.pkl.gz``.

    Unpickling can execute code embedded in the file, so only load archives
    that come from a trusted source.
    """
    archive_path = MODELS_DIR + filename + ".pkl.gz"
    with gzip.open(archive_path, "rb") as archive:
        restored = pickle.load(archive)
    return restored

def download_and_save_dataset(force: bool = False, filename: str = DATASET_FILENAME) -> pd.DataFrame | None:
    """Download the dataset and cache it as ``DATASET_DIR/<filename>``.

    Args:
        force: Download and replace the local copy even when one exists.
        filename: Name of the cached parquet file.

    Returns:
        The downloaded DataFrame, or None when a local copy exists and
        ``force`` is False (nothing is downloaded in that case).
    """
    path = DATASET_DIR + filename
    if check_file_exists(path):
        print(f"⚠️ Dataset exists locally. Path:{path}")
        if not force:
            print("Use force=True to download and overwrite.")
            return None
        print("Redownloading and Overwriting...")
    else:
        print(f"Downloading and saving dataset to {path} ")
    df = download_steamgames_dataset()
    # Forward both arguments: the forced download must really replace the old
    # file, and it must be written under the same name that was checked above.
    if write_dataset_pqt(df, filename=filename, overwrite=force):
        print("✅ Done.")
        print(f"Saved to: {path}")
    return df


# A download only happens when no cached parquet exists; otherwise the call
# returns None and the cached file is read instead.
df = download_and_save_dataset(force=False)
df = df if df is not None else read_dataset_pqt()

## Pre-processing data

In [None]:
# Count missing values per column and show only the columns that have any.
# (Named `missing_counts` so the builtin `sum` is not shadowed for later cells.)
missing_counts = df.isnull().sum()
missing_counts[missing_counts != 0]

#### Drop irrelevant columns

In [None]:
# Columns judged irrelevant to a game's success rating (free text, links, media).
cols_to_remove = [
    'About the game', 'Supported languages', 'Full audio languages',
    'Header image', 'Website', 'Support url', 'Support email',
    'Metacritic url', 'Score rank', 'Screenshots', 'Movies',
]
df = df.drop(columns=cols_to_remove)
df.head()

#### Standardizing data and pre-processing

In [None]:
# function that calculates the number of years since a game's release date
from datetime import datetime
def years_since_release(date_string):
    """Return the time elapsed since a release date, in years of 365 days.

    Args:
        date_string: Release date as ``"Oct 21, 2008"`` (month day, year) or
            ``"Oct 2008"`` (month year; the first of the month is assumed).

    Returns:
        Whole days between the date and now, divided by 365.

    Raises:
        ValueError: If the string matches neither format.
    """
    # Try each known layout instead of guessing the layout from string length.
    for date_format in ("%b %d, %Y", "%b %Y"):
        try:
            released = datetime.strptime(date_string, date_format)
        except ValueError:
            continue
        return (datetime.now() - released).days / 365
    raise ValueError(f"Unrecognised release date format: {date_string!r}")

def est_owners(num_owners):
    """Return the midpoint of an owner range given as ``"<low>-<high>"``."""
    bounds = num_owners.split('-')
    lower = int(bounds[0])
    upper = int(bounds[1])
    return (lower + upper) / 2

def min_max_normalize(column):
    """Rescale numeric values linearly to the range 0-1.

    Args:
        column: Array-like of numbers (any shape); min and max are taken over
            all elements.

    Returns:
        A NumPy array of the same shape. When every value is identical the
        range is zero, so an array of zeros is returned instead of dividing
        by zero and producing NaN.
    """
    values = np.asarray(column)
    lowest = np.min(values)
    span = np.max(values) - lowest
    if span == 0:
        return np.zeros(values.shape, dtype=float)
    return (values - lowest) / span

In [None]:
# Release date string -> age of the game in years.
df['Release date'] = df['Release date'].apply(years_since_release)

# Owner range string -> midpoint of the range.
df['Estimated owners'] = df['Estimated owners'].apply(est_owners)

# Platform flags: boolean -> 0/1 integer.
for platform in ('Windows', 'Mac', 'Linux'):
    df[platform] = df[platform].astype(int)

In [None]:
# Keep only paid games that have a peak CCU and an owner estimate, so success is
# judged on games that competed in a market and were actually played.
# Rows are removed one criterion at a time, in this order.
for required_column in ('Peak CCU', 'Estimated owners', 'Price'):
    empty_rows = df.loc[df[required_column] == 0].index
    df = df.drop(index=empty_rows)

Data for later use in sentiment analysis and model performance calculations.

In [None]:
# Snapshot of the un-normalized values (DataFrame.copy is deep by default).
df_orig = df.copy()

# Raw review text, kept separately for the sentiment analysis.
df_reviews = df['Reviews'].copy()

In [None]:
# Numeric columns rescaled to 0-1 so large value ranges do not dominate.
cols_to_normalize = [
    'Release date', 'Estimated owners', 'Peak CCU', 'Required age', 'Price',
    'DLC count', 'Metacritic score', 'User score', 'Positive', 'Negative',
    'Achievements', 'Recommendations',
    'Average playtime forever', 'Average playtime two weeks',
    'Median playtime forever', 'Median playtime two weeks',
]
for col in cols_to_normalize:
    df[col] = min_max_normalize(df[col])

In [None]:
# If we want to remove rows that have no reviews, we would have 4269 examples
#df = df.dropna(axis=0, subset='Reviews')
#print(df.shape[0])
#df.isnull().sum()

In [None]:
# Size of the dataset after filtering, followed by a preview of the first rows.
print(df.shape)
df.head()

### Counting unique words in Categories, Genres, Tags

'Dumb counting' as in the tags 'turn-based' and 'turn-based combat' or 'turn-based strategy' are different words. These should be ok for word2vec as they're similar.

In [None]:
# Bare expressions for interactive inspection; only the last one is displayed.
df.columns
df['Tags']

def count_unique_words(df, label:str):
    """Print and return the number of distinct comma-separated entries in a column.

    Entries are case-folded before comparison but otherwise compared verbatim.

    Args:
        df: DataFrame holding the column.
        label: Name of a string column whose cells look like ``"a,b,c"``.

    Returns:
        The number of distinct entries across all rows.
    """
    # Drop missing cells first so both None and NaN are skipped.
    token_lists = df[label].dropna().str.casefold().str.split(',')
    words = set()
    for tokens in token_lists:
        words.update(tokens)
    print(f"Number of unique {label}: {len(words)}")
    return len(words)

# Counts observed on the filtered dataset: Categories 39, Genres 27, Tags 444.
# A loop (rather than a trailing expression) keeps the cell from echoing a value.
for column_name in ('Categories', 'Genres', 'Tags'):
    count_unique_words(df, column_name)

#### One-hot encoding Categories and Genres

In [None]:
# One indicator column per distinct category / genre value.
encoded_categories = df['Categories'].str.get_dummies(sep=',')
encoded_genres = df['Genres'].str.get_dummies(sep=',')

# Append the indicator columns, then remove the original text columns.
combined = pd.concat([df, encoded_categories, encoded_genres], axis=1)
df = combined.drop(columns=['Categories', 'Genres'])
print(df.shape)

### Word2Vec embedding for Tags feature

Currently the embedding for the tags is the average of the tag vectors of a given game. This results in a d-dimensional feature embedding, where d is the number of dimensions specified in word2vec training.

todo: process hyphenated and multi-word tags. Treat as one phrase by subbing dashes and spaces with an underline

todo: tuning: what do the parameters do? what can be tweaked? what is desired?

todo: CBOW vs Skip-gram

In [None]:
label = 'Tags'
# One "sentence" per game: its case-folded tags split on commas.
lists: pd.Series = df[label].str.casefold().str.split(',')
# Games without tags come back as a missing value (None or NaN depending on
# the pandas dtype) rather than a list; give them a single placeholder token.
lists = lists.apply(lambda x: x if isinstance(x, list) else ['none'])
sentences = list(lists)

In [None]:
# Spot-check the token list of one game (position 9).
print(sentences[9])

#### Training word2vec model

In [None]:
import gensim
from sklearn import svm
from sklearn.model_selection import train_test_split
class s_word2vec:
    """Train, cache and reload the word2vec model used to embed game tags.

    The hyper-parameters are fixed in ``__init__`` and also determine the file
    name (relative to the working directory) under which the model is saved.
    Training reads the module-level ``sentences`` list.
    """

    def __init__(self):
        self.model = None
        self.num_features = 100  # Word vector dimensionality
        self.min_word_count = 1  # Minimum word count
        self.context = 10  # Context window size
        self.model_name = f'{self.num_features}-feat_{self.min_word_count}-minwords_{self.context}-context'

        self.num_workers = 8  # Number of threads to run in parallel
        self.downsampling = 1e-3  # Downsample setting for frequent words

    def _init_sims(self, model):
        """Call the legacy ``init_sims(replace=True)`` on ``model``."""
        # The message names the API that is actually deprecated here.
        print("init_sims is deprecated in gensim 4. Use get_vector(key, norm=True) instead")
        model.init_sims(replace=True)

    def load_or_train_model(self):
        """Load the saved model if its file exists, otherwise train and save it."""
        if check_file_exists(self.model_name):
            print("Loading saved model: ", self.model_name)
            self.model = gensim.models.Word2Vec.load(self.model_name)
            # NOTE(review): init_sims is only applied on this load path, not
            # after fresh training - confirm both paths should yield the same
            # vectors.
            self._init_sims(self.model)
        else:
            # Code from:
            # https://www.kaggle.com/competitions/word2vec-nlp-tutorial/overview
            print("Training model...")
            self.model = gensim.models.Word2Vec(
                sentences,
                workers=self.num_workers,
                vector_size=self.num_features,
                min_count=self.min_word_count,
                window=self.context,
                sample=self.downsampling,
            )
            # Save under the descriptive name so later runs can reload it
            # with Word2Vec.load() instead of retraining.
            self.model.save(self.model_name)


# Build the wrapper, then load the cached word2vec model or train a new one.
tags_w2v_model = s_word2vec()
tags_w2v_model.load_or_train_model()
# NOTE: the name `model` is reassigned to the regression model in later cells,
# so the word2vec cells below must run before those.
model:gensim.models.Word2Vec = tags_w2v_model.model


Vector of the tag 'singleplayer'

In [None]:
# Mean vector of one game's tags (all but the first), then the vocabulary
# entries closest to that mean.
sent = model.wv.get_mean_vector(sentences[9][1:])
model.wv.similar_by_vector(sent, topn=len(sentences[9]))

In [None]:
# Vocabulary size, one sample key, and the first 10 components of the 'action' vector.
print(len(model.wv.index_to_key))
print(model.wv.index_to_key[3])
model.wv['action'][:10]

#### Visualizing word2vec tag vectors

Using TSNE we can visualize the clustering of similar word vectors in word2vec model.

In [None]:
!pip install -q adjustText

In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
# conda install adjustText::conda-forge
from adjustText import adjust_text
from sklearn.decomposition import PCA

""" Code for graphing from:
    https://github.com/arsena-k/Word2Vec-bias-extraction/blob/master/Part_A_W2V_training_performance_exploring.ipynb
"""
def tsne_plot(words, vectors, iterations, seed, title, perplexity=7):
    """Project word vectors to 2-D with t-SNE and draw a labelled scatter plot.

    Args:
        words: Labels, one per vector, in the same order as ``vectors``.
        vectors: Sequence of equally sized word vectors.
        iterations: Maximum number of t-SNE optimisation iterations.
        seed: Random state for t-SNE.
        title: Plot title.
        perplexity: t-SNE perplexity; may need tuning for other vocabularies.
    """
    tsne_model = TSNE(
        perplexity=perplexity,
        n_components=2,
        init="pca",
        max_iter=iterations,
        random_state=seed,
        n_jobs=-1,
    )
    embedded = tsne_model.fit_transform(np.asarray(vectors))

    plt.figure(figsize=(30, 30))
    texts = []
    for word, (x_pos, y_pos) in zip(words, embedded):
        # One scatter call per point, as before, so each word keeps its own
        # colour from the colour cycle.
        plt.scatter(x_pos, y_pos)
        texts.append(plt.text(x=x_pos, y=y_pos, s=word, ha="center", va="center"))
    # Push overlapping labels apart and connect them to their points.
    adjust_text(
        texts,
        expand=(6,5),
        explode_radius=(15),
        avoid_self=False,
        max_move=(13,13),
        force_text=(4,5),
        force_explode=(5,5),
        arrowprops=dict(arrowstyle="->", connectionstyle="arc3,rad=0.08"),
    )
    # The first embedded coordinate is plotted on x, the second on y.
    plt.xlabel("Latent Dimension 1")
    plt.ylabel("Latent Dimension 2")
    plt.title(title)
    plt.show()


# Plot every word in the vocabulary. `index_to_key` lists each key once, so no
# duplicate check is needed (the previous one compared the list with itself
# and never filtered anything).
my_word_list = list(model.wv.index_to_key)
my_word_vectors = [model.wv[word] for word in my_word_list]

tsne_plot(my_word_list, my_word_vectors, iterations=2000, seed=23, title="TSNE Visualization of Word-Vectors")

#### Averaging word2vec vectors
The word2vec vectors of each game's tags are averaged to produce a single 100-dimensional embedding per game.


In [None]:
# Two ways of asking for the 10 tags closest to 'action'; only the last
# expression is displayed by the notebook.
model.wv.most_similar('action', topn=10)
model.wv.similar_by_word('action', topn=10) # same result

In [None]:
# Inspect the vocabulary and the neighbours of the 'none' placeholder token
# (the token inserted for games without tags).
words = model.wv.index_to_key
words[0:10]
model.wv.most_similar('none') # TODO: the placeholder token still needs a proper fix

In [None]:
# Author's notes on the normalisation flags:
# - Pre-normalizing discards sentence-length information, so it should ignore
#   differences in the number of tags specified for each game.
# - Pre-normalizing does not matter if init_sims(replace=True) was called,
#   since that precomputes normalized vectors.
# - It is not clear what the point of post_normalize is; it may or may not be
#   good for training the regression model down the line.

# One averaged embedding per game, in the same order as `sentences`.
tags_vectors = [
    model.wv.get_mean_vector(game_tags, pre_normalize=False, post_normalize=False)
    for game_tags in sentences
]

In [None]:
# print('Number of games', len(tags_vectors))
# model.wv.similar_by_vector(tags_vectors[0],topn=20)
# np.mean(tags_vectors[0])
# np.linalg.norm(tags_vectors[0])

In [None]:
# Tags of the row labelled 6.
# NOTE(review): this is a label lookup on the filtered (not yet reset) index -
# it fails if row 6 was filtered out; confirm.
df['Tags'][6]

In [None]:
# One row per game, one column per embedding dimension.
w2vdf = pd.DataFrame(tags_vectors)
# Sanity check: the embedding width must match the configured vector size.
assert w2vdf.shape[1] == tags_w2v_model.num_features
w2vdf.columns = [f'w2v_embed_{i}' for i in range(tags_w2v_model.num_features)]
w2vdf.head()

#### Dropping Tags columns and merging embeds

In [None]:
# Merge the tag embeddings into df exactly once. The guard looks at the data
# itself (are the embedding columns already present?) instead of a flag
# variable, so it stays correct when df is rebuilt by re-running earlier cells.
if not set(w2vdf.columns).issubset(df.columns):
    # The previous `df.drop(columns=['Tags'])` discarded its result and was a
    # no-op; 'Tags' is removed later, when the feature matrix X is built.
    # Reset the index so rows line up positionally with w2vdf's 0..n-1 index.
    df = df.reset_index(drop=True)
    df = pd.concat([df, w2vdf], axis=1)
check_if_w2vdf_already_concat = 1

# del check_if_w2vdf_already_concat

### HDBSCAN clustering


In [None]:
# Preview the frame after the embeddings were appended.
df.head()

In [None]:
# X_train, X_test = train_test_split(tags_vectors, test_size=0.2, train_size=0.8, random_state=42, shuffle=True)

## Processing Reviews Using Sentiment Analysis

In [None]:
# Number of games that have review text available.
df_reviews.notna().sum()

There are 4269 reviews which we can analyze for sentiment. Using the Twitter RoBERTa model we get three scores (negative, neutral, positive), which are combined into a compound score as the dot product of the scores with the weights [-1, 0, 1], respectively. The compound scores of a game's reviews are then averaged into a single score for that game.

Please unzip the sentiment model from the google drive folder into the models directory.

When unzipped, the models directory should contain the folder `twitter-roberta-base-sentiment-latest` with 5 files inside.

In [None]:
!pip install -q transformers
!pip install -q scipy
!pip install torch -q torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu

In [None]:
from transformers import AutoModelForSequenceClassification
from transformers import TFAutoModelForSequenceClassification
from transformers import AutoTokenizer, AutoConfig
import numpy as np
from scipy.special import softmax
import torch


def _split_reviews(text: str):
    """Split a raw 'Reviews' string into its individual quoted reviews."""
    if text.find("“") == -1:
        # A single review, or a different format: treat the whole string as one review.
        return [text]
    quoted_parts = text.split("“")[1:]
    return [part.split("”")[0].strip() for part in quoted_parts]


def _compound_score(probabilities):
    """Collapse (negative, neutral, positive) probabilities into one score."""
    weights = np.array([-1, 0, 1], dtype=np.float32)
    return np.dot(np.asarray(probabilities), weights)


def calculate_sentiment():
    """Score the reviews of every game with the local Twitter-RoBERTa model.

    Reads the module-level ``df_reviews`` Series. Each review is classified,
    its class probabilities are reduced to a compound score, and the scores of
    a game's reviews are averaged.

    Returns:
        A copy of ``df_reviews`` in which every non-missing entry is replaced
        by the game's mean compound score; missing entries stay missing.
    """
    roberta_path = 'models/twitter-roberta-base-sentiment-latest'
    tokenizer = AutoTokenizer.from_pretrained(roberta_path)
    classifier = AutoModelForSequenceClassification.from_pretrained(roberta_path)

    reviews_per_game = df_reviews[df_reviews.notna()].apply(_split_reviews)

    # Tweak batch to your system.
    # Mem Usage: 10-50 is safe. ~25 may be fastest. Needs around 8gb ram.
    batch_size = 25
    scores = []
    for start in range(0, reviews_per_game.size, batch_size):
        # .iloc makes the positional slicing explicit (the index holds row labels).
        batch = reviews_per_game.iloc[start:start + batch_size]
        flat_reviews = [review for reviews in batch.tolist() for review in reviews]
        encoded_input = tokenizer(
            flat_reviews, truncation=True, padding=True, max_length=512, return_tensors='pt')
        with torch.no_grad():
            output = classifier(**encoded_input)
        scores.extend(_compound_score(softmax(logits.numpy()))
                      for logits in output.logits)

    # `scores` is flat (one entry per review); regroup it per game and average.
    game_sentiments = []
    offset = 0
    for reviews in reviews_per_game.tolist():
        review_count = len(reviews)
        game_sentiments.append(np.mean(scores[offset:offset + review_count]))
        offset += review_count

    df_review_scores = pd.Series(game_sentiments, index=reviews_per_game.index)

    # Work on a copy so the raw review text in df_reviews is not overwritten.
    df_reviews_with_scores = df_reviews.copy()
    df_reviews_with_scores.loc[df_review_scores.index] = df_review_scores

    print(df_reviews_with_scores)
    return df_reviews_with_scores

# Reuse the cached sentiment scores when possible; computing them is slow.
try:
    df_reviews_with_scores = unpickle_model('df_reviews_with_scores-values')
except Exception as err:
    # `Exception` instead of a bare except, so Ctrl-C / kernel interrupts are
    # not mistaken for a missing cache file.
    print('No stored values found. Running fresh sentiment analysis.')
    print(f'Reason: {err!r}')
    df_reviews_with_scores = calculate_sentiment()

In [None]:
# Uncomment to cache freshly computed scores; the resulting file name matches
# the one loaded in the previous cell.
# pickle_model('df_reviews_with_scores', df_reviews_with_scores, {'no':'values'})
print(df_reviews_with_scores.shape)
df_reviews_with_scores

In [None]:
# Games without reviews get a neutral sentiment score of 0.
df_reviews_with_scores = df_reviews_with_scores.fillna(0)
df_reviews_with_scores

In [None]:
# Confirm the stored scores are numeric.
# NOTE(review): label lookup on the original row labels - assumes row 6 survived filtering.
print(type(df_reviews_with_scores[6]))

## Train/test data extraction + Regression model selection

The most important metrics when determining a game's success include the number of estimated owners, peak CCU, number of positive/negative reviews, and price.

In [None]:
# Targets the model predicts, in the order used by every later metric array.
target_columns = ['Estimated owners', 'Peak CCU', 'Positive', 'Negative', 'Price']
# Identifiers and free-text columns that cannot be used as numeric features.
non_feature_columns = ['AppID', 'Name', 'Reviews', 'Notes', 'Developers', 'Publishers', 'Tags']

y = np.array(df[target_columns])
X = np.array(df.drop(columns=non_feature_columns + target_columns))

print(X.shape)
print(y.shape)

In [None]:
#print(X[0,:]) # ensure all data is numerical

RandomForestRegressor is used to handle non-linear relationships between a game and the metrics we are predicting. MultiOutputRegressor provides easier setup for the model.

A grid search will also be done on the hyperparameters for the random forest regressor.

In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.model_selection import cross_val_score
from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

# 80% training data, 20% test (test_size=0.2)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=True)

# Fit the scaler on the training split only, then apply it to both splits.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Apply PCA (fitted on the training split); X_train / X_test are replaced by
# their 52-component projections from here on.
pca = PCA(n_components=52)
X_train = pca.fit_transform(X_train_scaled)
X_test = pca.transform(X_test_scaled)


print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)

In [None]:
# Cumulative explained variance of the fitted components
cumulative_variance = np.cumsum(pca.explained_variance_ratio_)

# Find the number of components for 87.5% variance
reached_target = cumulative_variance >= 0.875
if reached_target.any():
    n_components = int(np.argmax(reached_target)) + 1  # Add 1 because index starts at 0
    print(f"Number of components to preserve 87.5% variance: {n_components}")
else:
    # argmax of an all-False mask is 0, which would wrongly report 1 component.
    n_components = None
    print(f"The {len(cumulative_variance)} fitted components explain only "
          f"{cumulative_variance[-1]:.1%} of the variance (below 87.5%).")

Disclaimer, this cell takes hours to complete!

In [None]:
perform_search = False
grid_search = None
if perform_search:
    # Grid search over the random forest hyperparameters.
    # n_jobs=-1 uses all processors to shorten training time.
    rf = RandomForestRegressor(random_state=42, n_jobs=-1)
    model = MultiOutputRegressor(rf, n_jobs=-1)

    param_grid = [
        {
            'estimator__n_estimators': [20, 50, 100, 150, 200, 250],
            'estimator__max_features': [1, 20, 'sqrt', 50, 70, 90, 110],
            'estimator__max_depth': [None, 10, 20, 30, 40, 50],
        }
    ]

    grid_search = GridSearchCV(model, param_grid, n_jobs=-1)
    grid_search.fit(X_train, y_train)

    # Persist the whole search object, named after the best parameters.
    pickle_model("rf_gridsearch_obj", grid_search, grid_search.best_params_, param_grid[0])
    print(grid_search.best_params_)

The best hyperparameters were a max branch depth of 50, a random subset of 70 features for splitting branches, and 150 estimators/trees for the random forest. These parameters are the most influential on model capacity, generalization, and computation. Other parameters like min_samples_split were omitted from the grid search since the default is adequate to recognize patterns in the data.

In [None]:
from sklearn.metrics import r2_score

if perform_search:
    # Evaluate the best estimator found by the grid search on the test split.
    model = grid_search.best_estimator_
    y_pred = model.predict(X_test)

    # One value per target, in the order:
    # ['Estimated owners', 'Peak CCU', 'Positive', 'Negative', 'Price']
    mse = mean_squared_error(y_test, y_pred, multioutput='raw_values')
    rmse = np.sqrt(mse)
    r2_score_values = r2_score(y_test, y_pred, multioutput='raw_values')

    for caption, values in (
        ("Test set Mean Squared Error:", mse),
        ("Test set Root Mean Squared Error:", rmse),
        ("Test set R2 Score:", r2_score_values),
    ):
        print(caption, values)

In [None]:
# Load the saved model (downloadable from the link in the README), or train
# and save a new one. Set load_model = False to force retraining.
load_model = True
pca_model_filename = 'rf_pca'
# NOTE(review): max_features=70 exceeds the 52 PCA components produced above -
# confirm the installed scikit-learn accepts this.
pca_params = {'max_depth': 50, 'max_features': 70, 'n_estimators': 150}
# Same naming scheme as pickle_model: base name plus "-<value>" per parameter.
pca_model_filename += ''.join(f'-{value}' for value in pca_params.values())

model = None
if load_model:
    try:
        model = unpickle_model(pca_model_filename)
    except Exception as err:
        # Missing or unreadable archive: fall back to training. `Exception`
        # (not a bare except) so a kernel interrupt is not swallowed.
        print(f"Could not load '{pca_model_filename}' ({err!r}); training a new model.")

if model is None:
    rf_pca = RandomForestRegressor(random_state=42, n_jobs=-1, verbose=1, **pca_params)
    model = MultiOutputRegressor(rf_pca, n_jobs=-1)
    model.fit(X_train, y_train)

    # The freshly trained model replaces any stale archive of the same name.
    pickle_model('rf_pca', model, pca_params, overwrite=True)

if not perform_search:
    y_pred = model.predict(X_test)

    # One value per target, in the order:
    # ['Estimated owners', 'Peak CCU', 'Positive', 'Negative', 'Price']
    mse = mean_squared_error(y_test, y_pred, multioutput='raw_values')
    rmse = np.sqrt(mse)
    r2_score_values= r2_score(y_test, y_pred, multioutput='raw_values')

    print("Test set Mean Squared Error:", mse)
    print("Test set Root Mean Squared Error:", rmse)
    print("Test set R2 Score:", r2_score_values)

In [None]:
from dataclasses import dataclass, fields, field

@dataclass
class DataMinMax:
    """Container for per-metric summary values, keyed by metric name."""
    # Each instance gets its own empty dict (default_factory avoids a shared default).
    data:dict = field(default_factory=dict)


In [None]:
predict_labels = ["Estimated owners", "Peak CCU", "Positive", "Negative", "Price"]
calc: DataMinMax = DataMinMax()
# Per target: original (un-normalized) value range plus the test-set scores.
# The metric arrays follow the order of `predict_labels`.
for i, label in enumerate(predict_labels):
    original_values = df_orig[label]
    stats = {"min": original_values.min(), "max": original_values.max()}
    stats["r2"] = r2_score_values[i]
    stats["rmse"] = rmse[i]
    calc.data[label] = stats
# owners = df_orig['Estimated owners']
# calc.owners = (min())
# du

In [None]:
def _format_number(value, precision=2):
    """Format a number with thousands separators and fixed decimals."""
    return f"{value:,.{precision}f}"


# Build one formatted row per predicted metric.
rows = []
for label in predict_labels:
    stats = calc.data[label]
    min_val = stats["min"]
    max_val = stats["max"]
    rmse_val = stats["rmse"]
    r2_val = stats["r2"]
    range_val = max_val - min_val
    # RMSE was computed on 0-1 normalized targets, so x100 gives % of the range
    # and x range gives the error in the metric's original units.
    range_percent = rmse_val * 100
    prediction = rmse_val * range_val
    rows.append({
        "Metric": label,
        "Prediction": _format_number(prediction, 2),
        "Min": _format_number(min_val, 2),
        "Max": _format_number(max_val),
        "RMSE": _format_number(rmse_val, 4),
        "R^2": _format_number(r2_val, 4),
        "Range (%)": _format_number(range_percent, 2),
    })

# Create DataFrame
df_result = pd.DataFrame(rows)

# Display the DataFrame
df_result

Looking at the R2 Score, which indicates the proportion of variance in the dependent variable that is predictable from the independent variables, the model is able to capture underlying patterns decently for the estimated owners, positive number of reviews, and negative number of reviews. This suggests that relationships between the features and target variables are relatively strong, making them easier to predict.

This is logical. Game characteristics like developers, publishers, and categories will directly influence price and peak ccu  counts more so than the other target variables. Since these aren't taken into account during training to avoid too many feature encodings, the correlation between these characteristics makes them harder to predict. **This will help us assign a score to each prediction when defining a success rating.**

## Success Rating ##

The importance of each predicted parameter is determined by its R² score. Variables that account for a larger portion of their variance in predictions are given greater weights because they are more dependable.


The composite score will be calculated based on the R² weights, the accuracy of the predictions, and the sentiment score. High predictions for price and number of negative reviews will be penalized, since a game should maximize its peak CCU, estimated owners, and number of positive reviews while minimizing cost.

In [None]:
# Weight of each predicted target, proportional to its R^2 (weights sum to 1).
r2_total = np.sum(r2_score_values)
r2_weights = r2_score_values / r2_total

# Project the full dataset with the scaler and PCA fitted on the training split.
X_scaled = scaler.transform(X)
X_pca = pca.transform(X_scaled)

def success_rating(random=False, index=None):
    """Compute the composite success score of one game.

    Args:
        random: Pick a random game (any supplied ``index`` is ignored).
        index: Positional index of the game; required when ``random`` is False.

    Returns:
        Tuple ``(predictions, actual_metrics, composite_score, info)`` where
        ``info`` is the game's row from ``df_orig``. Metric order is
        ['Estimated owners', 'Peak CCU', 'Positive', 'Negative', 'Price'].

    Raises:
        ValueError: If neither ``random`` nor ``index`` is given.
    """
    if random:
        index = np.random.randint(0, df.shape[0])
    elif index is None:
        raise ValueError("Pass an index or set random=True.")

    game = X_pca[index]

    # Predicted and actual metrics for the selected game.
    predictions = model.predict(game.reshape(1,-1))
    actual_metrics = y[index].reshape(1,-1)

    # Per-metric error for this single game.
    rmse = np.sqrt(mean_squared_error(actual_metrics, predictions, multioutput='raw_values')).reshape(1,-1)
    # Normalize so peak ccu and price (expected large errors) do not dominate.
    rmse = min_max_normalize(rmse)

    sentiment_score = df_reviews_with_scores.iloc[index]

    # NOTE(review): contributions are built from prediction errors, not from
    # the predicted magnitudes - confirm this is the intended scoring input.
    contributions = r2_weights * rmse
    rewarded = np.sum(contributions[0, :3])
    # Negative reviews AND price are penalized; the old slice `3:-1` stopped
    # before the last column and silently left Price out.
    penalized = np.sum(contributions[0, 3:])
    composite_score = rewarded - penalized + sentiment_score

    info = df_orig.iloc[index]

    return predictions, actual_metrics, composite_score, info

In [None]:
# Score one randomly chosen game and show its original (un-normalized) row.
_, _, score, game_info = success_rating(random=True)
#print("Predictions: ", game_predictions)
#print("Actual: ", game_metrics)
print("Score: ", score)
print(game_info.to_numpy())

To test how the score differs between games, let's run 300 iterations of success_rating and compare the scores to the different metrics that we predict.

In [None]:
columns = [
    "Score", "AppID", "Name", "Release date", "Estimated owners", "Peak CCU",
    "Required age", "Price", "DLC count", "Reviews", "Windows",
    "Mac", "Linux", "Metacritic score", "User score", "Positive",
    "Negative", "Achievements", "Recommendations", "Notes",
    "Average playtime forever", "Average playtime two weeks",
    "Median playtime forever", "Median playtime two weeks",
    "Developers", "Publishers", "Categories", "Genres", "Tags"
]

# 300 trials to compare scores of different games. Rows are collected in a list
# and turned into a DataFrame once; concatenating inside the loop re-copied the
# whole table on every iteration.
trial_rows = []
for trial in range(300):
    _, _, score, game_info = success_rating(random=True)

    # NOTE(review): values are paired with names by position - assumes the
    # column order of df_orig matches `columns[1:]`.
    game_row = dict(zip(columns[1:], game_info))  # Exclude "Score" column
    game_row["Score"] = score
    trial_rows.append(game_row)

table = pd.DataFrame(trial_rows, columns=columns)

In [None]:
# The same game can be drawn more than once: keep one row per AppID, then list
# the games from highest to lowest score.
unique_games = table.drop_duplicates(subset=['AppID'])
table = unique_games.sort_values(by='Score', ascending=False)
print(table.to_string())

In [None]:
# Scatter the composite score against each predicted metric.
from matplotlib import pyplot as plt
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(13,6))
fig, (ax3, ax4) = plt.subplots(1, 2, figsize=(13,6))
fig, (ax5) = plt.subplots(1, 1, figsize=(6,6))

# (axis, table column, x-axis label, title, colour) for each panel.
panels = [
    (ax1, "Estimated owners", 'Estimated Owners', 'Estimated Owners vs Score', 'red'),
    (ax2, "Peak CCU", 'Peak CCU', 'Peak CCU vs Score', 'blue'),
    (ax3, "Positive", 'Positive Reviews', 'Pos. Reviews vs Score', 'pink'),
    (ax4, "Negative", 'Negative Reviews', 'Neg. Reviews vs Score', 'green'),
    (ax5, "Price", 'Price', 'Price vs Score', 'brown'),
]
for axis, column_name, x_label, panel_title, colour in panels:
    axis.scatter(table[column_name], table["Score"], c=colour)
    axis.set_xlabel(x_label)
    axis.set_ylabel('Score')
    axis.set_title(panel_title)


plt.show()