In [195]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import json
import pandas as pd

In [196]:
spark = spark = SparkSession.builder \
    .appName("pyspark_test") \
    .getOrCreate()

In [197]:
file_path = '../data/Team_Liquid_vs_Sentinels.json'
with open(file_path, 'r') as f:
    data = json.load(f)

### get match event data

In [198]:
# Function that gets event and date stats for a match
def get_match_event_stats():
    match_name = list(data.keys())[0]
    print(match_name)

    team1, team2 = match_name.split('_vs_')
    team1_name = team1.replace('_', ' ')
    team2_name = team2.replace('_', ' ')


    event_stats = data[match_name][0]
    event_stats

    event_stats_df_pd = pd.DataFrame(data[list(data.keys())[0]][0], index=[0])

    return spark.createDataFrame(event_stats_df_pd), match_name, team1_name, team2_name
    
event_stats_df, match_name, team1_name, team2_name = get_match_event_stats()
event_stats_df.show()



Team_Liquid_vs_Sentinels
+--------------------+----------+-----------+-------------+
|               Event|      Date|      Stage|        Round|
+--------------------+----------+-----------+-------------+
|Champions Tour 20...|2025-02-23|Swiss Stage|Round 2 (0-1)|
+--------------------+----------+-----------+-------------+



In [199]:
map_list = list(data[match_name][1].keys())

### get team map data

In [200]:
# Function that gets team stats for each map in the match
def get_team_map_data(match_name, map_list):

    map_scored_df_pd = pd.DataFrame(columns=['Map', 'Winner', 'TeamOneName', 'TeamOneAbbreviation', 'TeamTwoName', 'TeamTwoAbbreviation' ,'TeamOneFinalScore', 'TeamOneCTScore', 'TeamOneTScore', 'TeamOneOTScore', 'TeamTwoFinalScore', 'TeamTwoCTScore', 'TeamTwoTScore', 'TeamTwoOTScore'])

    for i in map_list:
        map_name = i

        team1_stats = data[match_name][1][i]['Team Stats: '][team1_name]
        team2_stats = data[match_name][1][i]['Team Stats: '][team2_name]

        first_player = list(data[match_name][1][i]['Player Stats'].keys())[0]
        last_player = list(data[match_name][1][i]['Player Stats'].keys())[-1]

        team1_abbr = data[match_name][1][i]['Player Stats'][first_player]["Team"]
    
        team2_abbr = data[match_name][1][i]['Player Stats'][last_player]["Team"]

        team1_score = int(team1_stats['final'])
        team2_score = int(team2_stats['final'])
    

        winner = team1_name if int(team1_score) > int(team2_score) else team2_name

        temp_df1 = pd.DataFrame({'Map': map_name, 'Winner': winner, 'TeamOneName': team1_name,'TeamOneAbbreviation' : team1_abbr ,'TeamTwoName': team2_name, 'TeamTwoAbbreviation' : team2_abbr ,'TeamOneFinalScore': team1_stats['final'], 'TeamOneCTScore': team1_stats['CT'], 'TeamOneTScore' : team1_stats['T'], 'TeamOneOTScore' : team1_stats['OT'], 'TeamTwoFinalScore': team2_stats['final'], 'TeamTwoCTScore' : team2_stats['CT'], 'TeamTwoTScore' : team2_stats['T'], 'TeamTwoOTScore' : team2_stats['CT']}, index=[0])

        map_scored_df_pd = pd.concat([map_scored_df_pd, temp_df1], ignore_index=True)


    return spark.createDataFrame(map_scored_df_pd), map_scored_df_pd, team1_abbr, team2_abbr

map_scored_df_spark, map_scored_df_pd, team1_abbr, team2_abbr = get_team_map_data(match_name, map_list)
map_scored_df_spark.show()

