In [5]:
#Import Relevant Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit
from pyspark.sql.functions import regexp_replace, col, concat_ws, when, regexp_extract
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType
from pyspark.sql.functions import lower

# Clean The Review Data

In [6]:
# Create the SparkSession used by the review-cleaning cells
# (getOrCreate returns the active session if one already exists).
spark = (
    SparkSession.builder
    .appName("Data Cleaning with Spark")
    .getOrCreate()
)

# Load the data into Spark DataFrames
def load_reviews_csv(path):
    """Read one review CSV file into a Spark DataFrame.

    All three review files are read with the same reader configuration:
    the first line is a header, column types are inferred, a quoted field
    may span several lines (multiLine), and a double quote inside a quoted
    field is escaped by another double quote.

    Args:
        path: Path of the CSV file to read.

    Returns:
        The DataFrame produced by ``spark.read.csv`` for that file.
    """
    return (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("multiLine", "true")
        .option("quote", "\"")
        .option("escape", "\"")
        .csv(path)
    )


df_palworld = load_reviews_csv('Palworld.csv')
df_craftopia = load_reviews_csv('Craftopia.csv')
df_lethal = load_reviews_csv('LethalCompany.csv')


In [7]:
# Strip the literal 'EARLY ACCESS REVIEW' marker out of the 'ReviewText' column
review_text_without_marker = regexp_replace("ReviewText", "EARLY ACCESS REVIEW", "")
df_palworld = df_palworld.withColumn("ReviewText", review_text_without_marker)

# Preview the first rows to confirm the marker is gone
df_palworld.show()

+--------------------+-----------+------------+-------------------+---------------+
|          ReviewText|     Review|ReviewLength|          PlayHours|     DatePosted|
+--------------------+-----------+------------+-------------------+---------------+
|              i like|Recommended|          22| 25.8 hrs on record|Posted: 5 April|
|  Pokemon but better|Recommended|          33| 10.1 hrs on record|Posted: 5 April|
|           good game|Recommended|          25| 17.8 hrs on record|Posted: 5 April|
|You can kidnap pe...|Recommended|          37|  5.0 hrs on record|Posted: 5 April|
|           Fun game.|Recommended|          25|122.1 hrs on record|Posted: 5 April|
|                 123|Recommended|          20| 47.7 hrs on record|Posted: 5 April|
|Fun and interacti...|Recommended|          38| 49.7 hrs on record|Posted: 5 April|
|Love this game! Y...|Recommended|         126| 36.9 hrs on record|Posted: 5 April|
|good but DIGTOISE...|Recommended|         235| 64.0 hrs on record|Posted: 5

In [8]:
# List the distinct labels present in the 'Review' column
review_labels = df_palworld.select("Review").distinct()
review_labels.show()

+---------------+
|         Review|
+---------------+
|Not Recommended|
|    Recommended|
+---------------+



In [9]:
# Drop exact duplicate rows, then tag each remaining row with an 'index' identifier.
# monotonically_increasing_id() yields unique, increasing ids that are not
# guaranteed to be consecutive.
df_palworld = (
    df_palworld
    .dropDuplicates()
    .withColumn("index", monotonically_increasing_id())
)

In [10]:
# Count the rows left in df_palworld after de-duplication
num_rows = df_palworld.count()

# Report the count
print(f"Number of rows in df_palworld: {num_rows}")

Number of rows in df_palworld: 150


In [11]:
def _strip_marker_dedupe_and_index(reviews):
    """Remove the 'EARLY ACCESS REVIEW' marker from 'ReviewText', drop duplicate
    rows, and add an 'index' row-identifier column."""
    without_marker = reviews.withColumn(
        "ReviewText", regexp_replace("ReviewText", "EARLY ACCESS REVIEW", "")
    )
    deduplicated = without_marker.dropDuplicates()
    return deduplicated.withColumn("index", monotonically_increasing_id())


# Cleaning df_craftopia
df_craftopia = _strip_marker_dedupe_and_index(df_craftopia)

# Cleaning df_lethal
df_lethal = _strip_marker_dedupe_and_index(df_lethal)


In [12]:
# Tag every review DataFrame with a constant 'Game' column naming its game,
# so rows stay attributable once the three frames are stacked together.
df_palworld = df_palworld.withColumn("Game", lit("Palworld"))
df_craftopia = df_craftopia.withColumn("Game", lit("Craftopia"))
df_lethal = df_lethal.withColumn("Game", lit("Lethal Companies"))

