# Imports

In [None]:
# System
import os
from pathlib import Path
from dotenv import load_dotenv
import importlib
import sys

# Data Management
from typing import Optional, Dict, List, Any
from pydantic import BaseModel, Field
import time
import random
import json
from bson import json_util
import bson
import re
import difflib
from bs4 import BeautifulSoup
import unicodedata
from collections import Counter

# Data Science
import numpy as np
import pandas as pd
import polars as pl
import missingno as msno
import flatten_json

# Ignore warnings
from bs4 import MarkupResemblesLocatorWarning
import warnings

warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)

In [None]:
# Add project root to path
sys.path.append(str(Path.cwd().parent))

# Import your modules
from src import steam_api_manager
from src import mongo_manager
from src import postgres_manager

# Configuration

In [None]:
# Load environment variables (credentials, connection strings, ...) from a
# local .env file.
load_dotenv()

# Reload the project modules so that edits made to them are picked up
# without restarting the kernel.
importlib.reload(steam_api_manager)

# The name `mongo_manager` is rebound to a MongoManager *instance* at the end
# of this cell. On a second run it would therefore no longer refer to the
# module, and `importlib.reload(mongo_manager)` would raise a TypeError.
# Re-importing the module under its own alias keeps this cell re-runnable
# while leaving the `mongo_manager` instance name intact for later cells.
from src import mongo_manager as mongo_manager_module
importlib.reload(mongo_manager_module)

# Get fresh instances of the manager classes
steam_api = steam_api_manager.SteamAPIManager()
mongo_manager = mongo_manager_module.MongoManager()

# Functions

## Clean Strings

In [None]:
def remove_html_tags(text: str):
    """Return the visible text of an HTML fragment.

    Non-string inputs (``None``, numbers, ...) are passed through untouched.
    For strings, the markup is parsed with ``html.parser``, text nodes are
    joined with a single space and the result is stripped of surrounding
    whitespace.
    """
    # Guard clause: only strings can contain markup.
    if not isinstance(text, str):
        return text

    plain_text = BeautifulSoup(text, "html.parser").get_text(separator=" ")
    return plain_text.strip()

def remove_html_tags_df(df: pl.LazyFrame, cols: list) -> pl.LazyFrame:
    """Apply ``remove_html_tags`` element-wise to each column named in ``cols``.

    Every listed column is replaced in place (same name) by a String column.
    """
    stripped_columns = []
    for name in cols:
        without_tags = pl.col(name).map_elements(remove_html_tags, return_dtype=pl.String)
        stripped_columns.append(without_tags.alias(name))
    return df.with_columns(stripped_columns)
    
def normalize_strings(text: str) -> str:
    """Return ``text`` in Unicode NFKC form with surrounding whitespace removed.

    Values that are not strings are returned unchanged.
    """
    if not isinstance(text, str):
        return text

    canonical = unicodedata.normalize('NFKC', text)
    return canonical.strip()

def normalize_strings_df(df: pl.LazyFrame, cols: list) -> pl.LazyFrame:
    """Apply ``normalize_strings`` element-wise to each column named in ``cols``.

    Every listed column is replaced in place (same name) by a String column.
    """
    normalized_columns = []
    for name in cols:
        normalized = pl.col(name).map_elements(normalize_strings, return_dtype=pl.String)
        normalized_columns.append(normalized.alias(name))
    return df.with_columns(normalized_columns)
    
def strip_list_strings_df(df: pl.LazyFrame, cols: list) -> pl.LazyFrame:
    """Strip surrounding whitespace from every string inside the list columns in ``cols``."""
    stripped_columns = []
    for name in cols:
        # Expression evaluated against each element of the list.
        trimmed_element = pl.element().str.strip_chars().alias(name)
        stripped_columns.append(pl.col(name).list.eval(trimmed_element))
    return df.with_columns(stripped_columns)

## Clean Ages