+-----+-----------+-----------+-------------------+-----------+-------------------+-----------------+--------------+-------------+--------------+-----------------+--------------+-------------+--------------+
|  Map|     Winner|TeamOneName|TeamOneAbbreviation|TeamTwoName|TeamTwoAbbreviation|TeamOneFinalScore|TeamOneCTScore|TeamOneTScore|TeamOneOTScore|TeamTwoFinalScore|TeamTwoCTScore|TeamTwoTScore|TeamTwoOTScore|
+-----+-----------+-----------+-------------------+-----------+-------------------+-----------------+--------------+-------------+--------------+-----------------+--------------+-------------+--------------+
|Lotus|  Sentinels|Team Liquid|                 TL|  Sentinels|                SEN|                7|             2|            5|             0|               13|             7|            6|             7|
|Abyss|Team Liquid|Team Liquid|                 TL|  Sentinels|                SEN|               16|             7|            5|             4|               14|     

### get map player data

In [201]:
# Function that gets player stats for each map in the match
def get_map_player_data(map_list):

    map_player_stats_df_pd = pd.DataFrame(columns=['Map', 'PlayerName', 'Agent', 'Team', 'ACS_Overall','ACS_T', 'ACS_CT', 'Kills_Overall', 'Kills_T', 'Kills_CT',  'Deaths_Overall', 'Deaths_T', 'Deaths_CT', 'Assists_Overall', 'Assists_T', 'Assists_CT', 'KAST_Overall', 'KAST_T', 'KAST_CT', 'ADR_Overall', 'ADR_T', 'ADR_CT', 'HSPercentage_Overall', 'HSPercentage_T', 'HSPercentage_CT', 'FirstKills_Overall', 'FirstKills_T', 'FirstKills_CT', 'FirstDeaths_Overall', 'FirstDeaths_T', 'FirstDeaths_CT'])

    for i in map_list:
        map_name = i

        map_player_stats = data[match_name][1][i]['Player Stats']
    
        for player in map_player_stats:
            player_name = player
            player_stats = map_player_stats[player]

            temp_df3 = pd.DataFrame({'Map' : map_name,'PlayerName': player_name, 'Agent': player_stats['Agent'], 'Team' : player_stats['Team'], 'ACS_Overall': player_stats['ACS']['All'], 'ACS_T':player_stats['ACS']['T'], 'ACS_CT': player_stats['ACS']['CT'], 'Kills_Overall' : player_stats['Elims']['All'], 'Kills_T': player_stats['Elims']['T'], 'Kills_CT': player_stats['Elims']['CT'], 'Deaths_Overall' : player_stats['Deaths']['All'], 'Deaths_T': player_stats['Deaths']['T'], 'Deaths_CT': player_stats['Deaths']['CT'], 'Assists_Overall' : player_stats['Assists']['All'], 'Assists_T': player_stats['Assists']['T'], 'Assists_CT': player_stats['Assists']['CT'], 'KAST_Overall' :player_stats['KAST']['All'], 'KAST_T': player_stats['KAST']['T'], 'KAST_CT': player_stats['KAST']['CT'], 'ADR_Overall' : player_stats['ADR']['All'], 'ADR_T': player_stats['ADR']['T'], 'ADR_CT': player_stats['ADR']['CT'], 'HSPercentage_Overall' : player_stats['HS_percentage']['All'], 'HSPercentage_T': player_stats['HS_percentage']['T'], 'HSPercentage_CT': player_stats['HS_percentage']['CT'], 'FirstKills_Overall' : player_stats['First Kills']['All'], 'FirstKills_T': player_stats['First Kills']['T'], 'FirstKills_CT': player_stats['First Kills']['CT'], 'FirstDeaths_Overall' : player_stats['First Deaths']['All'], 'FirstDeaths_T': player_stats['First Deaths']['T'], 'FirstDeaths_CT': player_stats['First Deaths']['CT']}, index=[0])

            map_player_stats_df_pd = pd.concat([map_player_stats_df_pd, temp_df3], ignore_index=True)

    return spark.createDataFrame(map_player_stats_df_pd), map_player_stats_df_pd

map_player_stats_df__spark, map_player_stats_df_pd = get_map_player_data(map_list)

map_player_stats_df__spark.show(5)

