# Data Cleaning

Encode categorical columns (string, boolean).



In [0]:
# Clear the import system's finder caches so that modules created or changed
# on disk after the interpreter started can be found by later imports.
import importlib
importlib.invalidate_caches()

In [0]:
%run "./00_Data_Loading" 

In [0]:
from utils import data_cleaning as dc
from data import steam_api_response_contract as contract
from data import tag_mappings 
from utils import config
import importlib
# Re-execute the project modules imported above so that edits made to their
# source since the first import take effect without restarting the kernel.
importlib.reload(contract)
importlib.reload(tag_mappings)
importlib.reload(dc)

### Convert to correct data types 

## Data Quality Checks 

### Fill Missing Data 

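We might be able to fill in the missing data using the Steam API for rows with at least one missing column. Note: the discount cannot be backfilled this way, because the recommendations were collected at a specific point in time and a discount fetched now would not reflect the discount that applied then.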

Games DataFrame

In [0]:

def enrich_game_metadata_row(game_data, row):
    """Extract the description and genre tags for one game from API data.

    Args:
        game_data: Mapping with the app details. The keys
            "short_description" and "genres" are read; every genre entry is
            indexed with "description". May be None.
        row: The dataset row being enriched. Currently unused.

    Returns:
        None when ``game_data`` is None, or when the description or the
        genres are missing or empty.

    NOTE(review): this function is unfinished. The successful path also ends
    without a ``return`` statement, so ``tags`` is computed and then
    discarded and the caller still receives None.
    TODO: decide what the enriched row should look like and return it.
    """
    if game_data is None:
        return None

    # .get() rather than [...]: a payload lacking either key means "nothing
    # to enrich with" and is reported as None instead of raising KeyError.
    description = game_data.get("short_description")
    genres = game_data.get("genres")

    if description is None or description == "":
        return None

    if genres is None or len(genres) == 0:
        return None

    # Not used yet -- see the TODO in the docstring.
    tags = [genre["description"] for genre in genres]



    

### Remove Missing

### Remove Duplicates 

Decide which ones you want to keep based on X rule 

### Type Casting & Data Formatting

In [0]:
from pyspark.sql.functions import col

def _cast_columns(df, column_types):
    """Return ``df`` with each listed column cast to the paired type name.

    Columns are processed in the mapping's insertion order, one
    ``withColumn`` call per entry.
    """
    for column_name, type_name in column_types.items():
        df = df.withColumn(column_name, col(column_name).cast(type_name))
    return df


# Target types for the games table.
cleaned_games_df = _cast_columns(games_df, {
    "app_id": "long",
    "date_release": "date",
    "win": "boolean",
    "mac": "boolean",
    "linux": "boolean",
    "steam_deck": "boolean",
    "positive_ratio": "float",
    "user_reviews": "int",
    "price_final": "float",
    "price_original": "float",
    "discount": "float",
})


# Target types for the recommendations table.
cleaned_recommendations_df = _cast_columns(recommendations_df, {
    "app_id": "long",
    "helpful": "int",
    "funny": "int",
    "date": "date",
    "is_recommended": "boolean",
    "hours": "float",
    "user_id": "long",
    "review_id": "long",
})


# Target types for the users table.
cleaned_users_df = _cast_columns(users_df, {
    "user_id": "long",
    "products": "int",
    "reviews": "int",
})


In [0]:
# Print each schema to verify the column types after casting.

# Games DataFrame
print("Games DF")
cleaned_games_df.printSchema()

# Games Metadata DataFrame
# `cleaned_games_metadata_df` is only created further down the notebook (in
# the tag-mapping section), so referencing it here raises NameError on a
# fresh "Restart & Run All". No casts are applied to the metadata in the
# casting cell, so the loaded frame is what can be inspected at this point.
print("Games Metadata DF")
games_metadata_df.printSchema()


# Recommendations DataFrame
print("Recommendations DF")
cleaned_recommendations_df.printSchema()


# Users DataFrame
print("Users DF")
cleaned_users_df.printSchema()


## Map Tags 


In [0]:
import re
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf

def normalize_tag_key(tag):
    """Build the lookup key for a raw tag: lowercase, trimmed, hyphenated.

    Each run of whitespace and/or hyphens ends up as a single hyphen, so
    "Open   World" and "open--world" both give "open-world". ``None`` is
    returned unchanged.
    """
    if tag is None:
        return None

    lowered = tag.lower().strip()
    # Collapse separator runs to one space, then rejoin the pieces with "-".
    collapsed = re.sub(r"[\s\-]+", " ", lowered)
    return "-".join(collapsed.split(" "))

def normalize_tags(tags):
    """Normalise a list of raw tags and map each one through the tag mapping.

    Args:
        tags: Iterable of raw tag strings; may be None or empty.

    Returns:
        A list with one entry per kept tag: the value stored in
        ``tag_mappings.tag_mappings`` for the tag's normalised key, or the
        normalised key itself when the mapping has no entry for it.
        Placeholders for missing tags (None, "" and "NA") are dropped.
        An empty list is returned for None/empty input.
    """
    if not tags:
        return []
    normalized = []
    for t in tags:
        # Skip placeholders for missing tags. Joining the two inequality
        # checks with `or` would be true for every value and filter nothing,
        # so membership is tested explicitly instead.
        if t is None or t in ("", "NA"):
            continue

        key = normalize_tag_key(t)
        normalized.append(tag_mappings.tag_mappings.get(key, key))
    return normalized

# Wrap normalize_tags as a Spark UDF whose result is an array of strings.
normalize_tags_udf = udf(normalize_tags, returnType=ArrayType(StringType()))









In [0]:
# Overwrite the "tags" column with the output of the tag-normalising UDF.
normalized_tags_col = normalize_tags_udf(col("tags"))
cleaned_games_metadata_df = games_metadata_df.withColumn("tags", normalized_tags_col)

# Preview the resulting tags without truncating long values.
cleaned_games_metadata_df.select("tags").show(truncate=False)


Example: normalizing a small list of tags

In [0]:
# Quick sanity check of normalize_tags on a small hand-written list.
tags = ["Co-op", "Stealth", "Indie"]

print(normalize_tags(tags))

In [0]:
# Dump the tag mapping that normalize_tags looks keys up in, for inspection.
print(tag_mappings.tag_mappings)

###  Encoding Categorical Variables

In [0]:
# Rating labels that actually occur in the data.
rating_list = [row['rating'] for row in games_df.select("rating").distinct().collect()]

# Define the correct order manually (most negative first, most positive last)
rating_order = [
    'Overwhelmingly Negative',
    'Very Negative',
    'Mostly Negative',
    'Negative',
    'Mixed',
    'Mostly Positive',
    'Positive',
    'Very Positive',
    'Overwhelmingly Positive'
]

# Create mapping dictionary: rating label -> its position in rating_order
rating_mapping = {rating: idx for idx, rating in enumerate(rating_order)}
print(rating_mapping)

# Put the collected labels to use: report any value present in the data that
# has no code in the mapping (including nulls), so a gap is noticed before
# the column is encoded.
unmapped_ratings = [rating for rating in rating_list if rating not in rating_mapping]
if unmapped_ratings:
    print(f"WARNING: ratings without a code in rating_mapping: {unmapped_ratings}")

In [0]:
# Encode "rating" on top of the type-cast frame. Starting again from the raw
# `games_df` here would replace `cleaned_games_df` with a frame that has none
# of the casts applied in the type-casting cell.
# NOTE: this cell now reads and rebinds the same name, so re-running it on
# its own may encode an already encoded column -- re-run the casting cell first.
cleaned_games_df = dc.encode_column(cleaned_games_df, "rating", rating_mapping)

### Join games with games metadata

In [0]:

# Join the cleaned frames so that the cleaned games columns and the
# normalised tags are what gets persisted; joining the raw `games_df` and
# `games_metadata_df` would write the uncleaned data instead.
joined_games_df = cleaned_games_df.join(
    cleaned_games_metadata_df.select("app_id", "description", "tags"),
    on="app_id",
    how="left"
)

# NOTE(review): `fp` is not defined in the cells visible here; presumably it
# comes from the %run of 00_Data_Loading -- confirm before running.
joined_games_df.write.mode("overwrite").parquet(fp)


In [0]:
# Peek at the first five joined rows.
joined_games_df.take(5)

In [0]:
import requests 
import requests


 # filter by rows that are null or empty. 





# Example request against the Steam store "appdetails" endpoint.
# NOTE(review): this id was labelled "Dota 2", but Dota 2's Steam app id is
# 570 -- confirm which title 371970 refers to.
app_id = 371970
url = f"https://store.steampowered.com/api/appdetails?appids={app_id}"

# Without a timeout the call can block indefinitely if the server never
# answers. raise_for_status() surfaces HTTP errors here instead of letting an
# error body fail later during parsing.
response = requests.get(url, timeout=10)
response.raise_for_status()
data = response.json()

# The payload is keyed by the app id as a string. If the entry or its "data"
# field is absent, game_data becomes None instead of raising KeyError.
app_entry = (data or {}).get(str(app_id)) or {}
game_data = app_entry.get("data")

print(game_data)




Store cleaned dataframes

In [0]:

# Persist the cleaned games frame in Delta format, replacing existing data.
(
    cleaned_games_df.write
    .mode("overwrite")
    .format("delta")
    .save("/Volumes/workspace/steam-game-recommender/cleandeddata/games")
)



In [0]:
# Read back the cleaned games table. The table is saved under
# ".../cleandeddata/games", so that exact directory must be loaded; loading
# the parent ".../cleandeddata" directory would not read that table.
df = spark.read.format("delta").load("/Volumes/workspace/steam-game-recommender/cleandeddata/games")