In [13]:
# Stack the three review DataFrames into one, matching columns by name
lethal_and_palworld = df_lethal.unionByName(df_palworld)
df_combined = lethal_and_palworld.unionByName(df_craftopia)

In [14]:
# Extract the numeric part of "PlayHours" (e.g. "25.8 hrs on record") and convert it to float.
# Thousands separators are stripped first: without that, "1,234.5 hrs on record"
# would match only the leading "1" and be stored as 1.0.
play_hours_text = regexp_replace(col("PlayHours"), ",", "")
# Raw string so the regex backslashes are not treated as (invalid) Python escapes.
df_combined = df_combined.withColumn(
    "PlayHours",
    regexp_extract(play_hours_text, r"(\d+\.\d+|\d+)", 0).cast("float"),
)
# The 'index' helper column is not needed in the combined output
df_combined = df_combined.drop("index")
# Show the updated DataFrame to verify the changes
df_combined.show()

+--------------------+---------------+------------+---------+---------------+----------------+
|          ReviewText|         Review|ReviewLength|PlayHours|     DatePosted|            Game|
+--------------------+---------------+------------+---------+---------------+----------------+
|           good game|    Recommended|          25|     34.1|Posted: April 5|Lethal Companies|
|  very lethal, 10/10|    Recommended|          33|     19.9|Posted: April 5|Lethal Companies|
|pretty fun game e...|    Recommended|          99|     48.7|Posted: April 5|Lethal Companies|
|                good|    Recommended|          21|     33.9|Posted: April 5|Lethal Companies|
|I saw my friend b...|    Recommended|          84|     58.4|Posted: April 5|Lethal Companies|
|The community is ...|    Recommended|          89|     19.6|Posted: April 5|Lethal Companies|
|             love it|    Recommended|          23|      4.6|Posted: April 5|Lethal Companies|
|The future pentac...|    Recommended|          47

In [15]:
# Make sure 'ReviewLength' is stored as an integer column
review_length_as_int = col("ReviewLength").cast("integer")
df_combined = df_combined.withColumn("ReviewLength", review_length_as_int)

In [16]:
# Lowercase the values of every string column; other columns pass through unchanged
lowercased_columns = []
for column_name, column_type in df_combined.dtypes:
    if column_type == "string":
        lowercased_columns.append(lower(column_name).alias(column_name))
    else:
        lowercased_columns.append(column_name)
df_combined = df_combined.select(*lowercased_columns)

# Lowercase the column names themselves
df_combined = df_combined.toDF(*(name.lower() for name in df_combined.columns))
df_combined.show()

+--------------------+---------------+------------+---------+---------------+----------------+
|          reviewtext|         review|reviewlength|playhours|     dateposted|            game|
+--------------------+---------------+------------+---------+---------------+----------------+
|           good game|    recommended|          25|     34.1|posted: april 5|lethal companies|
|  very lethal, 10/10|    recommended|          33|     19.9|posted: april 5|lethal companies|
|pretty fun game e...|    recommended|          99|     48.7|posted: april 5|lethal companies|
|                good|    recommended|          21|     33.9|posted: april 5|lethal companies|
|i saw my friend b...|    recommended|          84|     58.4|posted: april 5|lethal companies|
|the community is ...|    recommended|          89|     19.6|posted: april 5|lethal companies|
|             love it|    recommended|          23|      4.6|posted: april 5|lethal companies|
|the future pentac...|    recommended|          47

In [17]:
# Print the schema of the df_combined DataFrame
# (column names, data types and nullability)
df_combined.printSchema()


root
 |-- reviewtext: string (nullable = true)
 |-- review: string (nullable = true)
 |-- reviewlength: integer (nullable = true)
 |-- playhours: float (nullable = true)
 |-- dateposted: string (nullable = true)
 |-- game: string (nullable = false)



In [18]:
# Persist the combined reviews as Parquet, replacing any existing output at that path
df_combined.write.parquet('df_combined.parquet', mode='overwrite')

                                                                                

In [19]:
# Stop the SparkSession
# (the game-info section that follows builds a new session before doing any Spark work)
spark.stop()

# Clean Game Info Data