In [None]:
def remove_plus_sign(age: str) -> str:
    """Remove every ``+`` character from an age string (e.g. ``"18+"`` -> ``"18"``).

    Values that are not strings are returned unchanged.
    """
    if not isinstance(age, str):
        return age
    return age.replace('+', '')

def remove_plus_sign_df(df: pl.LazyFrame, cols: list) -> pl.LazyFrame:
    """Apply ``remove_plus_sign`` element-wise to each column named in ``cols``.

    Every listed column is replaced in place (same name) by a String column.
    """
    cleaned_columns = []
    for name in cols:
        without_plus = pl.col(name).map_elements(remove_plus_sign, return_dtype=pl.String)
        cleaned_columns.append(without_plus.alias(name))
    return df.with_columns(cleaned_columns)
    
def convert_age_to_int_df(df: pl.LazyFrame, cols: list) -> pl.LazyFrame:
    """Cast each column named in ``cols`` to Int8, replacing unparseable or missing values with 0."""

    def _as_int8(name: str) -> pl.Expr:
        # strict=False turns values that cannot be cast into nulls instead of
        # raising; those nulls (and pre-existing ones) then become 0.
        as_int = pl.col(name).cast(pl.Int8, strict=False)
        return as_int.fill_null(0).alias(name)

    return df.with_columns([_as_int8(name) for name in cols])
    
def filter_age(age: int) -> int:
    """Return ``age`` when it lies in the inclusive range 0..21, otherwise 0.

    Values that are not integers are returned unchanged.
    """
    if isinstance(age, int) and not (0 <= age <= 21):
        return 0
    return age
    
def filter_age_df(df: pl.LazyFrame, cols: list) -> pl.LazyFrame:
    """Apply ``filter_age`` element-wise to each column named in ``cols``.

    Every listed column is replaced in place (same name) by an Int8 column.
    """
    bounded_columns = []
    for name in cols:
        bounded = pl.col(name).map_elements(filter_age, return_dtype=pl.Int8)
        bounded_columns.append(bounded.alias(name))
    return df.with_columns(bounded_columns)

## Clean Descriptions

In [None]:
def is_diff_texts(a: str, b: str) -> bool:
    """Return True when the two texts differ on a line-by-line basis.

    Both texts are split with ``str.splitlines`` before comparison, so a
    difference that only concerns the line-ending style or a single trailing
    newline is not reported. ``None`` (or any falsy value) is treated as an
    empty text.

    Args:
        a: First text, may be ``None``.
        b: Second text, may be ``None``.

    Returns:
        ``True`` if at least one line was added, removed or changed,
        ``False`` otherwise.
    """
    # A line diff contains an added/removed line exactly when the two line
    # sequences are not equal, so a direct list comparison gives the same
    # answer in linear time without building the (potentially very expensive)
    # diff. This function runs once per row, so the cost matters.
    return (a or "").splitlines() != (b or "").splitlines()

# Polars Data Pipeline

This is the final layer of EDA for the silver layer. The data pipeline is organized in the following order:

1. Drop columns (moved to MongoDB, when reading)
2. Filter data (moved to MongoDB, when reading)
3. Type optimization (downcast)
4. String & Date Normalization
5. Missing data handling
6. Feature Engineering / Transformations
7. Indexing / Sorting (merging purposes)
8. Column reordering / renaming
9. Export / Store

This time the pipeline is implemented with Polars, both to speed up processing and because Polars (rather than pandas) will be used in production.


## Read Data

In [None]:
# MongoDB filter: which documents of the `details` collection to read
query = {
    # Keep only documents whose `type` is one of the listed values
    'type': {'$in': ['game', 'dlc', 'demo', 'series', 'episode', 'music', 'mod']},
    # Keep only already published apps (not flagged as coming soon)
    'release_date.coming_soon': False
}

