In [None]:
# Databricks notebook source

Projet Steam / Ubisoft ‚Äì setup_and_data_loading

 **Objectif de ce notebook**

 - Lire le fichier JSON semi-structur√© contenant les jeux Steam depuis S3.
 - Comprendre la structure (sch√©ma, champs imbriqu√©s).
 - Construire un DataFrame nettoy√© et enrichi (`steam_games_clean`).
 - Sauvegarder ce DataFrame dans une table Databricks pour les analyses suivantes.

# COMMAND ----------


 ## 1. Imports & param√®tres

 Dans cette section, on importe les fonctions PySpark n√©cessaires
 et on d√©finit quelques param√®tres (chemin S3, nom de la table de sortie).

# COMMAND ----------

from pyspark.sql import functions as F
from pyspark.sql import types as T

# Chemin du fichier JSON brut sur S3
raw_path = "s3://full-stack-bigdata-datasets/Big_Data/Project_Steam/steam_game_output.json"

# Nom de la table nettoy√©e que l'on va cr√©er
clean_table_name = "default.steam_games_clean"

# COMMAND ----------


 ## 2. Lecture du JSON brut

 On lit le fichier JSON depuis S3.
 Comme la structure est semi-structur√©e, on laisse Spark inf√©rer le sch√©ma.
 Ensuite, on affiche le sch√©ma et quelques lignes pour se faire une id√©e des champs disponibles.

# COMMAND ----------

df_raw = (
    spark.read
    .option("multiLine", True)  # au cas o√π chaque enregistrement est sur plusieurs lignes
    .json(raw_path)
)

print("Nombre de lignes brutes :", df_raw.count())
df_raw.printSchema()

# Affiche quelques exemples pour inspection visuelle
display(df_raw.limit(20))

# COMMAND ----------


 ## 3. Comprendre la structure

 Dans ce projet, on va supposer que le JSON contient pour chaque jeu :

 - un identifiant unique (`app_id` ou similaire),
 - un nom de jeu (`name`),
 - des informations de date de sortie,
 - des informations de prix,
 - des langues support√©es,
 - des genres,
 - des plateformes (Windows / Mac / Linux),
 - des m√©triques de review (positives/n√©gatives),
 - des restrictions d'√¢ge.

 > üí° **Important** : Si les noms de colonnes dans ton JSON r√©el sont l√©g√®rement
 > diff√©rents (par exemple `appid` au lieu de `app_id`, ou `platforms.windows`
 > au lieu de `platform_windows`), il suffira d'adapter les `col(...)` au bon nom.

# COMMAND ----------


 ## 4. Construction d'un DataFrame "jeux" avec les colonnes utiles

 On cr√©e un DataFrame `df_games` avec un ensemble de colonnes standardis√©es.

 On suppose les champs suivants (√† adapter si besoin) :

 - `app_id` : identifiant du jeu
 - `name` : nom du jeu
 - `publisher` : √©diteur (string)
 - `developer` : d√©veloppeur (string)
 - `release_date` : date de sortie (string ou date)
 - `price` : prix du jeu en devise principale de Steam (float)
 - `discount_percent` : pourcentage de r√©duction (int/float)
 - `is_free` : bool√©en (gratuit ou non)
 - `supported_languages` : texte ou liste de langues
 - `genres` : liste de genres
 - `platforms` : structure avec des bool√©ens `windows`, `mac`, `linux`
 - `positive_reviews`, `negative_reviews` : nombre d'avis
 - `required_age` : √¢ge minimum recommand√©
 - `metacritic_score` : score m√©tacritic (si disponible)

# COMMAND ----------

from pyspark.sql.functions import col

# Exemple g√©n√©rique : on essaie de lire des colonnes "plates" si elles existent.
# Si ton JSON a une structure imbriqu√©e (par ex. data.name, data.platforms.windows),
# adapte ici en utilisant col("data.name").alias("name"), etc.

df_games = df_raw.select(
    col("app_id").alias("app_id"),
    col("name").alias("name"),
    col("publisher").alias("publisher"),
    col("developer").alias("developer"),
    col("release_date").alias("release_date_raw"),
    col("price").alias("price_raw"),
    col("discount_percent").alias("discount_percent"),
    col("is_free").alias("is_free"),
    col("supported_languages").alias("supported_languages_raw"),
    col("genres").alias("genres_raw"),
    col("platforms").alias("platforms_raw"),
    col("positive_reviews").alias("positive_reviews"),
    col("negative_reviews").alias("negative_reviews"),
    col("required_age").alias("required_age"),
    col("metacritic_score").alias("metacritic_score")
)

display(df_games.limit(20))