+-----+----------+------+----+-----------+-----+------+-------------+-------+--------+--------------+--------+---------+---------------+---------+----------+------------+------+-------+-----------+-----+------+--------------------+--------------+---------------+------------------+------------+-------------+-------------------+-------------+--------------+
|  Map|PlayerName| Agent|Team|ACS_Overall|ACS_T|ACS_CT|Kills_Overall|Kills_T|Kills_CT|Deaths_Overall|Deaths_T|Deaths_CT|Assists_Overall|Assists_T|Assists_CT|KAST_Overall|KAST_T|KAST_CT|ADR_Overall|ADR_T|ADR_CT|HSPercentage_Overall|HSPercentage_T|HSPercentage_CT|FirstKills_Overall|FirstKills_T|FirstKills_CT|FirstDeaths_Overall|FirstDeaths_T|FirstDeaths_CT|
+-----+----------+------+----+-----------+-----+------+-------------+-------+--------+--------------+--------+---------+---------------+---------+----------+------------+------+-------+-----------+-----+------+--------------------+--------------+---------------+------------------+---

### get match aggregate data

### team data

In [202]:
# Function to get aggregate team data for the entire match over all maps
def get_agg_team_data(match_name, map_list):
    map_scored_df_spark, map_scored_df_pd, team1_abbr, team2_abbr = get_team_map_data(match_name, map_list)

    map_scored_df_pd = map_scored_df_pd.astype({ 'TeamOneFinalScore' : int, 'TeamOneCTScore' : int, 'TeamOneTScore' : int, 'TeamOneOTScore' : int, 'TeamTwoFinalScore' : int, 'TeamTwoCTScore' : int, 'TeamTwoTScore' : int, 'TeamTwoOTScore' : int})

    column_sums = map_scored_df_pd.sum(axis=0)

    match_scored_df_pd_sums = column_sums.to_frame()
    match_scored_df_pd_t = match_scored_df_pd_sums.transpose()
    team_match_scored_df_pd = match_scored_df_pd_t[['TeamOneName','TeamOneAbbreviation','TeamTwoName', 'TeamTwoAbbreviation','TeamOneFinalScore', 'TeamOneCTScore', 'TeamOneTScore', 'TeamOneOTScore', 'TeamTwoFinalScore', 'TeamTwoCTScore', 'TeamTwoTScore', 'TeamTwoOTScore']]
    team_match_scored_df_pd.at[0, 'TeamOneName'] = team1_name
    team_match_scored_df_pd.at[0, 'TeamTwoName'] = team2_name
    team_match_scored_df_pd.at[0, 'TeamOneAbbreviation'] = team1_abbr
    team_match_scored_df_pd.at[0, 'TeamTwoAbbreviation'] = team2_abbr

    team_match_scored_df_pd = team_match_scored_df_pd.astype({ 'TeamOneFinalScore' : int, 'TeamOneCTScore' : int, 'TeamOneTScore' : int, 'TeamOneOTScore' : int, 'TeamTwoFinalScore' : int, 'TeamTwoCTScore' : int, 'TeamTwoTScore' : int, 'TeamTwoOTScore' : int})


    schema = StructType([
        StructField('TeamOneName', StringType(), True),
        StructField('TeamOneAbbreviation', StringType(), True),
        StructField('TeamTwoName', StringType(), True),
        StructField('TeamOneAbbreviation', StringType(), True),
        StructField('TeamOneFinalScore', IntegerType(), True),
        StructField('TeamOneCTScore', IntegerType(), True),
        StructField('TeamOneTScore', IntegerType(), True),
        StructField('TeamOneOTScore', IntegerType(), True),
        StructField('TeamTwoFinalScore', IntegerType(), True),
        StructField('TeamTwoCTScore', IntegerType(), True),
        StructField('TeamTwoTScore', IntegerType(), True),
        StructField('TeamTwoOTScore', IntegerType(), True),
    ])

    return spark.createDataFrame(team_match_scored_df_pd, schema=schema),  team_match_scored_df_pd

team_match_agg_df_sprk, team_match_agg_df_pd = get_agg_team_data(match_name, map_list)

team_match_agg_df_sprk.show()