# MongoDB projection: read only the fields the pipeline needs
# (1 = include the field, 0 = exclude it)
projection = {
    '_id': 0,
    # Key
    'appid': 1, 'name': 1, 'fullgame.appid': 1,
    # Description
    'about_the_game': 1, 'detailed_description': 1, 'short_description': 1,
    # Images
    'header_image': 1, 'background_raw': 1, 'screenshots': 1, 'movies': 1,
    # Price
    'is_free': 1, 'price_overview.currency': 1, 'price_overview.initial': 1,
    # Meta
    'type': 1, 'categories': 1, 'genres': 1, 'supported_languages': 1, 'controller_support': 1,
    # Date
    'release_date.date': 1,
    # Contract
    'developers': 1, 'publishers': 1, 'content_descriptors.notes': 1,
    # Progress
    'achievements.highlighted': 1,
    # Computer Specs
    'platforms': 1, 
    'pc_requirements.minimum': 1, 'pc_requirements.recommended': 1,
    'mac_requirements.minimum': 1, 'mac_requirements.recommended': 1,
    'linux_requirements.minimum': 1, 'linux_requirements.recommended': 1,
    # Required Age (the app's own value plus one per rating board)
    'required_age': 1,
    'ratings.dejus.required_age': 1, 'ratings.steam_germany.required_age': 1, 'ratings.csrr.required_age': 1,
    'ratings.esrb.required_age': 1, 'ratings.pegi.required_age': 1, 'ratings.usk.required_age': 1,
    'ratings.oflc.required_age': 1, 'ratings.nzoflc.required_age': 1, 'ratings.kgrb.required_age': 1,
    'ratings.mda.required_age': 1, 'ratings.fpb.required_age': 1, 'ratings.crl.required_age': 1,
    'ratings.bbfc.required_age': 1, 'ratings.cero.required_age': 1, 'ratings.agcom.required_age': 1,
    'ratings.cadpa.required_age': 1, 'ratings.video.required_age': 1,
}


# Fetch the matching documents and materialize the cursor into a list,
# so that the whole result set is held in memory from here on
cursor = mongo_manager.database.details.find(query, projection)
list_cursor = list(cursor)

# Print count of documents
print(f"Number of documents: {len(list_cursor)}")

# Flatten nested documents into dotted column names (e.g. 'release_date.date')
df = pd.json_normalize(list_cursor)
df

# # Convert BSON to JSON-compatible (handles ObjectId, etc.)
# parsed_docs = json.loads(json_util.dumps(list(cursor)))

# # Print count of documents
# print(f"Number of documents: {len(parsed_docs)}")

# df = pd.json_normalize(parsed_docs)
# df

In [None]:
def get_deep_type(val: Any) -> str:
    """Describe the type of ``val``, looking inside lists recursively.

    Returns ``"dict"`` for dictionaries, ``"list[empty]"`` for empty lists,
    ``"list[<sorted, comma-separated element types>]"`` for non-empty lists,
    ``"null"`` for values pandas considers missing, and the plain Python type
    name for everything else.
    """
    if isinstance(val, dict):
        return "dict"

    if isinstance(val, list):
        if len(val) == 0:
            return 'list[empty]'
        # De-duplicate and sort so the label does not depend on element order.
        element_types = sorted(set(map(get_deep_type, val)))
        return "list[" + ", ".join(element_types) + "]"

    return "null" if pd.isna(val) else type(val).__name__

def infer_column_types(df: pd.DataFrame, sample_size: int = 1000) -> dict:
    """Count the deep types found in each column of ``df``.

    For every column, up to ``sample_size`` non-missing values are inspected
    with ``get_deep_type``. The result maps each column name to a
    ``{type label: occurrences}`` dictionary. A value whose inspection raises
    is counted under an ``"Error: <message>"`` label instead of aborting the
    report.
    """
    report = {}

    for column in df.columns:
        tally = Counter()
        sample = df[column].dropna().head(sample_size)

        for value in sample:
            try:
                label = get_deep_type(value)
            except Exception as exc:
                # Best-effort report: record the failure and keep going.
                label = f"Error: {str(exc)}"
            tally[label] += 1

        report[column] = dict(tally)

    return report

