In [0]:
# If you wish to see what's inside the spark object and the sparkContext run these commands
# but your code will work regardless since they have already been set up.
spark
sc = spark.sparkContext

In [0]:
from pyspark.sql import functions as F # This will load the class where spark sql functions are contained
from pyspark.sql import Row # this will let us manipulate rows with spark sql
from pyspark.sql.types import * # Import types to convert columns using spark sql
from pyspark.sql.functions import explode
from pyspark.sql.functions import col
from pyspark.sql.functions import count
from pyspark.sql.functions import to_date, to_timestamp

In [0]:
# Créez un DataFrame à partir des données
steam = spark.read.option("multiline", "true").json("s3://full-stack-bigdata-datasets/Big_Data/Project_Steam/steam_game_output.json")

In [0]:
steam.show()
steam.printSchema()

+--------------------+-------+
|                data|     id|
+--------------------+-------+
|{10, [Multi-playe...|     10|
|{1000000, [Single...|1000000|
|{1000010, [Single...|1000010|
|{1000030, [Multi-...|1000030|
|{1000040, [Single...|1000040|
|{1000080, [Multi-...|1000080|
|{1000100, [Single...|1000100|
|{1000110, [Multi-...|1000110|
|{1000130, [Single...|1000130|
|{1000280, [Single...|1000280|
|{1000310, [Multi-...|1000310|
|{1000360, [Multi-...|1000360|
|{1000370, [Single...|1000370|
|{1000380, [Single...|1000380|
|{1000410, [Single...|1000410|
|{1000470, [Single...|1000470|
|{1000480, [Single...|1000480|
|{1000500, [Multi-...|1000500|
|{1000510, [], 0, ...|1000510|
|{1000540, [Multi-...|1000540|
+--------------------+-------+
only showing top 20 rows

root
 |-- data: struct (nullable = true)
 |    |-- appid: long (nullable = true)
 |    |-- categories: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- ccu: long (nullable = true)
 |    |-- de

In [0]:

# Liste des colonnes à extraire
columns_to_extract = ['genre', 'publisher', 'platforms','positive','negative','categories','name','developer','discount','initialprice','ccu','release_date']

# Boucle à travers chaque colonne et appliquez getField() pour extraire la valeur
for column in columns_to_extract:
    steam = steam.withColumn(column, col("data").getField(column))

In [0]:
# Supprimez les colonnes non nécessaires
columns_to_drop = ['data', 'id']  # Ajoutez les colonnes à supprimer ici
steam = steam.drop(*columns_to_drop)

# Affichez le DataFrame résultant
steam.show()

+--------------------+----------------------------+--------------------+--------+--------+--------------------+------------------------------------+----------------------------+--------+------------+-----+------------+
|               genre|                   publisher|           platforms|positive|negative|          categories|                                name|                   developer|discount|initialprice|  ccu|release_date|
+--------------------+----------------------------+--------------------+--------+--------+--------------------+------------------------------------+----------------------------+--------+------------+-----+------------+
|              Action|                       Valve|  {true, true, true}|  201215|    5199|[Multi-player, Va...|                      Counter-Strike|                       Valve|       0|         999|13990|   2000/11/1|
|Action, Adventure...|        PsychoFlux Entert...|{false, false, true}|      27|       5|[Single-player, P...|             

In [0]:
# Utilisez groupBy() pour grouper par une colonne et appliquer une opération d'agrégation
result = steam.groupBy("publisher", "developer").agg(count("*").alias("count1"), count("*").alias("count2"))

# Triez les résultats par ordre décroissant
result = result.orderBy("count1", ascending=False)

# Affichez le résultat
result.show()

+--------------------+--------------------+------+------+
|           publisher|           developer|count1|count2|
+--------------------+--------------------+------+------+
|     Choice of Games|     Choice of Games|   140|   140|
|        Laush Studio|Laush Dmitriy Ser...|   105|   105|
|   Sokpop Collective|   Sokpop Collective|    98|    98|
|              8floor|             Creobit|    94|    94|
|      Reforged Group|      Reforged Group|    89|    89|
|KOEI TECMO GAMES ...|KOEI TECMO GAMES ...|    86|    86|
|        Hosted Games|        Hosted Games|    79|    79|
|  Boogygames Studios|  Boogygames Studios|    78|    78|
|      Big Fish Games|      Elephant Games|    73|    73|
|       Blender Games|       Blender Games|    70|    70|
|                SEGA|                SEGA|    64|    64|
| MAGIX Software GmbH| MAGIX Software GmbH|    61|    61|
|              8floor|         Somer Games|    59|    59|
|      Big Fish Games|    AMAX Interactive|    58|    58|
|     Ripknot 

In [0]:
#####  Quel éditeur a sorti le plus de jeux sur Steam ?

# Utilisez groupBy() pour grouper par une colonne et appliquer une opération d'agrégation
result = steam.groupBy("publisher").agg(count("*").alias("count"))

# Triez les résultats par ordre décroissant
result = result.orderBy("count", ascending=False)

# Affichez le résultat
result.show()


+--------------------+-----+
|           publisher|count|
+--------------------+-----+
|      Big Fish Games|  422|
|              8floor|  202|
|                SEGA|  165|
|      Strategy First|  151|
|         Square Enix|  141|
|     Choice of Games|  140|
|       Sekai Project|  132|
|            HH-Games|  132|
|                    |  132|
|             Ubisoft|  127|
|        Laush Studio|  126|
|          THQ Nordic|  125|
|Alawar Entertainment|  107|
|  Fulqrum Publishing|  104|
|     Plug In Digital|  101|
|            Ziggurat|  100|
|     Slitherine Ltd.|   99|
|   Sokpop Collective|   99|
|    Devolver Digital|   98|
|KOEI TECMO GAMES ...|   90|
+--------------------+-----+
only showing top 20 rows



In [0]:
######  Quels sont les jeux les mieux notés ?

# Utilisez groupBy() pour grouper par une colonne et appliquer une opération d'agrégation
result1 = steam.groupBy("name", "positive").agg(count("*"))

# Triez les résultats par ordre décroissant
result1 = result1.orderBy("positive", ascending=False)

# Affichez le résultat
result1.show()

+--------------------+--------+--------+
|                name|positive|count(1)|
+--------------------+--------+--------+
|Counter-Strike: G...| 5943345|       1|
|              Dota 2| 1534895|       1|
|  Grand Theft Auto V| 1229265|       1|
| PUBG: BATTLEGROUNDS| 1185361|       1|
|            Terraria| 1014711|       1|
|Tom Clancy's Rain...|  942910|       1|
|         Garry's Mod|  861240|       1|
|     Team Fortress 2|  846407|       1|
|                Rust|  732513|       1|
|       Left 4 Dead 2|  643836|       1|
|The Witcher 3: Wi...|  632627|       1|
|            Among Us|  586302|       1|
|Euro Truck Simula...|  572368|       1|
|    Wallpaper Engine|  561096|       1|
|            PAYDAY 2|  532013|       1|
|    Dead by Daylight|  509637|       1|
|      Stardew Valley|  497558|       1|
|       Rocket League|  496499|       1|
|          ELDEN RING|  490203|       1|
|ARK: Survival Evo...|  481318|       1|
+--------------------+--------+--------+
only showing top

In [0]:
###### Y a-t-il des années avec plus de sorties ? Y a-t-il eu plus ou moins de sorties de jeux pendant le Covid par exemple ?

# Convertir une colonne de chaînes de caractères en DateType
df = steam.withColumn("release_date", to_date(steam["string_date_column"], "yyyy-MM-dd"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2185902050958815>:4[0m
[1;32m      1[0m [38;5;66;03m###### Y a-t-il des années avec plus de sorties ? Y a-t-il eu plus ou moins de sorties de jeux pendant le Covid par exemple ?[39;00m
[1;32m      2[0m 
[1;32m      3[0m [38;5;66;03m# Convertir une colonne de chaînes de caractères en DateType[39;00m
[0;32m----> 4[0m df [38;5;241m=[39m steam[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mrelease_date[39m[38;5;124m"[39m, to_date([43msteam[49m[43m[[49m[38;5;124;43m"[39;49m[38;5;124;43mstring_date_column[39;49m[38;5;124;43m"[39;49m[43m][49m, [38;5;124m"[39m[38;5;124myyyy-MM-dd[39m[38;5;124m"[39m))

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     

In [0]:
filepath = "s3://full-stack-bigdata-datasets/Big_Data/Project_Steam/steam_game_output.json"
steam = (spark.read.format('json')\
             .option('header', 'true')\
             .option('inferSchema', 'true')\
             .load(filepath))

In [0]:
type(steam)
steam.show()
steam.printSchema()

In [0]:
steam \
    .withColumn('new_platforms', F.col('data').getField('languages')) \
    .show()

# F.col("col_name") returns the column object just like df.col_name or df["col_name"]

In [0]:
steam \
    .withColumn('order_id', F.col('data.platforms')) \
    .show()

In [0]:
# Let's extract both the nested columns to get a flat schema
orders_df_flattened = steam \
    #.withColumn('New_categories', F.col('data.categories')) \
    #.withColumn('New_developer', F.col('data.developer')) \
    .withColumn('Newgenre', F.col('data.genre')) \ 
    .withColumn('New_owners', F.col('data.owners')) \           
    #.drop('orders')
orders_df_flattened.show()

In [0]:
steam.select(steam['name'], steam['owners'] + 1).show()

In [0]:
steam \
    .withColumn('New_genre', F.col('data').getField('genre')) \        
    .show()

# F.col("col_name") returns the column object just like df.col_name or df["col_name"]

In [0]:
filepath = "s3://full-stack-bigdata-datasets/Big_Data/Project_Steam/steam_game_output.json"

steam = (spark.read.format('json')\
             .option('header', 'true')\
             .option('inferSchema', 'true')\
             .load(filepath))

In [0]:
len(steam.columns)

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def value_counts(steam, col_name):
  return steam.select(col_name) \
           .groupBy(col_name) \
           .count() \
           .orderBy(F.desc('count'))

In [0]:
for col_name in steam.columns:
  value_counts(steam, col_name).show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, BooleanType, StringType, ArrayType

# Création d'une session Spark
spark = SparkSession.builder \
    .appName("CreateDataFrame") \
    .getOrCreate()

# Schéma de la structure de données
schema = StructType([
    StructField("data", StructType([
        StructField("appid", LongType(), nullable=True),
        StructField("categories", ArrayType(StringType()), nullable=True),
        StructField("ccu", LongType(), nullable=True),
        StructField("développeur", StringType(), nullable=True),
        StructField("remise", StringType(), nullable=True),
        StructField("genre", StringType(), nullable=True),
        StructField("header_image", StringType(), nullable=True),
        StructField("prix_initial", StringType(), nullable=True),
        StructField("langues", StringType(), nullable=True),
        StructField("nom", StringType(), nullable=True),
        StructField("négatif", LongType(), nullable=True),
        StructField("propriétaires", StringType(), nullable=True),
        StructField("plateformes", StructType([
            StructField("linux", BooleanType(), nullable=True),
            StructField("mac", BooleanType(), nullable=True),
            StructField("windows", BooleanType(), nullable=True),
        ]), nullable=True),
        StructField("positif", LongType(), nullable=True),
        StructField("prix", StringType(), nullable=True),
        StructField("éditeur", StringType(), nullable=True),
        StructField("release_date", StringType(), nullable=True),
        StructField("require_age", StringType(), nullable=True),
        StructField("short_description", StringType(), nullable=True),
        StructField("tags", StructType([
            StructField("Années 1980", LongType(), nullable=True),
            # Ajoutez ici les autres champs de tags en suivant le même format
        ]), nullable=True),
        StructField("type", StringType(), nullable=True),
        StructField("site_web", StringType(), nullable=True),
    ])),
    StructField("id", StringType(), nullable=True),
])

# Création du DataFrame à partir du schéma
df = spark.createDataFrame([], schema)

# Affichage du schéma du DataFrame
df.printSchema()

# Affichage du DataFrame
df.show()


In [0]:
# Sélection de colonnes spécifiques
 selected_df = df.select("data")

In [0]:
user_logs.select("data.categories","data.genre").show(truncate=False)

In [0]:
user_logs.select("discount.*").show(truncate=False)

In [0]:
user_logs.printSchema()
user_logs.show(truncate=False)

In [0]:
user_logs.groupBy("tags").sum("data").show(truncate=False)

In [0]:
user_logs.describe()

In [0]:
user_describe = user_logs.describe()
user_describe.toPandas()

In [0]:
display(user_describe)

In [0]:
user_describe.show()

In [0]:
user_logs.createOrReplaceTempView('user_logs_table')

In [0]:
display(spark.sql("""SELECT COUNT(*) 
                      FROM user_logs_table"""))

In [0]:
user_logs.count()

In [0]:
spark.sql("""SELECT data FROM user_logs_table""").show()

In [0]:
user_logs.select('data').show()

In [0]:
spark.sql("""SELECT DISTINCT(data) FROM user_logs_table""").show()

In [0]:
user_logs.select('data').distinct().show()

In [0]:
display(spark.sql("""SELECT DISTINCT(data) AS distinct_user FROM user_logs_table"""))

In [0]:
user_logs.select(user_logs['data'].alias('distinct_user')).distinct().show()
#user_logs.select('data').distinct().withColumnRenamed('user','distinct_user').show() # alternative solution

In [0]:
spark.sql("""
    SELECT COUNT(DISTINCT(data)) AS total_distinct_user
    FROM user_logs_table""").show()

In [0]:
user_logs.select('data').distinct().count()

In [0]:
# Utilisation de la méthode getField() pour extraire le champ 'single'
result = user_logs.select(user_logs["data"].getField("tags").alias("tags_value"))

# Afficher le résultat
result.show()

In [0]:
display(result)

In [0]:
type(result)

result.show()

In [0]:
result.describe().toPandas()

In [0]:
# Créer un DataFrame vide avec le schéma spécifié
df = spark.createDataFrame([], result)

# Afficher le schéma du DataFrame
df.printSchema()

In [0]:
display(user_logs)

In [0]:
user_logs.collect()

In [0]:
user_logs.take(3)

In [0]:
user_logs.count()

In [0]:
user_logs.show()

In [0]:
user_logs.printSchema()

In [0]:
user_logs.na

In [0]:
user_logs.toPandas()

In [0]:
user_logs.describe()

In [0]:
user_logs.display()

In [0]:
user_logs.createOrReplaceTempView('user_logs_table')

In [0]:
display(spark.sql("""SELECT COUNT(*) 
                      FROM user_logs_table"""))

In [0]:
user_logs.count()