In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round

#load from postgres
#master = "local[*] creates as many worker threads as logical cores on your machine"
spark = SparkSession.builder.appName("Riot_Api2").config("spark.jars", "postgresql-42.2.25.jre7.jar").master(master = "local[*]").getOrCreate()

# Load from JSON

In [77]:
import pyspark.sql.functions as F
#read Matchdata from JSON source
matches_from_json = spark.read.format("json").option("inferSchema", "true").option("multiLine", "true").load("matchlist.json")

In [78]:
#alot of useless/heavily nested information
matches_from_json.printSchema()

root
 |-- info: struct (nullable = true)
 |    |-- gameCreation: long (nullable = true)
 |    |-- gameDuration: long (nullable = true)
 |    |-- gameEndTimestamp: long (nullable = true)
 |    |-- gameId: long (nullable = true)
 |    |-- gameMode: string (nullable = true)
 |    |-- gameName: string (nullable = true)
 |    |-- gameStartTimestamp: long (nullable = true)
 |    |-- gameType: string (nullable = true)
 |    |-- gameVersion: string (nullable = true)
 |    |-- mapId: long (nullable = true)
 |    |-- participants: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- assists: long (nullable = true)
 |    |    |    |-- baronKills: long (nullable = true)
 |    |    |    |-- bountyLevel: long (nullable = true)
 |    |    |    |-- challenges: struct (nullable = true)
 |    |    |    |    |-- 12AssistStreakCount: long (nullable = true)
 |    |    |    |    |-- abilityUses: long (nullable = true)
 |    |    |    |    |-- acesBefore15Minutes: 

In [80]:
#extract the wanted columns, still nested
participants_nested = matches_from_json.select(col("metadata.participants"), col("metadata.matchId"))
participants_nested.printSchema()

root
 |-- participants: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- matchId: string (nullable = true)



In [44]:
#explode participants. to have a unique matchId for further DB storage, we need to explode differently
participants_not_unique = participants_nested.select(participants_nested.matchId,F.explode(participants_nested.participants))
participants_not_unique.show()

In [62]:
participants_unique = participants_nested.select("matchId", participants_nested.participants[0], participants_nested.participants[1], participants_nested.participants[2], participants_nested.participants[3], participants_nested.participants[4], participants_nested.participants[5], participants_nested.participants[6], participants_nested.participants[7], participants_nested.participants[8], participants_nested.participants[9])
participants_unique.printSchema()

root
 |-- matchId: string (nullable = true)
 |-- participants[0]: string (nullable = true)
 |-- participants[1]: string (nullable = true)
 |-- participants[2]: string (nullable = true)
 |-- participants[3]: string (nullable = true)
 |-- participants[4]: string (nullable = true)
 |-- participants[5]: string (nullable = true)
 |-- participants[6]: string (nullable = true)
 |-- participants[7]: string (nullable = true)
 |-- participants[8]: string (nullable = true)
 |-- participants[9]: string (nullable = true)



In [90]:
#extract the player Details plus the matchId
player_details = matches_from_json.select(col("info.participants"), col("metadata.matchId"))
#the details are in a struct in an array so we need to apply both array flattening and struct flattening
player_details = player_details.select(player_details.matchId, F.explode(player_details.participants))
player_details = player_details.select(col("matchId"), col("col.*"))
#finally we drop the unwanted data
player_details = player_details.drop("challenges", "perks")
#our df is ready to be used in our database
player_details.printSchema()

root
 |-- matchId: string (nullable = true)
 |-- assists: long (nullable = true)
 |-- baronKills: long (nullable = true)
 |-- bountyLevel: long (nullable = true)
 |-- champExperience: long (nullable = true)
 |-- champLevel: long (nullable = true)
 |-- championId: long (nullable = true)
 |-- championName: string (nullable = true)
 |-- championTransform: long (nullable = true)
 |-- consumablesPurchased: long (nullable = true)
 |-- damageDealtToBuildings: long (nullable = true)
 |-- damageDealtToObjectives: long (nullable = true)
 |-- damageDealtToTurrets: long (nullable = true)
 |-- damageSelfMitigated: long (nullable = true)
 |-- deaths: long (nullable = true)
 |-- detectorWardsPlaced: long (nullable = true)
 |-- doubleKills: long (nullable = true)
 |-- dragonKills: long (nullable = true)
 |-- firstBloodAssist: boolean (nullable = true)
 |-- firstBloodKill: boolean (nullable = true)
 |-- firstTowerAssist: boolean (nullable = true)
 |-- firstTowerKill: boolean (nullable = true)
 |-- game

# Load from DB

In [0]:
#get the matches table
matches = spark.read.format("jdbc").options(
    url='jdbc:postgresql://localhost:5432/RIOT2', # jdbc:postgresql://<host>:<port>/<database>
    dbtable='matches',
    user='postgres',
    password='1234',
    driver='org.postgresql.Driver').load()