def convert_mixed_columns_to_string(df: pd.DataFrame, sample_size: int = 1000) -> pd.DataFrame:
    """Stringify every column whose sampled values have more than one deep type.

    The frame is modified in place and also returned. ``None`` and float NaN
    stay ``None``; every other value of an affected column becomes
    ``str(value)``.
    """

    def _to_str_or_none(value):
        if value is None:
            return None
        if isinstance(value, float) and np.isnan(value):
            return None
        return str(value)

    for column, seen_types in infer_column_types(df, sample_size).items():
        # More than one type label means the column is heterogeneous.
        if len(seen_types) > 1:
            df[column] = df[column].apply(_to_str_or_none)

    return df

# Stringify columns with heterogeneous value types (checked over all rows)
# so that polars can infer a single dtype per column.
df_pandas_to_polars = convert_mixed_columns_to_string(df, sample_size=df.shape[0])

df_pl = pl.LazyFrame(
    df_pandas_to_polars,
    strict=False,
    # infer_schema_length=len(parsed_docs)
    infer_schema_length=len(list_cursor)
)

# Drop early: remove the un-nested parent columns, but only those that are
# actually present. Passing a missing column name to `drop` fails when the
# query is resolved, which is why the candidate list is filtered against the
# schema first (the filtered list was previously computed but never used).
columns_to_drop = ['pc_requirements', 'mac_requirements', 'linux_requirements', 'ratings']
existing_columns = set(df_pl.collect_schema().names())
columns_to_drop_in_df = [col for col in columns_to_drop if col in existing_columns]
df_pl = df_pl.drop(columns_to_drop_in_df)

df_pl.head().collect()

## Drop Columns (deprecated)

In [None]:
# df_dropped_columns = (
#     df_pl.drop(
#         [
#             '_id.$oid', 'steam_appid', 'alternate_appid',
#             'capsule_image', 'capsule_imagev5', 'background',
#             'metacritic.score', 'metacritic.url',
#             *[col for col in df.columns if 'ratings.' in col and '.required_age' not in col],
#             'drm_notice',
#             'price_overview.recurring_sub', 'price_overview.recurring_sub_desc',
#             'price_overview.final', 'price_overview.discount_percent', 
#             'price_overview.initial_formatted', 'price_overview.final_formatted',
#             'pc_requirements', 'mac_requirements', 'linux_requirements',
#             'website', 'support_info.url', 'support_info.email', 'legal_notice', 'content_descriptors.ids',
#             'packages', 'package_groups',
#             'dlc', 'demos', 'fullgame.name',
#             'reviews', 'ratings', 'recommendations.total',
#             'ext_user_account_notice', 'achievements.total'
#         ]
#     )
# )

## Filter Data (deprecated)

In [None]:
# df_filtered = (
#     df_dropped_columns
#     .filter(
#         (pl.col('type').is_in(['game', 'dlc', 'demo', 'series', 'episode', 'music', 'mod'])) &
#         (pl.col('release_date.coming_soon') == False)
#     )
#     .drop('release_date.coming_soon')
# )
# df_filtered.head().collect()

## Type Optimization

In [None]:
# Filtering now happens in the MongoDB query, so this step only copies the frame
df_filtered = df_pl.clone()

# Names of all columns whose dtype is a polars List
cols_list = [
    col_name
    for col_name, col_dtype in df_filtered.collect_schema().items()
    if col_dtype == pl.List
]

df_type_optimized = df_filtered.with_columns(
    # List[Struct{}] -> List[str]: keep one field of each struct element
    *[
        pl.col(list_col).list.eval(pl.element().struct.field(kept_field))
        for list_col, kept_field in (
            ('categories', 'description'),
            ('genres', 'description'),
            ('screenshots', 'path_full'),
        )
    ],
    # Movies keep the nested 'mp4' -> 'max' field
    pl.col('movies').list.eval(pl.element().struct.field('mp4').struct.field('max')),
    # String -> Int
    pl.col('fullgame.appid').cast(pl.Int64),
    # String -> Boolean: 'full' is True, a missing value is False,
    # any other value becomes null
    pl.when(pl.col('controller_support') == 'full')
    .then(pl.lit(True))
    .when(pl.col('controller_support').is_null())
    .then(pl.lit(False))
    .otherwise(pl.lit(None))
    .alias('controller_support'),
)