+-----------+-------------------+-----------+-------------------+-----------------+--------------+-------------+--------------+-----------------+--------------+-------------+--------------+
|TeamOneName|TeamOneAbbreviation|TeamTwoName|TeamOneAbbreviation|TeamOneFinalScore|TeamOneCTScore|TeamOneTScore|TeamOneOTScore|TeamTwoFinalScore|TeamTwoCTScore|TeamTwoTScore|TeamTwoOTScore|
+-----------+-------------------+-----------+-------------------+-----------------+--------------+-------------+--------------+-----------------+--------------+-------------+--------------+
|Team Liquid|                 TL|  Sentinels|                SEN|               36|            20|           12|             4|               28|            14|           12|            14|
+-----------+-------------------+-----------+-------------------+-----------------+--------------+-------------+--------------+-----------------+--------------+-------------+--------------+



### Player Data

In [203]:
# Function to get aggregate player data for the entire match over all maps
def get_agg_player_data(map_list):
    map_player_stats_df__spark, map_player_stats_df_pd = get_map_player_data(map_list)

    map_player_stats_df_pd_trim = map_player_stats_df_pd.copy()

    map_player_stats_df_pd_trim['KAST_Overall'] = map_player_stats_df_pd_trim['KAST_Overall'].str.replace('%', '').astype(float)
    map_player_stats_df_pd_trim['KAST_CT'] = map_player_stats_df_pd_trim['KAST_CT'].str.replace('%', '').astype(float)
    map_player_stats_df_pd_trim['KAST_T'] = map_player_stats_df_pd_trim['KAST_T'].str.replace('%', '').astype(float)
    map_player_stats_df_pd_trim['HSPercentage_Overall'] = map_player_stats_df_pd_trim['HSPercentage_Overall'].str.replace('%', '').astype(float)
    map_player_stats_df_pd_trim['HSPercentage_CT'] = map_player_stats_df_pd_trim['HSPercentage_CT'].str.replace('%', '').astype(float)
    map_player_stats_df_pd_trim['HSPercentage_T'] = map_player_stats_df_pd_trim['HSPercentage_T'].str.replace('%', '').astype(float)

    map_player_stats_df_pd_trim = map_player_stats_df_pd_trim.astype({'ACS_Overall': 'int', 'ACS_T': 'int', 'ACS_CT':  'int', 'Kills_Overall' : 'int', 'Kills_T': 'int', 'Kills_CT': 'int', 'Deaths_Overall' : 'int', 'Deaths_T': 'int', 'Deaths_CT': 'int', 'Assists_Overall' : 'int', 'Assists_T': 'int', 'Assists_CT': 'int', 'KAST_Overall' : 'float', 'KAST_T':  'float', 'KAST_CT':  'float', 'ADR_Overall' :  'int', 'ADR_T':  'int', 'ADR_CT':  'int', 'HSPercentage_Overall' :  'float', 'HSPercentage_T':  'float', 'HSPercentage_CT':  'float', 'FirstKills_Overall' : 'int', 'FirstKills_T': 'int', 'FirstKills_CT': 'int', 'FirstDeaths_Overall' : 'int', 'FirstDeaths_T': 'int', 'FirstDeaths_CT': 'int'})

    agg_player_stats_df_pd = map_player_stats_df_pd_trim.groupby('PlayerName').agg({'ACS_Overall': 'mean', 'ACS_T': 'mean', 'ACS_CT':  'mean', 'Kills_Overall' : 'sum', 'Kills_T': 'sum', 'Kills_CT': 'sum', 'Deaths_Overall' : 'sum', 'Deaths_T': 'sum', 'Deaths_CT': 'sum', 'Assists_Overall' : 'sum', 'Assists_T': 'sum', 'Assists_CT': 'sum', 'KAST_Overall' : 'mean', 'KAST_T':  'mean', 'KAST_CT':  'mean', 'ADR_Overall' :  'mean', 'ADR_T':  'mean', 'ADR_CT':  'mean', 'HSPercentage_Overall' :  'mean', 'HSPercentage_T':  'mean', 'HSPercentage_CT':  'mean', 'FirstKills_Overall' : 'sum', 'FirstKills_T': 'sum', 'FirstKills_CT': 'sum', 'FirstDeaths_Overall' : 'sum', 'FirstDeaths_T': 'sum', 'FirstDeaths_CT': 'sum'}).round(2)

    agg_player_stats_df_pd.reset_index(inplace=True)

    return spark.createDataFrame(agg_player_stats_df_pd), agg_player_stats_df_pd