matches.printSchema()

In [2]:
matches.groupBy("gameMode").count().show()

+-----------------+-----+
|         gameMode|count|
+-----------------+-----+
|TUTORIAL_MODULE_3|    7|
|             ARAM|  122|
|TUTORIAL_MODULE_2|    3|
|          CLASSIC|  428|
|TUTORIAL_MODULE_1|    7|
+-----------------+-----+



In [44]:
#filter out tutorial games
matches.createOrReplaceTempView("matches")

matches = spark.sql("SELECT * FROM matches WHERE gamemode == 'ARAM' OR gamemode == 'CLASSIC';")
matches.groupBy("gameMode").count().show()

+--------+-----+
|gameMode|count|
+--------+-----+
|    ARAM|  122|
| CLASSIC|  428|
+--------+-----+



In [4]:
matches.take(1)

[Row(gameCreation=1646439824000, gameDuration=1492, gameEndTimestamp=1646441376127, gameId=5762695498, gameMode='ARAM', gameName='teambuilder-match-5762695498', gameStartTimestamp=1646439884149, gameType='MATCHED_GAME', gameVersion='12.5.425.9171', mapId=12, platformId='EUW1', queueId=450, tournamentCode='', matchId='EUW_5762695498')]

In [48]:
#Making Timestamps readable. We have to divide by 1000 and round because the server takes millisecond timestamps
matches = matches.withColumn('gameStartTimestamp', col('gameStartTimestamp')/1000)
matches = matches.withColumn('gameStartTimestamp', round('gameStartTimestamp'))
matches = matches.withColumn('gameStartTimestamp', col('gameStartTimestamp').cast("timestamp"))
matches = matches.withColumn('gameCreation', col('gameCreation')/1000)
matches = matches.withColumn('gameCreation', round('gameCreation'))
matches = matches.withColumn('gameCreation', col('gameCreation').cast("timestamp"))
matches = matches.withColumn('gameEndTimestamp', col('gameEndTimestamp')/1000)
matches = matches.withColumn('gameEndTimestamp', round('gameEndTimestamp'))
matches = matches.withColumn('gameEndTimestamp', col('gameEndTimestamp').cast("timestamp"))


In [51]:
matches.select('gameStartTimestamp', 'gameEndTimestamp').sort('gameStartTimestamp' ,ascending=True).show(600)

+-------------------+-------------------+
| gameStartTimestamp|   gameEndTimestamp|
+-------------------+-------------------+
|2022-03-05 01:14:59|2022-03-05 01:34:31|
|2022-03-05 01:15:23|2022-03-05 01:38:34|
|2022-03-05 01:15:26|2022-03-05 01:33:10|
|2022-03-05 01:15:29|2022-03-05 01:30:26|
|2022-03-05 01:15:31|2022-03-05 01:38:14|
|2022-03-05 01:15:35|2022-03-05 01:41:22|
|2022-03-05 01:15:35|2022-03-05 01:31:27|
|2022-03-05 01:15:37|2022-03-05 01:38:27|
|2022-03-05 01:15:39|2022-03-05 01:48:45|
|2022-03-05 01:15:40|2022-03-05 01:35:30|
|2022-03-05 01:15:46|2022-03-05 01:40:27|
|2022-03-05 01:15:51|2022-03-05 01:45:59|
|2022-03-05 01:15:52|2022-03-05 01:37:34|
|2022-03-05 01:15:54|2022-03-05 01:39:05|
|2022-03-05 01:16:03|2022-03-05 01:31:37|
|2022-03-05 01:16:08|2022-03-05 01:39:51|
|2022-03-05 01:16:08|2022-03-05 01:31:10|
|2022-03-05 01:16:09|2022-03-05 01:36:01|
|2022-03-05 01:16:10|2022-03-05 01:41:17|
|2022-03-05 01:16:18|2022-03-05 01:47:34|
|2022-03-05 01:16:28|2022-03-05 01

In [64]:
from pyspark.sql.functions import concat, lit, floor
#Translate gameDuration(seconds) into readable Game Time (mm:ss)
matches2 = matches.withColumn('gameTime', concat(floor(col('gameDuration')/60), lit(':'),col('gameDuration')%60))

In [65]:
matches2.select('gameTime').show()

+--------+
|gameTime|
+--------+
|   24:52|
|    29:0|
|    19:3|
|   18:20|
|   26:57|
|   30:14|
|   36:15|
|   14:45|
|    25:4|
|    28:0|
|   40:35|
|    37:4|
|   27:36|
|   20:44|
|   20:14|
|   25:15|
|   29:54|
|   33:40|
|    28:9|
|   36:13|
+--------+
only showing top 20 rows