# # Check that the length of the unnested achievements.highlighted columns are the same
# df_result = df_type_optimized.with_columns(
#     (pl.col("achievements_name").list.len() == pl.col("achievements_img").list.len()).alias("same_length")
# )
# df_result.collect().shape[0] == df_type_optimized.collect().shape[0]

# Preview the converted columns
df_type_optimized.select(
    [
        'appid',
        'name',
        'categories',
        'genres',
        'screenshots',
        'movies',
        # 'achievements.highlighted'
    ]
).head(100).collect()

## Data Cleaning

In [None]:
# Plain string columns
cols_str = [
    name
    for name, col_dtype in df_type_optimized.collect_schema().items()
    if col_dtype == pl.String
]
# List columns whose elements are strings
cols_list_string = [
    name
    for name, col_dtype in df_type_optimized.collect_schema().items()
    if isinstance(col_dtype, pl.List) and col_dtype.inner == pl.String
]
# cols_list_struct = [
#     col 
#     for col, dtype in df_type_optimized.collect_schema().items()
#     if isinstance(dtype, pl.List) and dtype.inner == pl.Struct
# ]
# Every column carrying a required age (its name contains 'required_age')
cols_age = [
    name
    for name in df_type_optimized.collect_schema().names()
    if 'required_age' in name
]

# Strings: Unicode-normalize first, then strip HTML markup
df_cleaned = normalize_strings_df(df_type_optimized, cols=cols_str)
df_cleaned = remove_html_tags_df(df_cleaned, cols=cols_str)

# Lists of strings: trim whitespace around each element
df_cleaned = strip_list_strings_df(df_cleaned, cols=cols_list_string)

# Ages: drop '+' signs, cast to integers, then zero out-of-range values
df_cleaned = remove_plus_sign_df(df_cleaned, cols=cols_age)
df_cleaned = convert_age_to_int_df(df_cleaned, cols=cols_age)
df_cleaned = filter_age_df(df_cleaned, cols=cols_age)

df_cleaned.head().collect()

## Wrangling

### Merged required_ages

In [None]:
# Collapse all required-age columns into one value per row: the highest age
df_merged_ages = df_cleaned.with_columns(
    pl.max_horizontal(cols_age).alias('required_age_cleaned')
)

# The individual required-age columns are no longer needed
df_merged_ages = df_merged_ages.drop(cols_age)

# Give the merged column its final name
df_merged_ages = df_merged_ages.rename({'required_age_cleaned': 'required_age'})

df_merged_ages.head().collect()

### Merged descriptions

Add a check to see if the about_the_game or detailed_description is not an empty string

this means a boolean column should be added.

