In [142]:
# Jupyter server URL: http://127.0.0.1:8889/ (access token redacted - never commit live tokens to a shared notebook)

In [143]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session used by every cell below.
# The MySQL Connector/J jar is put on the classpath so the JDBC reads/writes
# against the game database can load their driver.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PierreMihEval2")
    .config("spark.ui.port", "4040")
    .config("spark.executor.memory", "10g")
    .config("spark.driver.memory", "4g")
    .config("spark.jars", "/opt/spark/jars/mysql-connector-j-9.0.0.jar")
    .getOrCreate()
)

In [144]:
import os

# --- Database connection settings -------------------------------------------
# Credentials are taken from the environment when THEGAME_DB_USER /
# THEGAME_DB_PASSWORD are set, so real secrets do not have to live in the
# notebook. The literal fallbacks keep the notebook runnable as before.
# NOTE(review): remove the fallbacks before sharing outside a dev sandbox.
user = os.environ.get("THEGAME_DB_USER", "thegameadministrator")
password = os.environ.get("THEGAME_DB_PASSWORD", "thegameadministratorpassword")

# Connector/J 8+ (the session loads mysql-connector-j 9.0.0) names its driver
# class com.mysql.cj.jdbc.Driver; com.mysql.jdbc.Driver is the legacy name.
driver = "com.mysql.cj.jdbc.Driver"

schemaName = "thegame"

# Properties handed to spark.read.jdbc(...) for every table load.
properties = {
    "user": user,
    "driver": driver,
    "password": password,
}

# Base JDBC URL (no schema); reads qualify tables as "<schemaName>.<table>".
connectionString = "jdbc:mysql://db"

# --- Table names inside `schemaName` -----------------------------------------
players_table_name = "players"
player_matches_table_name = "player_matches"
player_message_sentiment_table_name = "player_messages"
player_transactions_table_name = "player_transactions"
transaction_products_table_name = "transaction_products"
anomalies_table_name = "anomalies"

In [145]:
def _read_table(table_name):
    """Load `<schemaName>.<table_name>` over JDBC with the shared connection settings."""
    return spark.read.jdbc(
        connectionString,
        f"{schemaName}.{table_name}",
        properties=properties,
    )


# One DataFrame per source table of the game database.
df_players = _read_table(players_table_name)
df_player_matches = _read_table(player_matches_table_name)
df_player_messages_sentiment = _read_table(player_message_sentiment_table_name)
df_player_transactions = _read_table(player_transactions_table_name)
df_transaction_products = _read_table(transaction_products_table_name)

In [146]:
# Print a single row of each loaded table as a quick schema/content check.
for preview_df in (
    df_players,
    df_player_matches,
    df_player_messages_sentiment,
    df_player_transactions,
    df_transaction_products,
):
    preview_df.show(1)

+----+-----+------------------+
|  id|level|total_hours_played|
+----+-----+------------------+
|4761|   13|               390|
+----+-----+------------------+
only showing top 1 row

+---------+--------------------+-----+------+--------------+-----------------------+-----------+--------------+
|player_id|                date|kills|deaths|friendly_kills|time_in_movement_ration|shots_fired|flags_captured|
+---------+--------------------+-----+------+--------------+-----------------------+-----------+--------------+
|     4761|2024-07-13Z22:56:01Z|    0|     1|             1|                   0.13|        288|             0|
+---------+--------------------+-----+------+--------------+-----------------------+-----------+--------------+
only showing top 1 row

+----+--------------------+---------+
|  id|                date|sentiment|
+----+--------------------+---------+
|2862|2023-10-17Z04:47:53Z|     0.63|
+----+--------------------+---------+
only showing top 1 row

+-------+---------

In [147]:
from pyspark.sql.functions import col, concat

# Add the hours-played-per-level ratio to every player.
df_players_plus = df_players.withColumn(
    "hours_per_level",
    col("total_hours_played") / col("level"),
)

# Summary statistics (count / mean / stddev / min / max) of the three metrics.
summary_columns = ["level", "total_hours_played", "hours_per_level"]
df_players_plus.select(*summary_columns).describe().show()

# Players above the chosen level, least-played first.
level_to_check = 5
(
    df_players_plus
    .where(col("level") > level_to_check)
    .orderBy("total_hours_played", ascending=True)
    .show()
)

+-------+-----------------+------------------+------------------+
|summary|            level|total_hours_played|   hours_per_level|
+-------+-----------------+------------------+------------------+
|  count|             5000|              5000|              5000|
|   mean|          10.4732|           278.978|           26.7328|
| stddev|5.754954879156946|223.97090460419577|13.726860147094794|
|    min|                1|                 3|               3.0|
|    max|               20|              1000|              50.0|
+-------+-----------------+------------------+------------------+

