In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnull
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Creating Spark session
spark = SparkSession.builder.appName("SteamGamesSuccess").getOrCreate()

data = "games_march2025_full.csv"
df = spark.read.csv("games_march2025_full.csv", header=True, inferSchema=True)
df.show(truncate=False)



+-------+-------------------------------+------------+------------+-----+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [2]:
used_columns = [
    "appid", "name", "release_date", "genres", "tags", "developers", "publishers",
    "price", "discount", "recommendations", "positive", "negative", "peak_ccu", "dlc_count"
]

df1 = df.select(*used_columns)

df1.show(20)

+-------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+--------+---------------+--------+--------+--------+---------+
|  appid|                name|release_date|              genres|                tags|          developers|          publishers|price|discount|recommendations|positive|negative|peak_ccu|dlc_count|
+-------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+--------+---------------+--------+--------+--------+---------+
|    730|    Counter-Strike 2|  21/08/2012|['Action', 'Free ...|{'FPS': 90857, 'S...|           ['Valve']|           ['Valve']|    0|       0|        4401572| 7480813| 1135108| 1212356|        1|
| 578080| PUBG: BATTLEGROUNDS|  21/12/2017|['Action', 'Adven...|{'Survival': 1483...|['PUBG Corporation']|   ['KRAFTON, Inc.']|    0|       0|        1732007| 1487960| 1024436|  616738|        0|
|    570|           

In [3]:
df1 = df1.withColumnRenamed("discount", "discount_percentage")

In [4]:
df1 = df1.withColumnRenamed("peak_ccu", "peak_playernum")

In [5]:
df1.printSchema()

root
 |-- appid: string (nullable = true)
 |-- name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- developers: string (nullable = true)
 |-- publishers: string (nullable = true)
 |-- price: string (nullable = true)
 |-- discount_percentage: string (nullable = true)
 |-- recommendations: string (nullable = true)
 |-- positive: string (nullable = true)
 |-- negative: string (nullable = true)
 |-- peak_playernum: string (nullable = true)
 |-- dlc_count: string (nullable = true)



In [6]:
print(f"Total Records: {df1.count()}")

df1.show()

Total Records: 94954
+-------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+-------------------+---------------+--------+--------+--------------+---------+
|  appid|                name|release_date|              genres|                tags|          developers|          publishers|price|discount_percentage|recommendations|positive|negative|peak_playernum|dlc_count|
+-------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+-------------------+---------------+--------+--------+--------------+---------+
|    730|    Counter-Strike 2|  21/08/2012|['Action', 'Free ...|{'FPS': 90857, 'S...|           ['Valve']|           ['Valve']|    0|                  0|        4401572| 7480813| 1135108|       1212356|        1|
| 578080| PUBG: BATTLEGROUNDS|  21/12/2017|['Action', 'Adven...|{'Survival': 1483...|['PUBG Corporation']|   ['KRAFTON, Inc.']|

# Data Preprocessing
- Handling Missing Values

In [7]:
from pyspark.sql.functions import col, sum

null_counts = df1.select([sum(col(c).isNull().cast("int")).alias(c) for c in df1.columns])
null_counts.show()

+-----+----+------------+------+----+----------+----------+-----+-------------------+---------------+--------+--------+--------------+---------+
|appid|name|release_date|genres|tags|developers|publishers|price|discount_percentage|recommendations|positive|negative|peak_playernum|dlc_count|
+-----+----+------------+------+----+----------+----------+-----+-------------------+---------------+--------+--------+--------------+---------+
|    0|   2|           0|     5|  10|         9|         9|    0|                 11|             10|     110|      75|             9|        0|
+-----+----+------------+------+----+----------+----------+-----+-------------------+---------------+--------+--------+--------------+---------+



In [8]:
# We dropped null name rows, because a game without a name is invalid.
# We dropped null tag rows, because there were quite a few of them and we do not get much out of them.
# We dropped null peak_playernum rows, because if it has had no peak it means it it does not have a playerbase, so we do not need it.
df_cleaned = df1.dropna(subset=["name", "tags", "peak_playernum"])

# We changed null genres into "unknown", because there were only two of them. It won't impact the analyzis much.
df_filled = df_cleaned.fillna({"genres": "unknown", "developers": "unknown", "publishers": "unknown", "discount_percentage": "0", "recommendations": "0"
                               ,"positive": "0", "negative": "0"})




In [9]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, regexp_replace, trim, when, round

In [10]:
df = df_filled.withColumn(
    "positive_clean",
    when(
        trim(col("positive")).rlike("^\d+$"),
        trim(col("positive")).cast("double")
    ).otherwise(0)
).withColumn(
    "negative_clean",
    when(
        trim(col("negative")).rlike("^\d+$"),
        trim(col("negative")).cast("double")
    ).otherwise(0)
).withColumn(
    "all_reviews",
    round(col("positive_clean") + col("negative_clean"), 2)
)


In [11]:
#creating the percentage of reviews columns
df_with_percentages = df.withColumn(
    "positive_percentage", 
    (F.col("positive") / F.col("all_reviews") * 100).cast("double")
).withColumn(
    "negative_percentage", 
    (F.col("negative") / F.col("all_reviews") * 100).cast("double")
)

df_with_percentages = df_with_percentages.withColumn(
    "positive_percentage", 
    F.round(F.col("positive_percentage"), 2)
).withColumn(
    "negative_percentage", 
    F.round(F.col("negative_percentage"), 2)
)

df_with_percentages.show()


+-------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+-------------------+---------------+--------+--------+--------------+---------+--------------+--------------+-----------+-------------------+-------------------+
|  appid|                name|release_date|              genres|                tags|          developers|          publishers|price|discount_percentage|recommendations|positive|negative|peak_playernum|dlc_count|positive_clean|negative_clean|all_reviews|positive_percentage|negative_percentage|
+-------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+-------------------+---------------+--------+--------+--------------+---------+--------------+--------------+-----------+-------------------+-------------------+
|    730|    Counter-Strike 2|  21/08/2012|['Action', 'Free ...|{'FPS': 90857, 'S...|           ['Valve']|         

In [12]:
df = df_with_percentages.fillna({"positive_percentage": "0", "negative_percentage": "0"})

In [13]:
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()

+-----+----+------------+------+----+----------+----------+-----+-------------------+---------------+--------+--------+--------------+---------+--------------+--------------+-----------+-------------------+-------------------+
|appid|name|release_date|genres|tags|developers|publishers|price|discount_percentage|recommendations|positive|negative|peak_playernum|dlc_count|positive_clean|negative_clean|all_reviews|positive_percentage|negative_percentage|
+-----+----+------------+------+----+----------+----------+-----+-------------------+---------------+--------+--------+--------------+---------+--------------+--------------+-----------+-------------------+-------------------+
|    0|   0|           0|     0|   0|         0|         0|    0|                  0|              0|       0|       0|             0|        0|             0|             0|          0|                  0|                  0|
+-----+----+------------+------+----+----------+----------+-----+-------------------+-------

In [14]:
df.show()

+-------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+-------------------+---------------+--------+--------+--------------+---------+--------------+--------------+-----------+-------------------+-------------------+
|  appid|                name|release_date|              genres|                tags|          developers|          publishers|price|discount_percentage|recommendations|positive|negative|peak_playernum|dlc_count|positive_clean|negative_clean|all_reviews|positive_percentage|negative_percentage|
+-------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+-------------------+---------------+--------+--------+--------------+---------+--------------+--------------+-----------+-------------------+-------------------+
|    730|    Counter-Strike 2|  21/08/2012|['Action', 'Free ...|{'FPS': 90857, 'S...|           ['Valve']|         

**Duplicates**

In [15]:
id_duplicates = df.groupBy("appid").count().filter("count > 1")
name_duplicates = df.groupBy("name").count().filter("count > 1")

id_duplicates.show()
name_duplicates.show()

+--------------------+-----+
|               appid|count|
+--------------------+-----+
|nd deciding the s...|    2|
|   ahead of schedule|    2|
+--------------------+-----+

+--------------------+-----+
|                name|count|
+--------------------+-----+
|              ISLAND|    2|
|Romance of the Th...|    2|
|            Downfall|    2|
|      Eternal Return|    2|
|Ys I & II Chronic...|    2|
|               Chasm|    2|
|               Nomad|    2|
|Loading Screen Si...|    2|
|      Hero's Journey|    3|
|The Lord of the R...|    2|
|             Journey|    2|
|Call of Duty®: Bl...|    2|
|        Blood Strike|    2|
|Call of Duty®: Bl...|    2|
|          The Bunker|    3|
|    EA SPORTS FC™ 24|    4|
|              WASTED|    2|
|         Battle Ball|    2|
|            Paradise|    2|
|       Haunted House|    2|
+--------------------+-----+
only showing top 20 rows



In [16]:
df_no_duplicates = df.dropDuplicates(["name"])

name_duplicates = df_no_duplicates.groupBy("name").count().filter("count > 1")
name_duplicates.show()

+----+-----+
|name|count|
+----+-----+
+----+-----+



In [17]:
used_columns = [
    "appid", "name", "release_date", "genres", "tags", "developers", "publishers",
    "price", "discount_percentage", "recommendations", "peak_playernum", "dlc_count", "all_reviews", "positive_percentage", "negative_percentage"
]

df_final = df_no_duplicates.select(*used_columns)

In [18]:
df_final.show(10)

+-------+------------------------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+-------------------+---------------+--------------+---------+-----------+-------------------+-------------------+
|  appid|                                name|release_date|              genres|                tags|          developers|          publishers|price|discount_percentage|recommendations|peak_playernum|dlc_count|all_reviews|positive_percentage|negative_percentage|
+-------+------------------------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+-------------------+---------------+--------------+---------+-----------+-------------------+-------------------+
| 406760|                 "FL337 - ""Fleet"""|  20/06/2017|['Action', 'Adven...|"{'Action': 21, '...| ['KWL Productions']| ['KWL Productions']| 4.99|                  0|              0|             0|        0| 

MongoDB

In [None]:
# Convert Spark DataFrame to Pandas
pandas_df = df_final.toPandas()

# Convert to list of MongoDB-ready docs
records = pandas_df.to_dict(orient="records")

# Make appid the _id (so it's the primary key)
for doc in records:
    doc["_id"] = doc["appid"]  # Use appid as the _id

# Connect to MongoDB
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client["steam_games_db"]
collection = db["games_cleaned"]

# Drop old collection if it exists
collection.drop()

collection.insert_many(records)

print(f"Inserted {len(records)} docs")


In [None]:
# Fetch all documents from the "games_cleaned" collection, excluding the "_id" field
games_docs = list(db["games_cleaned"].find({}, {"_id": 0}))  # Excluding "_id" field

# Convert the list of documents into a Spark DataFrame
df_games = spark.createDataFrame(games_docs)

# Ensure _id is at the beginning, followed by the other columns
columns = ['appid'] + [col for col in df_games.columns if col != 'appid']

# Reorder the DataFrame columns
df_games = df_games.select(*columns)

# Show the first 3 rows to inspect the order of the columns
df_games.show(3)

+-------+-----------+------------------+-------------------+---------+--------------------+--------------------+-------------------+--------------+-------------------+-----+------------------+---------------+------------+--------------------+
|  appid|all_reviews|        developers|discount_percentage|dlc_count|              genres|                name|negative_percentage|peak_playernum|positive_percentage|price|        publishers|recommendations|release_date|                tags|
+-------+-----------+------------------+-------------------+---------+--------------------+--------------------+-------------------+--------------+-------------------+-----+------------------+---------------+------------+--------------------+
|1963980|        8.0|    ['GDE ANIMES']|                  0|        0|['Action', 'Adven...|                 ...|               50.0|             0|               50.0| 1.99|    ['GDE ANIMES']|              0|  04/05/2022|{'Adventure': 67,...|
|3037060|        0.0|['100 C

# Spark optimization


In [None]:
# Cache the DataFrame if it will be used multiple times
df_games.cache()


DataFrame[appid: string, all_reviews: double, developers: string, discount_percentage: string, dlc_count: string, genres: string, name: string, negative_percentage: double, peak_playernum: string, positive_percentage: double, price: string, publishers: string, recommendations: string, release_date: string, tags: string]

In [None]:
# Show the execution plan to check optimizations
df_games.explain(True)


== Parsed Logical Plan ==
'Project ['appid, 'all_reviews, 'developers, 'discount_percentage, 'dlc_count, 'genres, 'name, 'negative_percentage, 'peak_playernum, 'positive_percentage, 'price, 'publishers, 'recommendations, 'release_date, 'tags]
+- LogicalRDD [all_reviews#14072, appid#14073, developers#14074, discount_percentage#14075, dlc_count#14076, genres#14077, name#14078, negative_percentage#14079, peak_playernum#14080, positive_percentage#14081, price#14082, publishers#14083, recommendations#14084, release_date#14085, tags#14086], false

== Analyzed Logical Plan ==
appid: string, all_reviews: double, developers: string, discount_percentage: string, dlc_count: string, genres: string, name: string, negative_percentage: double, peak_playernum: string, positive_percentage: double, price: string, publishers: string, recommendations: string, release_date: string, tags: string
Project [appid#14073, all_reviews#14072, developers#14074, discount_percentage#14075, dlc_count#14076, genres#140

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

# Define a UDF to classify games based on positive_percentage and recommendations
def classify_success(positive_percentage, recommendations):
    try:
        recommendations_int = int(recommendations)  # Convert recommendations to an integer
    except ValueError:
        recommendations_int = 0  # Default to 0 if conversion fails

    # Define conditions for classification
    if positive_percentage > 80 and recommendations_int > 1000:
        return "Successful"
    elif positive_percentage > 50 and recommendations_int > 100:
        return "Moderate"
    else:
        return "Unsuccessful"

# Register the UDF
classify_success_udf = udf(classify_success, StringType())

# Apply the UDF to add a new column 'success_category' based on the classification
games_with_success_classification = df_games.withColumn(
    "success_category", 
    classify_success_udf(col("positive_percentage"), col("recommendations"))
)

# Show the resulting DataFrame with the new 'success_category' column
games_with_success_classification.show()


+-------+-----------+--------------------+-------------------+---------+--------------------+--------------------+-------------------+--------------+-------------------+------+--------------------+---------------+------------+--------------------+----------------+
|  appid|all_reviews|          developers|discount_percentage|dlc_count|              genres|                name|negative_percentage|peak_playernum|positive_percentage| price|          publishers|recommendations|release_date|                tags|success_category|
+-------+-----------+--------------------+-------------------+---------+--------------------+--------------------+-------------------+--------------+-------------------+------+--------------------+---------------+------------+--------------------+----------------+
|1963980|        8.0|      ['GDE ANIMES']|                  0|        0|['Action', 'Adven...|                 ...|               50.0|             0|               50.0|  1.99|      ['GDE ANIMES']|        

In [None]:
games_with_success_classification.createOrReplaceTempView("games")

In [None]:
spark.sql("SELECT * FROM games").show(10)

+-------+--------------------+------------+--------------------+--------------------+------------------+--------------------+------+-------------------+---------------+--------------+---------+-----------+-------------------+-------------------+----------------+
|  appid|                name|release_date|              genres|                tags|        developers|          publishers| price|discount_percentage|recommendations|peak_playernum|dlc_count|all_reviews|positive_percentage|negative_percentage|success_category|
+-------+--------------------+------------+--------------------+--------------------+------------------+--------------------+------+-------------------+---------------+--------------+---------+-----------+-------------------+-------------------+----------------+
|1963980|                 ...|  04/05/2022|['Action', 'Adven...|{'Adventure': 67,...|    ['GDE ANIMES']|      ['GDE ANIMES']|  1.99|                  0|              0|             0|        0|        8.0|      

In [None]:
spark.sql("SELECT * FROM games WHERE success_category NOT LIKE 'Unsuccessful'").show()

+-------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+-------------------+---------------+--------------+---------+-----------+-------------------+-------------------+----------------+
|  appid|                name|release_date|              genres|                tags|          developers|          publishers|price|discount_percentage|recommendations|peak_playernum|dlc_count|all_reviews|positive_percentage|negative_percentage|success_category|
+-------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+-------------------+---------------+--------------+---------+-----------+-------------------+-------------------+----------------+
| 341150|Age of Fear 2: Th...|  27/03/2015|['Indie', 'RPG', ...|{'Tactical RPG': ...|   ['Leslaw Sliwko']|     ['Age of Fear']|24.99|                  0|            269|             2|        1|      334.0|  

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

#Filter out rows with 'Unsuccessful' success_category
filtered_df = games_with_success_classification.filter(F.col("success_category") != "Unsuccessful")

## Comparing the overall success of the games in 2025

In [None]:
#Categorize prices
categorized_df = filtered_df.withColumn(
    "price_category",
    F.when(F.col("price") == 0, "Free")
     .when(F.col("price") < 10, "Budget")
     .when(F.col("price") < 30, "Mid-range")
     .otherwise("Premium")
)

In [None]:
success_order = F.when(F.col("success_category") == "Successful", 3) \
                 .when(F.col("success_category") == "Moderate", 2) \
                 .when(F.col("success_category") == "Unsuccessful", 1) \
                 .otherwise(0)


In [None]:
# numeric rank column
categorized_df = categorized_df.withColumn("success_rank", success_order)

# Window ordered by success_rank descending
price_window = Window.partitionBy("price_category").orderBy(F.desc("success_rank"))

# Apply ranking
price_analysis = categorized_df.withColumn("rank_in_category", F.rank().over(price_window))
price_analysis.select("appid","name", "price", "price_category", "success_category", "success_rank", "rank_in_category").show(truncate=False)


+-------+---------------------------------------------------------------------------------+-----+--------------+----------------+------------+----------------+
|appid  |name                                                                             |price|price_category|success_category|success_rank|rank_in_category|
+-------+---------------------------------------------------------------------------------+-----+--------------+----------------+------------+----------------+
|2055050|Cavalry Girls                                                                    |9.99 |Budget        |Successful      |3           |1               |
|64000  |Men of War: Assault Squad                                                        |9.99 |Budget        |Successful      |3           |1               |
|2835570|Buckshot Roulette                                                                |2.99 |Budget        |Successful      |3           |1               |
|302080 |Bus Driver                     

In [None]:
success_categories = ["Successful", "Moderate", "Unsuccessful"]

for category in success_categories:
    count = games_with_success_classification.filter(
        col("success_category") == category
    ).count()

    print(f"{category}: {count} games")

Successful: 3585 games
Moderate: 11455 games
Unsuccessful: 79138 games


In [None]:
best_games = price_analysis.filter(F.col("rank_in_category") != "3")
best_games.select("appid", "name", "price", "price_category", "success_category", "success_rank", "rank_in_category").show(truncate=False)

+-------+---------------------------------------------------------------------------------+-----+--------------+----------------+------------+----------------+
|appid  |name                                                                             |price|price_category|success_category|success_rank|rank_in_category|
+-------+---------------------------------------------------------------------------------+-----+--------------+----------------+------------+----------------+
|2055050|Cavalry Girls                                                                    |9.99 |Budget        |Successful      |3           |1               |
|64000  |Men of War: Assault Squad                                                        |9.99 |Budget        |Successful      |3           |1               |
|2835570|Buckshot Roulette                                                                |2.99 |Budget        |Successful      |3           |1               |
|302080 |Bus Driver                     

In [None]:
best_games_pandas = best_games.select(
    "appid", "name", "price", "price_category", "success_category", "success_rank", "rank_in_category"
).toPandas()

# Convert the Pandas DataFrame to a list of dictionaries
records = best_games_pandas.to_dict(orient="records")

In [None]:
# Make appid the _id (so it's the primary key)
for doc in records:
    doc["_id"] = doc["appid"]  # Use appid as the _id
    
# Insert the records into the 'successful_games' collection
collection = db["successful_games"]  # Define the collection name
collection.insert_many(records)  # Insert the list of dictionaries into MongoDB

InsertManyResult(['2055050', '64000', '2835570', '302080', '527450', '259550', '1866180', '913550', '1267910', '858210', '431960', '1211600', '2763670', '200910', '461620', '1955330', '475190', '39680', '260230', '952290', '948740', '25800', '324680', '1187510', '364420', '2726450', '499440', '1424660', '599750', '347830', '511470', '209080', '1451940', '798840', '204340', '1892420', '2190290', '397270', '317250', '1942280', '1801650', '483980', '622650', '1053680', '390520', '282440', '825630', '1340480', '571880', '55040', '743390', '1839880', '296470', '235320', '1025250', '228760', '2212330', '1292940', '1266690', '208200', '364390', '1999170', '1160490', '2237970', '92800', '437920', '1035510', '208650', '252670', '1583320', '98300', '1968730', '736190', '341940', '855640', '1951230', '1967510', '616110', '2533960', '790740', '334230', '282800', '537110', '447820', '248330', '593960', '743130', '688130', '915310', '1536070', '105600', '290770', '1168470', '2535770', '405290', '441

In [None]:
best_games.count()

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Users\jansu\anaconda3\envs\BigData\Lib\socket.py", line 721, in readinto
    raise
TimeoutError: timed out


Importing the collection back for visualization purposes

In [None]:
import pandas as pd

# Query the 'successful_games' collection from MongoDB
successful_games_data = collection.find({}, {"_id": 0})  # Exclude _id

# Convert the MongoDB documents to a Pandas DataFrame
successful_games_df = pd.DataFrame(list(successful_games_data))

# Show the first few rows of the DataFrame
print(successful_games_df.head())

# Now you can use 'successful_games_df' for your visualization process


     appid                       name price price_category success_category  \
0  2055050              Cavalry Girls  9.99         Budget       Successful   
1    64000  Men of War: Assault Squad  9.99         Budget       Successful   
2  2835570          Buckshot Roulette  2.99         Budget       Successful   
3   302080                 Bus Driver  4.99         Budget       Successful   
4   527450        Cockroach Simulator  2.99         Budget       Successful   

   success_rank  rank_in_category  
0             3                 1  
1             3                 1  
2             3                 1  
3             3                 1  
4             3                 1  


In [None]:
from pyspark.sql.functions import explode, split, regexp_replace, col

# Step 1: Filter Successful and Moderate Games
best_games = price_analysis.filter(F.col("rank_in_category") != "3")

# Step 2: Exploding toilets
df_exploded_tags = best_games.withColumn(
    "tag", 
    explode(split(regexp_replace("tags", "[\\[\\]' ]", ""), ","))
)

# Step 3: Clean the tag by removing numbers after the colon
df_exploded_tags_clean = df_exploded_tags.withColumn(
    "clean_tag", 
    split(col("tag"), ":").getItem(0)  # Keeps only the first part (before the colon)
)



In [None]:
# Step 4: Count tags for successful games
df_successful = df_exploded_tags_clean.filter(col("success_category") == "Successful") \
    .groupBy("clean_tag").count().withColumnRenamed("count", "successful_count")

# Step 5: Count tags for moderate games
df_moderate = df_exploded_tags_clean.filter(col("success_category") == "Moderate") \
    .groupBy("clean_tag").count().withColumnRenamed("count", "moderate_count")

# Step 6: Perform a join on clean_tag to compare counts between successful and moderate games
df_joined = df_successful.join(
    df_moderate,
    on="clean_tag",  # Joining on the tag name
    how="outer"  # Outer join to include tags from both categories
)

# Step 7: Order the results based on the successful_count (highest successful_count first)
df_joined_ordered = df_joined.orderBy("successful_count", ascending=False)  # Order by successful_count, highest first

# Step 8: Show the results ordered by the successful count of tags
df_joined_ordered.show(10, truncate=False)

+---------------+----------------+--------------+
|clean_tag      |successful_count|moderate_count|
+---------------+----------------+--------------+
|Singleplayer   |2758            |7050          |
|Adventure      |1725            |4217          |
|Indie          |1674            |5353          |
|Action         |1552            |3627          |
|Atmospheric    |1228            |2694          |
|StoryRich      |1211            |2631          |
|2D             |1098            |3201          |
|Multiplayer    |1058            |2099          |
|Casual         |982             |3286          |
|GreatSoundtrack|932             |1229          |
+---------------+----------------+--------------+
only showing top 10 rows



In [None]:
# Convert the Spark DataFrame to a Pandas DataFrame
most_used_tags_df = df_joined_ordered.toPandas()

# Convert to a list of MongoDB-ready documents (JSON-like format)
records = most_used_tags_df.to_dict(orient="records")

# Insert the data into the 'most_used_tags' collection
collection = db["most_used_tags"]  # Using the already established connection

# Drop the existing collection if it exists (to avoid duplicates)
collection.drop()

# Insert the data into the collection
collection.insert_many(records)

print(f"Inserted {len(records)} documents into 'most_used_tags' collection.")


PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Users\jansu\anaconda3\envs\BigData\Lib\socket.py", line 721, in readinto
    raise
TimeoutError: timed out