In [None]:
# Merge `about_the_game` and `detailed_description` into a single
# `detailed_description` column, blank out empty short descriptions and add a
# flag for rows without any description text.
# NOTE(review): the `== ""` / `!= ""` comparisons below evaluate to null for
# null inputs, so rows with a null description fall through to the later
# branches - confirm that both columns are always strings at this point.
df_merged_descriptions = (
    df_merged_ages
    # Compute, row by row, whether the two descriptions differ line-wise
    .with_columns(
        pl.struct(["about_the_game", "detailed_description"])
        .map_elements(
            lambda x: is_diff_texts(x["about_the_game"], x["detailed_description"]), 
            return_dtype=pl.Boolean
        )
        .alias("desc_has_diff")
    )

    # Merge descriptions; the first matching case wins
    .with_columns(
        # Case 1: if both are empty
        pl.when((pl.col("about_the_game") == "") & (pl.col("detailed_description") == ""))
        # then null (not an empty string)
        .then(pl.lit(None))  

        # Case 2: if about_the_game and not detailed_description
        .when((pl.col("about_the_game") != "") & (pl.col("detailed_description") == ""))
        # then about_the_game, prefixed with its section tag
        .then(  
            pl.concat_str([
                pl.lit("[about_the_game]\n"),
                pl.col("about_the_game")
            ], separator="")
        )
        
        # Case 3: if not about_the_game and detailed_description
        .when((pl.col("about_the_game") == "") & (pl.col("detailed_description") != ""))
        # then detailed_description, prefixed with its section tag
        .then(
            pl.concat_str([
                pl.lit("[detailed_description]\n"),
                pl.col("detailed_description")
            ], separator="")
        )

        # Case 4: if both but different
        .when(pl.col("desc_has_diff"))
        # then about_the_game + detailed_description, each under its own tag
        .then(  
            pl.concat_str([
                pl.lit("[about_the_game]\n"),
                pl.col("about_the_game"),
                pl.lit("\n\n[detailed_description]\n"),
                pl.col("detailed_description")
            ], separator="")
        )

        # Case 5: if both but same
        .otherwise(  # keep a single copy: only detailed_description is emitted
            pl.concat_str([
                pl.lit("[detailed_description]\n"),
                pl.col("detailed_description")
            ], separator="")
        )
        .alias("detailed_description"),
        
        # Short description: an empty string becomes null
        pl.when(
            pl.col('short_description') == ''
        )
        .then(
            pl.lit(None)
        )
        .otherwise(
            pl.col('short_description')
        )
        .alias('short_description')
    )

    # Add boolean flag: True when the long description, short description and
    # content notes together contain no text at all (note the leading `~`)
    .with_columns(
        ~(
            pl.concat_str([
                pl.col("detailed_description").fill_null(''),
                pl.col("short_description").fill_null(''),
                pl.col("content_descriptors.notes").fill_null('')
            ])
            .str.strip_chars()
            .str.len_chars() > 0
        )
        .alias("enrich.is_description")
    )

    # Drop temp columns
    .drop("desc_has_diff", "about_the_game")
)

df_merged_descriptions.head().collect()

### Cleaned dates

In [None]:
df_cleaned_dates = (
    df_merged_descriptions
    # Parse the release date by trying each known layout in turn; the first
    # layout that parses wins, and unparseable values become null
    .with_columns(
        pl.coalesce(
            [
                # Title-cased before parsing: "1 Jan 2025", then "1. Jan. 2025"
                pl.col("release_date.date").str.to_titlecase().str.strptime(pl.Date, date_format, strict=False)
                for date_format in ("%d %b %Y", "%d. %b. %Y")
            ]
            + [
                # Parsed as-is: "1 Jan, 2025", then "Jan 1, 2025"
                pl.col("release_date.date").str.strptime(pl.Date, date_format, strict=False)
                for date_format in ("%d %b, %Y", "%b %d, %Y")
            ]
        ).alias("release_date.date")
    )
    # Flag the rows left without a parsed date
    .with_columns(
        pl.col("release_date.date").is_null().alias("enrich.is_date")
    )
)

# Inspect the flagged rows
(
    df_cleaned_dates
    .filter(pl.col("enrich.is_date"))
    .select(
        [
            "appid",
            "name",
            "type",
            "release_date.date",
            "enrich.is_date",
        ]
    )
    .sort('release_date.date', descending=True)
).head().collect()

### Cleaned computer specs

Add a check to see whether the computer specs are actually filled in when an OS is flagged as supported (i.e. that the text is not just an empty template).

This means a boolean column should be added.

In [None]:
def _blank_unusable_spec(platform_col: str, spec_col: str, template_label: str) -> pl.Expr:
    """Build the expression that nulls out a requirements column when it is unusable.

    The value of ``spec_col`` is replaced by null when the platform flag in
    ``platform_col`` is False, or when the text - once trimmed of surrounding
    whitespace and colons and lower-cased - is just ``template_label``
    (i.e. only the heading, with no actual requirements). Otherwise the
    original value is kept. The result keeps the name ``spec_col``.
    """
    normalized_spec = (
        pl.col(spec_col)
        .str.strip_chars()
        .str.strip_chars(":")  # e.g. "Minimum:" -> "Minimum"
        .str.to_lowercase()
    )
    return (
        pl.when(~(pl.col(platform_col)) | (normalized_spec == template_label))
        .then(pl.lit(None))
        .otherwise(pl.col(spec_col))
        .alias(spec_col)
    )