agg_player_stats_df_spark, agg_player_stats_df_pd = get_agg_player_data(map_list)

agg_player_stats_df_spark.show()

+----------+-----------+------+------+-------------+-------+--------+--------------+--------+---------+---------------+---------+----------+------------+------+-------+-----------+------+------+--------------------+--------------+---------------+------------------+------------+-------------+-------------------+-------------+--------------+
|PlayerName|ACS_Overall| ACS_T|ACS_CT|Kills_Overall|Kills_T|Kills_CT|Deaths_Overall|Deaths_T|Deaths_CT|Assists_Overall|Assists_T|Assists_CT|KAST_Overall|KAST_T|KAST_CT|ADR_Overall| ADR_T|ADR_CT|HSPercentage_Overall|HSPercentage_T|HSPercentage_CT|FirstKills_Overall|FirstKills_T|FirstKills_CT|FirstDeaths_Overall|FirstDeaths_T|FirstDeaths_CT|
+----------+-----------+------+------+-------------+-------+--------+--------------+--------+---------+---------------+---------+----------+------------+------+-------+-----------+------+------+--------------------+--------------+---------------+------------------+------------+-------------+-------------------+----

### Get Team Info Data

In [204]:
#Function to get team info data

def get_team_info(match_name, map_list):
    team_match_agg_df_sprk, team_match_agg_df_pd = get_agg_team_data(match_name, map_list)

    team_match_agg_df_pd_sub = team_match_agg_df_pd[['TeamOneName', 'TeamOneAbbreviation', 'TeamTwoName', 'TeamTwoAbbreviation']]

    team_info_dict = {
        'TeamName' : [team_match_agg_df_pd_sub.at[0,'TeamOneName'], team_match_agg_df_pd_sub.at[0,'TeamTwoName']],
        'TeamAbbreviation' : [team_match_agg_df_pd_sub.at[0,'TeamOneAbbreviation'], team_match_agg_df_pd_sub.at[0,'TeamTwoAbbreviation']]
    }

    team_info_df = pd.DataFrame(team_info_dict)



    return spark.createDataFrame(team_info_df), team_info_df

team_info_df_spark, team_info_df_pd = get_team_info(match_name, map_list)
team_info_df_spark.show()

+-----------+----------------+
|   TeamName|TeamAbbreviation|
+-----------+----------------+
|Team Liquid|              TL|
|  Sentinels|             SEN|
+-----------+----------------+



### Get Player Info Data

In [205]:
# Function to get player info data
def get_player_info(map_list):
    map_player_stats_df__spark, map_player_stats_df_pd = get_map_player_data(map_list)

    player_name_list = map_player_stats_df_pd['PlayerName'].unique()

    player_info_df = pd.DataFrame({'PlayerName' : player_name_list})

    return spark.createDataFrame(player_info_df), player_info_df

player_info_df_spark, player_info_df_pd = get_player_info(map_list)
player_info_df_spark.show()

+----------+
|PlayerName|
+----------+
|      nAts|
|     Keiko|
|   paTiTek|
|     kamyk|
|      kamo|
|      bang|
|    zekken|
|   N4RRATE|
|    johnqt|
|   Zellsis|
+----------+



### Get Map Info Data

In [206]:
# Function to get map info data
def get_map_info(map_list):

    map_df = pd.DataFrame({'MapName' : map_list})

    return spark.createDataFrame(map_df), map_df

map_df_spark, map_df_pd = get_map_info(map_list)
map_df_spark.show()

+-------+
|MapName|
+-------+
|  Lotus|
|  Abyss|
|   Bind|
+-------+



25/02/26 16:48:19 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)