In [16]:
# Create the SparkSession used by the game-info cleaning cells
spark = (
    SparkSession.builder
    .appName("Data Cleaning with Spark")
    .getOrCreate()
)

# Load the three game-info CSV files with identical reader settings
# (first line is a header, column types are inferred)
info_csv_options = {"header": True, "inferSchema": True}
info_palworld = spark.read.csv('InfoPalworld.csv', **info_csv_options)
info_craftopia = spark.read.csv('InfoCraftopia.csv', **info_csv_options)
info_lethal = spark.read.csv('InfoLethal.csv', **info_csv_options)

# Rewrite the title "Lethal Company" as "Lethal Companies", the label the
# review data uses for this game; any other title is kept as it is.
is_lethal_company = col("Title") == "Lethal Company"
normalized_title = when(is_lethal_company, "Lethal Companies").otherwise(col("Title"))
info_lethal = info_lethal.withColumn("Title", normalized_title)

# Stack the three info DataFrames into one, matching columns by name
lethal_and_craftopia_info = info_lethal.unionByName(info_craftopia)
combined_info = lethal_and_craftopia_info.unionByName(info_palworld)

# Clean up 'Tags': delete every character that is not a word character,
# whitespace, or a comma (so punctuation such as hyphens is removed too)
tags_without_symbols = regexp_replace("Tags", r'[^\w\s,]', '')
combined_info = combined_info.withColumn("Tags", tags_without_symbols)

# Remove the commas from 'In-Game Count', then cast the remaining text to integer
count_without_commas = regexp_replace(col("In-Game Count"), ",", "")
combined_info = combined_info.withColumn("In-Game Count", count_without_commas.cast("int"))

# Lowercase the values of every string column; other columns pass through unchanged
lowercased_info_columns = []
for column_name, column_type in combined_info.dtypes:
    if column_type == "string":
        lowercased_info_columns.append(lower(column_name).alias(column_name))
    else:
        lowercased_info_columns.append(column_name)
combined_info = combined_info.select(*lowercased_info_columns)

# Lowercase the column names themselves
combined_info = combined_info.toDF(*(name.lower() for name in combined_info.columns))

In [17]:
# Display the combined game-info table to check the cleaning steps
combined_info.show()

+----------------+----------+-------------+--------------------+--------------------+
|           title| developer|in-game count|                tags|          categories|
+----------------+----------+-------------+--------------------+--------------------+
|lethal companies|  zeekerss|        22876| online coop,  ho...|single-player, on...|
|       craftopia|pocketpair|          335| open world,  cra...|single-player, on...|
|        palworld|pocketpair|        60281| multiplayer,  op...|single-player, on...|
+----------------+----------+-------------+--------------------+--------------------+



In [18]:
# Cast 'in-game count' to integer and print the resulting schema
in_game_count_as_int = col("in-game count").cast("integer")
combined_info = combined_info.withColumn("in-game count", in_game_count_as_int)
combined_info.printSchema()

root
 |-- title: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- in-game count: integer (nullable = true)
 |-- tags: string (nullable = true)
 |-- categories: string (nullable = true)



In [19]:
# Rename the (already lowercased) 'title' column to 'game'
info_with_game_column = combined_info.withColumnRenamed(existing="title", new="game")
combined_info = info_with_game_column
combined_info.show()

+----------------+----------+-------------+--------------------+--------------------+
|            game| developer|in-game count|                tags|          categories|
+----------------+----------+-------------+--------------------+--------------------+
|lethal companies|  zeekerss|        22876| online coop,  ho...|single-player, on...|
|       craftopia|pocketpair|          335| open world,  cra...|single-player, on...|
|        palworld|pocketpair|        60281| multiplayer,  op...|single-player, on...|
+----------------+----------+-------------+--------------------+--------------------+



In [20]:
# Persist the cleaned game info as Parquet, replacing any existing output at that path
combined_info.write.parquet('combined_info.parquet', mode='overwrite')

In [21]:
# Stop the Spark session
# (the pricing section that follows builds a new session before doing any Spark work)
spark.stop()

# Clean Pricing Data

In [22]:
# Create the SparkSession used by the pricing-cleaning cells
spark = (
    SparkSession.builder
    .appName("Pricing Data Cleaning")
    .getOrCreate()
)