def _missing_specs_flag(platform_col: str, spec_prefix: str, flag_col: str) -> pl.Expr:
    """Build the flag that is True when a supported platform has no requirements at all.

    The flag is True only if ``platform_col`` is True and both
    ``<spec_prefix>.minimum`` and ``<spec_prefix>.recommended`` are null;
    it is False in every other case.
    """
    return (
        pl.when(
            (pl.col(platform_col)) &
            (
                (pl.col(f'{spec_prefix}.minimum').is_null()) &
                (pl.col(f'{spec_prefix}.recommended').is_null())
            )
        )
        .then(pl.lit(True))
        .otherwise(pl.lit(False))
        .alias(flag_col)
    )


# (platform flag column, requirements column prefix, enrichment flag column)
_SPEC_GROUPS = [
    ('platforms.windows', 'pc_requirements', 'enrich.is_windows'),
    ('platforms.mac', 'mac_requirements', 'enrich.is_mac'),
    ('platforms.linux', 'linux_requirements', 'enrich.is_linux'),
]

df_cleaned_computer_specs = (
    df_cleaned_dates
    # Step 1: blank out requirements that are unsupported or template-only.
    # Order: Windows min/rec, Mac min/rec, Linux min/rec.
    .with_columns([
        _blank_unusable_spec(platform_col, f'{spec_prefix}.{level}', level)
        for platform_col, spec_prefix, _ in _SPEC_GROUPS
        for level in ('minimum', 'recommended')
    ])
    # Step 2: flag supported platforms that ended up with no requirements.
    # This must run after step 1 so it sees the blanked columns.
    .with_columns([
        _missing_specs_flag(platform_col, spec_prefix, flag_col)
        for platform_col, spec_prefix, flag_col in _SPEC_GROUPS
    ])
)

df_cleaned_computer_specs.head().collect()

### Cleaned price

In [None]:
# Convert the price to its main currency unit and flag paid apps that have no
# usable price information.
df_cleaned_price = (
    df_cleaned_computer_specs
    .with_columns(
        # Divide price by 100
        # NOTE(review): presumably the stored value is in hundredths of the
        # currency unit (e.g. cents) - confirm against the source data
        (pl.col('price_overview.initial') / 100).alias('price_overview.initial'),
        # Enrich Price: True when the app is not free and its currency or
        # its initial price is missing; False in every other case
        pl.when(
            ~(pl.col('is_free')) & 
            (
                (pl.col('price_overview.currency').is_null()) |
                (pl.col('price_overview.initial').is_null())
            )
        )
        .then(pl.lit(True))
        .otherwise(pl.lit(False))
        .alias('enrich.is_price')
    )
)

df_cleaned_price.head().collect()

## Indexing / Sorting (merging purposes)

## Column reordering & renaming