# COMMAND ----------


 ## 5. Nettoyage & normalisation des colonnes

 ### 5.1 Dates de sortie

 On :
 - convertit la date brute en vrai type `date`,
 - d√©rive l'ann√©e et le mois (utile pour les analyses temporelles).

 ### 5.2 Prix et discount

 On :
 - convertit `price_raw` en float (si c'est une string),
 - remplace les valeurs nulles par 0 si le jeu est gratuit (`is_free = true`),
 - cr√©e un indicateur `has_discount`.

 ### 5.3 Langues

 On :
 - transforme le champ brut en tableau de langues (`languages_array`),
 - compte le nombre de langues par jeu (`nb_languages`).

 ### 5.4 Reviews & score

 On :
 - calcule le nombre total d'avis (`total_reviews`),
 - calcule le ratio d'avis positifs (`positive_ratio`).

 ### 5.5 Genres & plateformes

 - `genres_array` : on s'assure que c'est bien un tableau de strings.
 - `platform_windows`, `platform_mac`, `platform_linux` : bool√©ens d√©riv√©s de la structure `platforms_raw` si elle existe.

# COMMAND ----------

# 5.1 Date de sortie
df_games = df_games.withColumn(
    "release_date",
    F.to_date("release_date_raw")  # si format diff√©rent, ajouter un format explicite
)

df_games = (
    df_games
    .withColumn("release_year", F.year("release_date"))
    .withColumn("release_month", F.month("release_date"))
)

# COMMAND ----------

# 5.2 Prix et discount
# Conversion prix en float
df_games = df_games.withColumn(
    "price",
    F.when(F.col("price_raw").cast("double").isNotNull(), F.col("price_raw").cast("double"))
     .otherwise(F.lit(0.0))
)

# Si is_free est true, on force le prix √† 0
df_games = df_games.withColumn(
    "price",
    F.when(F.col("is_free") == True, F.lit(0.0)).otherwise(F.col("price"))
)

# Indicateur de r√©duction
df_games = df_games.withColumn(
    "discount_percent",
    F.col("discount_percent").cast("double")
)

df_games = df_games.withColumn(
    "has_discount",
    F.when((F.col("discount_percent").isNotNull()) & (F.col("discount_percent") > 0), F.lit(True))
     .otherwise(F.lit(False))
)

# COMMAND ----------

# 5.3 Langues ‚Äì on transforme un champ texte en tableau de langues
# Hypoth√®se : supported_languages_raw est une string du type "English, French, Italian"
# Si c'est d√©j√† un array, il suffit de renommer.

df_games = df_games.withColumn(
    "languages_array",
    F.when(
        F.col("supported_languages_raw").isNotNull(),
        F.split(F.col("supported_languages_raw"), ",\\s*")
    ).otherwise(F.array().cast("array<string>"))
)

df_games = df_games.withColumn(
    "nb_languages",
    F.size("languages_array")
)

# COMMAND ----------

# 5.4 Reviews & score
df_games = (
    df_games
    .withColumn("positive_reviews", F.col("positive_reviews").cast("long"))
    .withColumn("negative_reviews", F.col("negative_reviews").cast("long"))
)

df_games = df_games.withColumn(
    "total_reviews",
    (F.col("positive_reviews").cast("long") + F.col("negative_reviews").cast("long"))
)

df_games = df_games.withColumn(
    "positive_ratio",
    F.when(F.col("total_reviews") > 0,
           F.col("positive_reviews") / F.col("total_reviews"))
     .otherwise(F.lit(None).cast("double"))
)

# COMMAND ----------

# 5.5 Genres ‚Äì on s'assure d'avoir un array de strings
# Si genres_raw est d√©j√† un array<string>, on le renomme simplement.

df_games = df_games.withColumn(
    "genres_array",
    F.when(
        F.col("genres_raw").isNotNull() & F.col("genres_raw").cast("array<string>").isNotNull(),
        F.col("genres_raw").cast("array<string>")
    ).otherwise(F.array().cast("array<string>"))
)

# 5.6 Plateformes ‚Äì on d√©rive trois bool√©ens windows/mac/linux
# Hypoth√®se : platforms_raw est une struct avec des bool√©ens .windows, .mac, .linux.
# Si tu as d√©j√† trois colonnes s√©par√©es, adapte ici.

df_games = df_games.withColumn(
    "platform_windows",
    F.col("platforms_raw.windows").cast("boolean")
)

df_games = df_games.withColumn(
    "platform_mac",
    F.col("platforms_raw.mac").cast("boolean")
)

df_games = df_games.withColumn(
    "platform_linux",
    F.col("platforms_raw.linux").cast("boolean")
)

# COMMAND ----------


 ## 6. Derniers ajustements & aper√ßu du DataFrame nettoy√©

 On :
 - garde les colonnes finales utiles,
 - v√©rifie quelques lignes,
 - sauvegarde dans une table Databricks.

# COMMAND ----------

df_clean = df_games.select(
    "app_id",
    "name",
    "publisher",
    "developer",
    "release_date",
    "release_year",
    "release_month",
    "price",
    "discount_percent",
    "has_discount",
    "is_free",
    "languages_array",
    "nb_languages",
    "genres_array",
    "platform_windows",
    "platform_mac",
    "platform_linux",
    "positive_reviews",
    "negative_reviews",
    "total_reviews",
    "positive_ratio",
    "required_age",
    "metacritic_score"
)

display(df_clean.limit(20))

print("Nombre de jeux apr√®s nettoyage :", df_clean.count())

# COMMAND ----------


 ## 7. Sauvegarde dans une table Databricks

 Cette table sera la base de toutes les analyses suivantes
 (macro, genres, plateformes).

 On utilise un format g√©r√© par Databricks (Delta ou Parquet via `saveAsTable`).

# COMMAND ----------

(
    df_clean
    .write
    .mode("overwrite")
    .format("delta")
    .saveAsTable(clean_table_name)
)

print(f"Table cr√©√©e : {clean_table_name}")

# V√©rification rapide
df_check = spark.table(clean_table_name)
print("Nombre de lignes dans la table :", df_check.count())
display(df_check.limit(10))