# Load the three pricing CSV files with identical reader settings
# (first line is a header, column types are inferred)
pricing_csv_options = {"header": True, "inferSchema": True}
pricing_palworld = spark.read.csv('Palworld_pricing.csv', **pricing_csv_options)
pricing_craftopia = spark.read.csv('Craftopia_pricing.csv', **pricing_csv_options)
pricing_lethal = spark.read.csv('Lethal_pricing.csv', **pricing_csv_options)

In [23]:
# Tag each pricing table with a constant 'Game' column naming its game
pricing_lethal = pricing_lethal.withColumn("Game", lit("Lethal Companies"))
pricing_craftopia = pricing_craftopia.withColumn("Game", lit("Craftopia"))
pricing_palworld = pricing_palworld.withColumn("Game", lit("Palworld"))

# Stack the three tables (Palworld, Craftopia, Lethal), matching columns by name
palworld_and_craftopia_pricing = pricing_palworld.unionByName(pricing_craftopia)
pricing_combined = palworld_and_craftopia_pricing.unionByName(pricing_lethal)

In [24]:
def clean_price_udf(price_str):
    """UDF to clean the price field and keep only digits, periods, and commas.

    Args:
        price_str: Raw price value. Normally a string; ``None`` (a null
            cell) and non-string values are also accepted.

    Returns:
        ``None`` when the input is ``None``; otherwise the input's text with
        every character other than a digit, '.' or ',' removed.
    """
    # A null cell reaches a Python UDF as None; iterating over it would raise TypeError.
    if price_str is None:
        return None
    # A price column may arrive as a number rather than text (schema inference
    # can type an all-numeric column as numeric), so normalise to text first.
    text = price_str if isinstance(price_str, str) else str(price_str)
    # NOTE(review): every kept character is concatenated, so a cell holding two
    # numbers collapses into a single digit run — confirm the raw cell format.
    return ''.join(ch for ch in text if ch.isdigit() or ch in '.,')

# Register the cleaner under the SQL function name "clean_price_udf", and also
# wrap it as a column-level UDF for use with the DataFrame API
spark.udf.register(name="clean_price_udf", f=clean_price_udf, returnType=StringType())
clean_price = udf(f=clean_price_udf, returnType=StringType())

# For each price column: clean the raw text with the UDF, then cast the cleaned
# text to float. The columns are independent, so they are handled one at a time.
for price_column in ("Current Price", "Converted Price", "Lowest Recorded Price"):
    pricing_combined = pricing_combined.withColumn(
        price_column, clean_price(col(price_column))
    )
    pricing_combined = pricing_combined.withColumn(
        price_column, col(price_column).cast(FloatType())
    )

# Lowercase the column names first, so the value pass below sees the final names
lowercase_names = [name.lower() for name in pricing_combined.columns]
pricing_combined = pricing_combined.toDF(*lowercase_names)

# Lowercase the values of every string-typed column
string_columns = [
    field.name
    for field in pricing_combined.schema.fields
    if field.dataType == StringType()
]
for col_name in string_columns:
    pricing_combined = pricing_combined.withColumn(col_name, lower(col(col_name)))

In [25]:
# Display the cleaned pricing table to check the parsed price values
pricing_combined.show()

[Stage 6:>                                                          (0 + 1) / 1]

+-------------------+-------------+---------------+---------------------+--------+
|           currency|current price|converted price|lowest recorded price|    game|
+-------------------+-------------+---------------+---------------------+--------+
|      british pound|       22.491|          22.49|               22.491|palworld|
|   south asia - usd|        9.441|           7.47|                 7.47|palworld|
|      russian ruble|      99010.0|           8.47|                 8.47|palworld|
| south african rand|      243.001|          10.31|                10.31|palworld|
|  cis - u.s. dollar|       13.041|          10.32|                10.32|palworld|
|       chinese yuan|       97.201|          10.63|                10.63|palworld|
|latam - u.s. dollar|       13.491|          10.68|                10.68|palworld|
| mena - u.s. dollar|       13.491|          10.68|                10.68|palworld|
|  malaysian ringgit|       64.801|           10.8|                 10.8|palworld|
|   

                                                                                

In [26]:
# Persist the cleaned pricing data as Parquet, replacing any existing output at that path
pricing_combined.write.parquet('pricing_combined.parquet', mode='overwrite')


                                                                                

In [27]:
# Stop the Spark session used for the pricing data
spark.stop()