In [None]:
# Build the final frame: `select` both fixes the column order and renames the
# dotted source names to flat snake_case names. Columns not listed here are
# dropped.
df_final = (
    df_cleaned_price
    # Reorder and Rename columns
    .select(
        # Keys
        'appid', 'name', 
        pl.col('fullgame.appid').alias('game_appid'),
        # Game descriptions (flag first, then the texts)
        pl.col('enrich.is_description').alias('enrich_is_description'), 
        pl.col('detailed_description').alias('long_description'),
        'short_description',
        # Images / Movies
        pl.col('header_image').alias('image_header'),
        pl.col('background_raw').alias('image_background'),
        pl.col('screenshots').alias('image_screenshots'),
        pl.col('movies').alias('image_movies'),
        # Price
        pl.col('enrich.is_price').alias('enrich_is_price'),
        'is_free', 
        pl.col('price_overview.currency').alias('currency'),
        pl.col('price_overview.initial').alias('price'),
        # Meta
        'type', 'categories', 'genres', 'supported_languages', 
        pl.col('controller_support').alias('is_controller_support'),
        # Date
        pl.col('enrich.is_date').alias('enrich_is_date'),
        pl.col('release_date.date').alias('release_date'),
        # Age
        'required_age',
        # Contract
        'developers', 'publishers', 
        pl.col('content_descriptors.notes').alias('content_descriptors_notes'),
        # Progress
        pl.col('achievements.highlighted').alias('achievements'),
        # Computer Specs (per OS: flag, platform support, minimum, recommended)
        pl.col('enrich.is_windows').alias('enrich_is_windows'),
        pl.col('platforms.windows').alias('is_windows'), 
        pl.col('pc_requirements.minimum').alias('windows_requirements_minimum'),
        pl.col('pc_requirements.recommended').alias('windows_requirements_recommended'),
        pl.col('enrich.is_mac').alias('enrich_is_mac'),
        pl.col('platforms.mac').alias('is_mac'),
        pl.col('mac_requirements.minimum').alias('mac_requirements_minimum'),
        pl.col('mac_requirements.recommended').alias('mac_requirements_recommended'),
        pl.col('enrich.is_linux').alias('enrich_is_linux'),
        pl.col('platforms.linux').alias('is_linux'),
        pl.col('linux_requirements.minimum').alias('linux_requirements_minimum'),
        pl.col('linux_requirements.recommended').alias('linux_requirements_recommended'),
    )
)
df_final.head().collect()

## Test: Data Quality Check

### Check Schema

In [None]:
# 1. Check Column Structure



# 2. Check Data Types

### Check Transformations

In [None]:
# 1. Check Cleanup

# 2. Check merged required_ages / range (0, 21)

# 3. Check merged descriptions

# 4. Check cleaned dates

# 5. Check cleaned computer specs

# 6. Check cleaned price

# 7. Check renamed columns

# 8. Check ordered columns

## Load to Bronze Layer

In [None]:
import polars as pl
import json

import re

def to_pg_array_str_safe(value):
    """Render an iterable of strings as a PostgreSQL array literal.

    Each element is wrapped in double quotes; backslashes and double quotes
    inside an element are escaped with a leading backslash, so commas, braces
    and quotes in the data cannot break the literal. An empty input yields
    ``"{}"``.
    """
    # Single-pass escaping: every backslash and every double quote gets a
    # backslash put in front of it.
    escapes = {ord('\\'): '\\\\', ord('"'): '\\"'}

    quoted_items = []
    for item in value:
        quoted_items.append('"' + item.translate(escapes) + '"')

    return "{" + ",".join(quoted_items) + "}"

# Materialize the lazy frame as a list of plain Python dicts (one per row)
df_dicts = df_final.collect().to_dicts()

# Serialize nested values in place so every cell becomes a scalar or a string:
#   - a list made only of strings (including an empty list) -> PostgreSQL array literal
#   - any other list, and any dict                          -> JSON text
for record in df_dicts:
    for column in list(record):
        cell = record[column]
        if isinstance(cell, dict):
            record[column] = json.dumps(cell)
        elif isinstance(cell, list):
            if all(isinstance(item, str) for item in cell):
                record[column] = to_pg_array_str_safe(cell)
            else:
                record[column] = json.dumps(cell)

# Rebuild a Polars DataFrame from the serialized rows
df_final_cleaned = pl.DataFrame(df_dicts)

In [None]:
# `postgres_manager` is rebound below from the module to a PostgresManager
# *instance*. Importing the module under its own alias keeps this cell
# re-runnable: otherwise a second run would look up `PostgresManager` on the
# instance and fail with an AttributeError.
from src import postgres_manager as postgres_manager_module

postgres_manager = postgres_manager_module.PostgresManager()
postgres_manager.load_app_details(df_final_cleaned) # with upsert logic