# Game Rating Prediction
## Based on ML, Kafka, and Spark
### By: Natty Zepko (302309513), Amit Yehezkel (211899489)

In this project we use machine learning (ML) to predict the rating of games in the Steam store, measured as the percentage of positive reviews.
Each game has the following features:

| Feature | Type | Description |
| :-- | :-- | :-- |
| App ID | INT | Uniquely assigned ID number given by Steam |
| Name | STRING (utf-8) | The full name of the product. May include special characters, emojis, and characters from other languages |
| Total Reviews | INT | The total number of reviews the product has on Steam (both negative and positive) |
| Positive Reviews Percent | DOUBLE | Number of positive reviews divided by the total number of reviews (a value between 0 and 1) |
| Developer | STRING (utf-8) | The credited developer. May include special characters, emojis, and characters from other languages |
| Publisher | STRING (utf-8) | The credited publisher. May include special characters, emojis, and characters from other languages |
| Metacritic Score | INT | Score from 0 to 100 (inclusive), a weighted average of professional critics' ratings. Provided by Steam, or set to 50 if the product isn't rated |
| Is Free | 0/1 | 1 if free, 0 otherwise |
| Genre 1 | INT | The first, most defining genre of the game. -1 if none provided |
| Genre 2 | INT | The second genre of the game. -1 if none provided |
| Genre 3 | INT | The third genre of the game. -1 if none provided |
| Price | DOUBLE | The listed price of the product |
| Peak | INT | The largest number of players playing the game at the same time (since launch) |


Peak was not provided by Steam; it was collected from steam-charts instead. It defaults to -1 if the game was not found on their website. We did not expect to find any such games, because we filtered out games with fewer than 50 reviews (reviews are often written after playing the game for some time, so a peak of at least 1 is expected for every game).

We were, however, proven wrong: we found games and products with 0 players, because they are either DLCs (Downloadable Content, meant to be added to an existing game) or test products that cannot be played or used. These products have a "Peak" of 0.
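The sentinel values described above can be made explicit before training. A minimal sketch, assuming each row is a plain dict with a `peak` key; the helper name `peak_status` is hypothetical and not part of the project:

```python
def peak_status(peak: int) -> str:
    """Classify a raw Peak value using the sentinels described above (hypothetical helper)."""
    if peak == -1:
        # The game was not found on steam-charts
        return "missing"
    if peak == 0:
        # Listed but never played, e.g. a DLC or a test product
        return "unplayable"
    return "played"


rows = [{"name": "A", "peak": 25}, {"name": "B", "peak": 0}, {"name": "C", "peak": -1}]
print([(row["name"], peak_status(row["peak"])) for row in rows])
# → [('A', 'played'), ('B', 'unplayable'), ('C', 'missing')]
```

Rows flagged as `unplayable` could then be filtered out or kept as their own group; the notebook below keeps them as-is (the output of the next steps still contains rows with a peak of 0).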

Imports:

In [20]:
# imports

# main packages
import csv
import math
import numpy as np
import pyspark.sql.functions as func
import matplotlib.pyplot as plt

# functions and tools
from time import sleep
from threading import Thread
from functools import partial

# pyspark
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.ml.classification import *
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator, ClusteringEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StringIndexer, PCA, StandardScaler, StopWordsRemover, CountVectorizer

# Kafka
from confluent_kafka import Producer, Consumer


# First step: Reading the CSV:

In [21]:
spark = SparkSession.builder.appName("SteamPile").getOrCreate()

df = spark.read.csv("steam_app_list_detailed_v2.csv", header=True, inferSchema=True)
df

DataFrame[appid: int, name: string, total_reviews: int, positive_reviews_percent: double, developer: string, publisher: string, metacritic_score: int, is_free: int, genre1: int, genre2: int, genre3: int, price: double, peak: int]

# Second step: Feature Engineering:

We need to sort each developer and publisher into categories. This can be done through indexing: each name is replaced with an integer. This makes it easier for the model to recognize patterns than working with raw strings of characters.
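To make the indexing concrete, here is a minimal pure-Python sketch of what `StringIndexer` does with its default `frequencyDesc` ordering and `setHandleInvalid("keep")`: the most frequent name gets index 0, ties are broken alphabetically here, and names not seen during fitting (or missing names) get one extra index. It is an illustration only, not Spark's implementation, and `fit_index` / `transform_index` are hypothetical names.

```python
from collections import Counter
from typing import Dict, Iterable, List, Optional


def fit_index(values: Iterable[Optional[str]]) -> Dict[str, int]:
    """Order labels by descending frequency (ties alphabetically) and number them 0, 1, 2, ..."""
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def transform_index(mapping: Dict[str, int], values: Iterable[Optional[str]]) -> List[int]:
    """Map labels to indices; unseen or missing labels get len(mapping), like handleInvalid="keep"."""
    unknown = len(mapping)
    return [mapping.get(v, unknown) for v in values]


developers = ["Valve", "Indie Co", "Valve", "Studio X", "Indie Co", "Valve"]
mapping = fit_index(developers)
print(mapping)                                             # → {'Valve': 0, 'Indie Co': 1, 'Studio X': 2}
print(transform_index(mapping, ["Studio X", "New Dev", None]))  # → [2, 3, 3]
```

Because the indices depend on the data the mapping was fitted on, fitting it again on a different subset (for example, a single streaming batch) can give the same developer a different index. The mapping fitted on the training data is the one to reuse at prediction time.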

In [22]:
def fix_dataframe(df):
    #Remove "is free"
    df=df.drop("is_free")

    #Change developer name into categorical numbers
    indexer = StringIndexer(inputCol="developer", outputCol="developerIndex").setHandleInvalid("keep")
    indexed = indexer.fit(df).transform(df)
    df = indexed.drop("developer")
    df = df.withColumn("developerIndex", func.round(df["developerIndex"]).cast('integer'))

    #Change publisher name into categorical numbers
    indexer = StringIndexer(inputCol="publisher", outputCol="publisherIndex").setHandleInvalid("keep")
    indexed = indexer.fit(df).transform(df)
    df = indexed.drop("publisher")
    df = df.withColumn("publisherIndex", func.round(df["publisherIndex"]).cast('integer'))

    # Remove name and ID (they are not needed by the ML model)
    df2=df.drop("name")
    df2=df2.drop("appid")
    return df2
df2 = fix_dataframe(df)
df2.show()

23/07/31 19:33:45 WARN DAGScheduler: Broadcasting large task binary with size 1184.8 KiB
+-------------+------------------------+----------------+------+------+------+-------+----+--------------+--------------+
|total_reviews|positive_reviews_percent|metacritic_score|genre1|genre2|genre3|  price|peak|developerIndex|publisherIndex|
+-------------+------------------------+----------------+------+------+------+-------+----+--------------+--------------+
|          105|      0.8761904761904762|              50|     1|    25|    23|  499.0|  25|         12137|          9473|
|          106|      0.7075471698113207|              50|     1|     4|     9| 3695.0|   6|          7883|          6099|
|           59|       0.711864406779661|              50|     1|    25|    37|20995.0|   0|           184|           328|
|           74|      0.4864864864864865|              50|    25|    23|     3| 1850.0|   0|          4322|           475|
|          139|      0.5899280575539568|              50|

# Third step: Turn DF into vector:

We write the functions that transform a DataFrame into the feature vectors used by the ML model. There are two versions: one that keeps the label, for training, and one without a label, for prediction.
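Conceptually, `VectorAssembler` reads the chosen columns in a fixed order and packs them into one numeric vector, which is optionally paired with the label. A plain-Python sketch of that idea, using the first row of the indexed DataFrame shown above; `to_features` and `to_example` are hypothetical names, not Spark APIs:

```python
from typing import Dict, List, Optional, Tuple

# Same order as chosen_columns in the notebook; the order fixes each value's position in the vector
CHOSEN_COLUMNS = ["metacritic_score", "genre1", "genre2", "genre3",
                  "price", "peak", "developerIndex", "publisherIndex"]


def to_features(row: Dict[str, float]) -> List[float]:
    """Pack the chosen columns of one row into a flat list of floats."""
    return [float(row[column]) for column in CHOSEN_COLUMNS]


def to_example(row: Dict[str, float], with_label: bool = True) -> Tuple[List[float], Optional[float]]:
    """Return (features, label) for training, or (features, None) for prediction."""
    label = float(row["positive_reviews_percent"]) if with_label else None
    return to_features(row), label


# First row of the indexed DataFrame shown above
row = {"metacritic_score": 50, "genre1": 1, "genre2": 25, "genre3": 23, "price": 499.0,
       "peak": 25, "developerIndex": 12137, "publisherIndex": 9473,
       "positive_reviews_percent": 0.8761904761904762}
print(to_features(row))  # → [50.0, 1.0, 25.0, 23.0, 499.0, 25.0, 12137.0, 9473.0]
```

The printed values match the start of the first `features` vector in the output below (`[50.0,1.0,25.0,23...`).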

In [23]:
def transform_vector(df2):
    # Transforming features to vectors
    chosen_columns = ["metacritic_score", "genre1", "genre2", "genre3","price","peak","developerIndex","publisherIndex"]
    vectors_assembler = VectorAssembler(inputCols=chosen_columns, outputCol="features")
    df3= vectors_assembler.transform(df2).select(col("features"),col("positive_reviews_percent"))
    df3=df3.withColumnRenamed("positive_reviews_percent", "label")
    return df3

def trasnform_vector_without_label(df2):
    # Transforming features to vectors
    chosen_columns = ["metacritic_score", "genre1", "genre2", "genre3","price","peak","developerIndex","publisherIndex"]
    vectors_assembler = VectorAssembler(inputCols=chosen_columns, outputCol="features")
    df3= vectors_assembler.transform(df2).select("features")
    return df3
df3 = transform_vector(df2)
df3.show()

23/07/31 19:33:45 WARN DAGScheduler: Broadcasting large task binary with size 1197.0 KiB
+--------------------+------------------+
|            features|             label|
+--------------------+------------------+
|[50.0,1.0,25.0,23...|0.8761904761904762|
|[50.0,1.0,4.0,9.0...|0.7075471698113207|
|[50.0,1.0,25.0,37...| 0.711864406779661|
|[50.0,25.0,23.0,3...|0.4864864864864865|
|[50.0,4.0,23.0,-1...|0.5899280575539568|
|[50.0,25.0,23.0,7...|0.3676470588235294|
|[50.0,4.0,28.0,-1...|0.7677824267782427|
|[50.0,1.0,25.0,4....|0.9463087248322148|
|[50.0,4.0,37.0,23...|0.9805697151424289|
|[50.0,1.0,25.0,4....|0.6582089552238806|
|[50.0,1.0,37.0,-1...|0.1206896551724138|
|[50.0,1.0,37.0,-1...|               0.2|
|[50.0,1.0,37.0,-1...|0.4482758620689655|
|[50.0,25.0,-1.0,-...|0.9466666666666668|
|[50.0,25.0,4.0,37...|0.8947368421052632|
|[50.0,1.0,4.0,37....|0.8823529411764706|
|[50.0,9.0,28.0,18...|0.8666666666666667|
|[50.0,2.0,-1.0,-1...|0.8351648351648352|
|[50.0,4.0,-1.0,-1...|       

# Fourth step: Model building:

In [24]:
def train(df):
    # Splits the dataframe to train (80%) and validation (20%)
    train, validation = df.randomSplit([0.8, 0.2], seed = 812)
    # Trains a linear regression model on the train set and makes predictions on the validation set
    model = LinearRegression(featuresCol="features", labelCol="label", predictionCol="predicted_medv").fit(train)
    predictions = model.transform(validation)
    
    evaluator = RegressionEvaluator(labelCol="label", predictionCol="predicted_medv", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))
    return model

def predict(model, df):
    df = fix_dataframe(df)
    df = trasnform_vector_without_label(df)
    return model.transform(df)

## Now we just have to apply the training function to the dataframe:

We require the RMSE to be below 0.2 to consider the model's average error low enough.
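RMSE is the square root of the mean squared difference between the labels and the predictions. Since the label is a fraction between 0 and 1, an RMSE of 0.2 means predictions are off by roughly 20 percentage points of positive reviews, in a root-mean-square sense. A plain-Python sketch of the quantity that `RegressionEvaluator(metricName="rmse")` reports:

```python
import math
from typing import Sequence


def rmse(labels: Sequence[float], predictions: Sequence[float]) -> float:
    """Root mean squared error: sqrt(mean((label - prediction) ** 2))."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


print(round(rmse([0.9, 0.7, 0.5], [0.8, 0.8, 0.5]), 3))  # → 0.082
```

Squaring before averaging means a few large misses raise the RMSE more than many small ones.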

In [25]:
train(df3)

23/07/31 19:33:45 WARN Instrumentation: [a666ff8d] regParam is zero, which might cause numerical instability and overfitting.
23/07/31 19:33:45 WARN DAGScheduler: Broadcasting large task binary with size 1356.7 KiB


23/07/31 19:33:46 WARN DAGScheduler: Broadcasting large task binary with size 1362.5 KiB
23/07/31 19:33:47 WARN DAGScheduler: Broadcasting large task binary with size 1363.0 KiB
Root Mean Squared Error (RMSE) on test data: 0.162


LinearRegressionModel: uid=LinearRegression_85616399a27f, numFeatures=8

### Feature testing showed that "Total Reviews" is redundant. We removed it and got the same RMSE score (or a score lower by less than 1% with a different random seed)

In [26]:
# Transforming features to vectors
chosen_columns = ["metacritic_score", "genre1", "genre2", "genre3","price","peak","developerIndex","publisherIndex"]
vectors_assembler = VectorAssembler(inputCols=chosen_columns, outputCol="features")

df3=vectors_assembler.transform(df2).select(col("features"),col("positive_reviews_percent"))
df3=df3.withColumnRenamed("positive_reviews_percent", "label")
games_model = train(df3)

23/07/31 19:33:48 WARN Instrumentation: [180ed568] regParam is zero, which might cause numerical instability and overfitting.
23/07/31 19:33:48 WARN DAGScheduler: Broadcasting large task binary with size 1356.7 KiB
23/07/31 19:33:48 WARN DAGScheduler: Broadcasting large task binary with size 1362.5 KiB
23/07/31 19:33:49 WARN DAGScheduler: Broadcasting large task binary with size 1363.0 KiB
Root Mean Squared Error (RMSE) on test data: 0.162


### The RMSE result is about 0.162, lower than 0.2, as we required.

# Fifth step: Kafka (sorting the data into 'topics'):

We split the data into 10 different CSV files, each containing approximately 3,000 lines of game data.
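A minimal stdlib sketch of such a split, assuming a single UTF-8 source CSV with a header row; the function name `split_csv` and the `part_XX.csv` file names are hypothetical. Each part repeats the header, because the streaming reader used later loads the `./CSV` folder with the `header` option set to `true`:

```python
import csv
import os
from typing import List


def split_csv(source: str, out_dir: str, parts: int) -> List[str]:
    """Split `source` into `parts` CSV files of near-equal size, each starting with the header row."""
    with open(source, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(reader)

    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for i in range(parts):
        # Integer boundaries spread any remainder across the files
        start = i * len(rows) // parts
        end = (i + 1) * len(rows) // parts
        path = os.path.join(out_dir, f"part_{i:02d}.csv")
        with open(path, "w", newline="", encoding="utf-8") as out:
            writer = csv.writer(out, lineterminator="\n")
            writer.writerow(header)
            writer.writerows(rows[start:end])
        paths.append(path)
    return paths


# Example call (hypothetical output layout):
# split_csv("steam_app_list_detailed_v2.csv", "./CSV", 10)
```

Reading with the `csv` module rather than splitting on newlines keeps quoted names that contain commas or line breaks intact, and the explicit UTF-8 encoding preserves names with emojis or non-Latin characters.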

Our initial idea was to sort the games into 5 topics, based on P, the percentage of positive reviews predicted by our model:

| Topic | Classification |
| :-- | :-: |
| Gold | 0.9 < P |
| Positive | 0.7 < P <= 0.9 |
| Mixed | 0.4 < P <= 0.7 |
| Negative | 0.2 < P <= 0.4 |
| Trash | 0 <= P <= 0.2 |

While planning, our thought process was that the games would either be spread equally across the five categories, or be concentrated mostly in the top three categories and rare in the bottom two.

In [27]:
TOPICS = ["Gold", "Positive", "Mixed", "Negative", "Trash"]
CONFIGURATIONS = {"bootstrap.servers": "localhost:9092", "group.id": "game", "auto.offset.reset": "smallest"}
PRODUCERS = dict(zip(TOPICS, [Producer(CONFIGURATIONS) for topic in TOPICS]))
CONSUMERS = dict(zip(TOPICS, [Consumer(CONFIGURATIONS) for topic in TOPICS]))


def game_rank_by_percent(percent):
    if percent > 0.9:
        return TOPICS[0]
    if percent > 0.7:
        return TOPICS[1]
    if percent > 0.4:
        return TOPICS[2]
    if percent > 0.2:
        return TOPICS[3]
    return TOPICS[4]

def produce_games(batch_df, batch_id):
    # Predicts the rating of every game in the streamed batch
    games_df = predict(games_model, batch_df.select("metacritic_score", "genre1", "genre2", "genre3","price","peak","developer","publisher"))
    a = games_df.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
    b = batch_df.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
    games_df = a.join(b, a.row_idx == b.row_idx).drop("row_idx")

    # Sends each classified game to the topic matching its predicted rating
    games_df = games_df.rdd.map(lambda x: (x.name, game_rank_by_percent(x.predicted_medv), x.predicted_medv))
    games_df = games_df.toDF(["name", "rank", "prediction_score"])
    for game in games_df.rdd.collect():
        gameRank = game["rank"]
        # The f-string also handles games with a missing name
        message = f"The game {game['name']} has the rating of {game['prediction_score']}\n"
        
        PRODUCERS[gameRank].produce(gameRank, value=message)
        PRODUCERS[gameRank].flush()  
        
        sleep(0.2)

def consume_games(consumer, topic):
    try:      
        while consumers_active:
            # Polls for new messages
            message = consumer.poll(timeout=3.0)
            
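            if message is None:
                continue
            if message.error():
                # Reports consumer errors instead of trying to decode them
                print(message.error())
                continue

            # Appends the message to the topic's file
            with open(topic + ".txt", "a", encoding="utf-8") as file:
                file.write(message.value().decode("utf-8"))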
            
    except Exception as ex:
        print(ex)
        
    finally:
        # Closes the consumer to commit final offsets
        consumer.close()

# Initializes the consumers
consumers_active = True
consumer_threads = []

for topic, consumer in CONSUMERS.items():
    # Subscribes the consumer to its topic
    consumer.subscribe([topic])
    # Creates thread for each consumer
    thread = Thread(target=consume_games, args=(consumer, topic))
    consumer_threads.append(thread)
    thread.start()

%4|1690821230.093|CONFWARN|rdkafka#producer-31| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
%4|1690821230.093|CONFWARN|rdkafka#producer-31| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance
%4|1690821230.094|CONFWARN|rdkafka#producer-32| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
%4|1690821230.094|CONFWARN|rdkafka#producer-32| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance
%4|1690821230.094|CONFWARN|rdkafka#producer-33| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
%4|1690821230.094|CONFWARN|rdkafka#producer-33| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance
%4|1690

# Last step: distributing the data among the subscribers.

We ran the kafka.sh script in the terminal before reaching this stage.
After all 10 files were processed, we stopped the run manually, which is why the output of this cell ends with a KeyboardInterrupt traceback.
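The consumer threads started in the previous cell keep polling until the global `consumers_active` flag is cleared, so a clean shutdown depends on that flag being reset even when the run is interrupted. A minimal stdlib sketch of the same shutdown pattern with `threading.Event`, using a hypothetical `fake_poll` stand-in instead of a real Kafka `Consumer` (no broker is needed to run it):

```python
import threading
import time
from typing import Callable, List, Optional


def consume_until_stopped(poll: Callable[[float], Optional[str]],
                          stop: threading.Event,
                          sink: List[str]) -> None:
    """Poll until `stop` is set, collecting every message that arrives."""
    while not stop.is_set():
        message = poll(0.1)
        if message is not None:
            sink.append(message)


# Hypothetical stand-in for consumer.poll(): hands out queued messages, then returns None
pending = ["The game A has the rating of 0.8\n", "The game B has the rating of 0.75\n"]
drained = threading.Event()


def fake_poll(timeout: float) -> Optional[str]:
    if pending:
        return pending.pop(0)
    drained.set()        # every queued message has been handed out
    time.sleep(timeout)  # mimics waiting for new messages
    return None


stop = threading.Event()
received: List[str] = []
worker = threading.Thread(target=consume_until_stopped, args=(fake_poll, stop, received))
worker.start()
drained.wait(timeout=5)  # stands in for the stream finishing or being interrupted
stop.set()               # plays the role of consumers_active = False
worker.join()
print(len(received))  # → 2
```

With a real consumer, `poll` would be `consumer.poll`, and the thread would still call `consumer.close()` in a `finally` block after the loop, as `consume_games` does. An `Event` is shared explicitly between threads instead of relying on a module-level variable.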

In [28]:
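query = None
try:
    # Streams CSV files from ./CSV into a streaming DataFrame
    test_df = spark.readStream.format("csv").option("header", "true").schema(df.schema).load("./CSV")

    # Starts the streaming; each micro-batch is passed to produce_games(batch_df, batch_id)
    query = test_df.writeStream.foreachBatch(produce_games).start()
    query.awaitTermination()

except KeyboardInterrupt:
    # The run is stopped manually once all files are processed
    print("Streaming stopped manually")

except Exception as ex:
    print(ex)

finally:
    # Stops the query and signals the consumer threads to exit their polling loops
    if query is not None:
        query.stop()
    consumers_active = False
    for thread in consumer_threads:
        thread.join()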

23/07/31 19:33:50 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-30b13787-0b99-4854-8f71-f4d1b44b3411. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/07/31 19:33:50 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/07/31 19:33:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/07/31 19:33:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/07/31 19:33:51 WARN DAGScheduler: Broadcasting large task binary with size 1216.4 KiB
23/07/31 19:33:52 WARN DAGScheduler: Broadcasting la

ERROR:root:KeyboardInterrupt while sending command.                             
Traceback (most recent call last):
  File "/usr/local/spark/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/linuxu/anaconda3/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

# Conclusion:

The model did not behave as we expected.
This is not necessarily a bad thing: our expectations were simply off, and the data we analyzed is just that homogeneous.

| Topic | Classification | Number of results |
| :-- | :-: | :-: |
| Gold | 0.9 < P | 8 |
| Positive | 0.7 < P <= 0.9 | 30,000 |
| Mixed | 0.4 < P <= 0.7 | 1 |
| Negative | 0.2 < P <= 0.4 | 0 |
| Trash | 0 <= P <= 0.2 | 0 |

The games were not divided equally between the categories: most games were given scores between 0.7 and 0.9.
This is, however, somewhat expected. Most games on Steam with at least 50 reviews are scored in that range because of the nature of reviewing games on Steam.
We did not take the spread of scores into account. Had we anticipated it, we would have defined the topics differently; for example, we propose:

| Topic | Classification |
| :-- | :-: |
| Best | 0.9 < P |
| Fantastic | 0.85 < P <= 0.9 |
| Highly Recommended | 0.8 < P <= 0.85 |
| Recommended | 0.78 < P <= 0.8 |
| Slightly Recommended | 0.76 < P <= 0.78 |
| Well Received | 0.72 < P <= 0.76 |
| Average | 0.7 < P <= 0.72 |
| Badly Received | 0 <= P <= 0.7 |

Re-categorizing and re-running would take approximately 2 hours of runtime. We do not feel it is necessary to apply these changes to our existing data, but they can be applied to future games for a better prediction model.
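With eight buckets, the chain of `if` statements used in `game_rank_by_percent` grows long. A sketch of the proposed mapping using the standard-library `bisect` module; `THRESHOLDS`, `PROPOSED_TOPICS` and `proposed_topic` are hypothetical names. `bisect_left` counts the thresholds strictly below P, which selects the bucket satisfying low < P <= high:

```python
from bisect import bisect_left

# Bucket boundaries in ascending order; each topic covers low < P <= high
THRESHOLDS = [0.7, 0.72, 0.76, 0.78, 0.8, 0.85, 0.9]
PROPOSED_TOPICS = ["Badly Received", "Average", "Well Received", "Slightly Recommended",
                   "Recommended", "Highly Recommended", "Fantastic", "Best"]


def proposed_topic(percent: float) -> str:
    """Return the proposed topic for a predicted positive-review percentage."""
    return PROPOSED_TOPICS[bisect_left(THRESHOLDS, percent)]


print([proposed_topic(p) for p in (0.65, 0.7, 0.71, 0.8, 0.83, 0.95)])
# → ['Badly Received', 'Badly Received', 'Average', 'Recommended', 'Highly Recommended', 'Best']
```

Kafka topic names may only contain ASCII letters, digits, `.`, `_` and `-`, so names with spaces such as `Badly Received` would need a form like `Badly_Received` before being used as topics.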

## Why are all the games in that range?

Our guess is that the model was rewarded for predicting numbers in that range because of the sheer number of examples in that score range. The average error is evidently small, considering most predictions are a reasonable distance from the actual score.

The model did not have to get the "bad games" right; it only had to get the "average games" right, because of their sheer quantity. In real life, most games just don't receive that bad of a score, and due to how popularity works, games with low scores get few reviews and thus don't end up in our training data (which only includes games with at least 50 reviews).
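A toy calculation with made-up labels (not the project's data) illustrates why a squared-error objective favors predictions near the typical score. Nine hypothetical games score 0.8 and one scores 0.2, and each "model" predicts a single constant for every game:

```python
import math
from statistics import mean

# Hypothetical labels: nine games near the typical score and one badly received game
labels = [0.8] * 9 + [0.2]


def constant_rmse(value: float) -> float:
    """RMSE of a 'model' that predicts the same value for every game."""
    return math.sqrt(sum((y - value) ** 2 for y in labels) / len(labels))


average = mean(labels)
print(round(average, 2), round(constant_rmse(average), 2))  # → 0.74 0.18
print(round(constant_rmse(0.8), 2))                          # → 0.19
print(round(constant_rmse(0.5), 2))                          # → 0.3
```

Predicting the mean misses the one badly received game by 0.54, yet its overall RMSE is only 0.18, and no constant does better, because the mean minimizes squared error. A model trained on mostly well-received games faces the same incentive to stay near the common range.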