+----+-----+------------------+---------------+
|  id|level|total_hours_played|hours_per_level|
+----+-----+------------------+---------------+
|4064|    6|                18|            3.0|
|  61|    6|                18|            3.0|
|3053|    6|                18|            3.0|
|1653|    7|                21|            3.0|
|2351|    7|                21|            3.0|
|2356|    7|         

In [148]:
from pyspark.sql.types import StringType, StructType, StructField, IntegerType


# Schema of an anomaly record: an integer type code plus a text description,
# both declared non-nullable.
anomaly_fields = [
    StructField("type", IntegerType(), False),
    StructField("description", StringType(), False),
]
AnomalieSchema = StructType(anomaly_fields)

# Empty frame with that schema; detected anomalies are unioned onto it later.
df_anomalies = spark.createDataFrame([], AnomalieSchema)
df_anomalies.show()

+----+-----------+
|type|description|
+----+-----------+
+----+-----------+



In [149]:
# Number of recorded matches per player.
df_match_counts = (
    df_player_matches
    .groupby("player_id")
    .count()
    .select("player_id", col("count").alias("game_count"))
)

# Attach the match count to each player and derive games played per level.
# - The join condition references the two frames actually being joined,
#   not the pre-aggregation match table.
# - It is a left join, so players without any recorded match are kept; their
#   missing count is replaced by 0 so that games_per_level is 0.0 instead of
#   null. A null ratio would be silently dropped by any later
#   `games_per_level < threshold` filter, hiding players who gained levels
#   with no games at all.
df_players_plus_plus = (
    df_players_plus
    .join(
        df_match_counts,
        df_players_plus["id"] == df_match_counts["player_id"],
        how="left",
    )
    .select("id", "level", "total_hours_played", "hours_per_level", "game_count")
    .fillna(0, subset=["game_count"])
    .withColumn("games_per_level", col("game_count") / col("level"))
)
df_players_plus_plus.orderBy("games_per_level", ascending=True).show()

+----+-----+------------------+---------------+----------+---------------+
|  id|level|total_hours_played|hours_per_level|game_count|games_per_level|
+----+-----+------------------+---------------+----------+---------------+
|3662|   10|               390|           39.0|        10|            1.0|
|1776|    8|               216|           27.0|         8|            1.0|
|2473|   12|               564|           47.0|        12|            1.0|
|1547|    1|                35|           35.0|         1|            1.0|
| 932|    1|                21|           21.0|         1|            1.0|
|4848|    1|                25|           25.0|         1|            1.0|
|1979|    5|                25|            5.0|         5|            1.0|
| 965|    3|               108|           36.0|         3|            1.0|
|2241|    5|                50|           10.0|         5|            1.0|
|1145|    7|               336|           48.0|         7|            1.0|
|1353|    5|             

In [150]:
# Append one hand-made player row (id 9999: level 20, 100 hours, 2 games),
# presumably to exercise the anomaly check below - confirm before using on
# production data.
# The derived columns use true division, matching how hours_per_level and
# games_per_level are computed for real rows (floor division would store 0.0
# for games_per_level instead of 0.1).
synthetic_player = (9999, 20, 100, 100 / 20, 2, 2 / 20)
df_players_plus_plus_2 = df_players_plus_plus.union(
    spark.createDataFrame(data=[synthetic_player], schema=df_players_plus_plus.schema)
)

In [151]:
from pyspark.sql.functions import lit, concat

# Type code stored in the `type` column for this family of anomalies.
games_per_level_anomaly_type = 1

# French description built per row from the player's id, level and game count.
anomaly_description = concat(
    lit("Le joueur "), col("id"),
    lit(" est niveau "), col("level"),
    lit(" en moins de "), col("level"),
    lit(" parties ("), col("game_count"), lit(")."),
)

# Players who played fewer games than their level.
suspicious_players = df_players_plus_plus_2.where(col("games_per_level") < 1)

# union() matches columns by position, so the (type, description) pair lines
# up with the anomaly schema regardless of the aliases used here.
df_anomalies_1 = df_anomalies.union(
    suspicious_players.select(
        lit(games_per_level_anomaly_type).alias("anomaly_type"),
        anomaly_description.alias("anomaly_description"),
    )
)
df_anomalies_1.show(truncate=False)

+----+--------------------------------------------------------+
|type|description                                             |
+----+--------------------------------------------------------+
|1   |Le joueur 9999 est niveau 20 en moins de 20 parties (2).|
+----+--------------------------------------------------------+



In [152]:
# Persist the detected anomalies to the database over JDBC.
# mode("overwrite") replaces whatever the anomalies table currently holds.
anomalies_jdbc_options = {
    "driver": driver,
    "url": f"{connectionString}/{schemaName}",
    "dbtable": anomalies_table_name,
    "user": user,
    "password": password,
}
(
    df_anomalies_1.write
    .format("jdbc")
    .options(**anomalies_jdbc_options)
    .mode("overwrite")
    .save()
)