In [2]:
from pyspark.sql import functions as F

# Import the data, add some useful derived columns, and rename the team columns to `t1` / `t2` so that every team-level column shares a `t1` / `t2` prefix we can match in loops

In [19]:
# Execute ./config.ipynb before loading any data.
# NOTE(review): presumably this is where `spark` and `PATH_TO_CSGO_DATA`
# (both used by the next cell, neither defined in this notebook) come from - confirm.
%run ./config.ipynb

In [20]:
# Each team has per-player columns named `<team>_player<i>_<stat>` for i = 1..5.
_PLAYER_NUMBERS = range(1, 6)


def _sum_columns(columns):
    """Add a non-empty sequence of Column expressions together, left to right."""
    total = columns[0]
    for column in columns[1:]:
        total = total + column
    return total


def _avg_player_rating(team):
    """Mean of the `<team>_player<i>_rating` columns for `team` ('t1' or 't2')."""
    ratings = [F.col(f'{team}_player{i}_rating') for i in _PLAYER_NUMBERS]
    return _sum_columns(ratings) / len(ratings)


def _snipers_on_team(team):
    """Count of `<team>_player<i>_is_sniper` flags that are True for `team`."""
    sniper_flags = [
        F.when(F.col(f'{team}_player{i}_is_sniper') == True, F.lit(1)).otherwise(F.lit(0))
        for i in _PLAYER_NUMBERS
    ]
    return _sum_columns(sniper_flags)


# Load the raw CSV and add match-level derived columns.
# The column expressions are built directly from Column objects instead of
# assembling Python source strings and passing them to eval().
df_csgo = (spark
    .read
    .option("inferSchema","true")
    .option("header", True)
    .csv(f"{PATH_TO_CSGO_DATA}/csgo_games.csv")
    # Surrogate key: unique per row, but not guaranteed to be consecutive.
    .withColumn("match_id", F.monotonically_increasing_id())
    # Must be derived before team_1 / team_2 are renamed below.
    .withColumn('winning_team_name',
        F.when(F.col('winner') == 't1', F.col('team_1'))
         .when(F.col('winner') == 't2', F.col('team_2'))
         .otherwise(F.lit(None))
    )
    .withColumnRenamed('team_1', 't1')
    .withColumnRenamed('team_2', 't2')
    .withColumn('t1_avg_player_rating', _avg_player_rating('t1'))
    .withColumn('t2_avg_player_rating', _avg_player_rating('t2'))
    .withColumn('t1_snipers_on_team', _snipers_on_team('t1'))
    .withColumn('t2_snipers_on_team', _snipers_on_team('t2'))
)
print('df_csgo')

df_csgo


# Create df with match stats only

In [4]:
# Project df_csgo down to the match-level columns; the per-player stat
# columns stay behind in df_csgo.
df_matches = df_csgo.select([
    'match_id', 'match_date',
    't1', 't2',
    't1_points', 't2_points',
    't1_world_rank', 't2_world_rank',
    't1_h2h_win_perc', 't2_h2h_win_perc',
    'winner', 'winning_team_name',
    't1_snipers_on_team', 't2_snipers_on_team',
])

# df_matches.show(10)
print('df_matches - no player data')

# Restructure df, one row per team

In [26]:
def _team_rows(team_no):
    """Project df_matches onto one side of each match.

    `team_no` is 't1' or 't2'. The side-specific columns (`<team_no>`,
    `<team_no>_points`, ...) are aliased to side-neutral names so the two
    projections have identical column names and order before being unioned.
    """
    return df_matches.select(
        'match_id',
        'match_date',
        F.lit(team_no).alias('team_no'),
        F.col(team_no).alias('team_name'),
        F.col(f'{team_no}_points').alias('points'),
        F.col(f'{team_no}_world_rank').alias('world_rank'),
        F.col(f'{team_no}_h2h_win_perc').alias('h2h_win_perc'),
        'winning_team_name',
        F.col(f'{team_no}_snipers_on_team').alias('snipers_on_team'),
    )


# Stack the t1 view on top of the t2 view: two rows per match, one per team.
# union() matches columns by position, so both sides come from the same helper.
df_teams = (
    _team_rows('t1')
    .union(_team_rows('t2'))
    # 1 when this row's team is the winner, otherwise 0 (including when
    # winning_team_name is null).
    .withColumn('won',
        F.when(F.col('team_name') == F.col('winning_team_name'), F.lit(1))
        .otherwise(F.lit(0))
    )
    .drop('winning_team_name')
    .orderBy('match_id')
)

# df_teams.show()
# df_teams.printSchema()
print('df_teams - one row per team per match')

root
 |-- match_id: long (nullable = false)
 |-- match_date: string (nullable = true)
 |-- team_no: string (nullable = false)
 |-- team_name: string (nullable = true)
 |-- points: integer (nullable = true)
 |-- world_rank: integer (nullable = true)
 |-- h2h_win_perc: double (nullable = true)
 |-- snipers_on_team: integer (nullable = false)
 |-- won: integer (nullable = false)

df_matches - one row per match


In [6]:
## Challenge: find another way of doing the wide-to-long reshape above. The commented-out `to_long` helper below may help:

In [7]:
# from pyspark.sql.functions import array, col, explode, struct, lit

# df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

# def to_long(df, by):

#     # Filter dtypes and split into column names and type description
#     cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
#     # Spark SQL supports only homogeneous columns
#     assert len(set(dtypes)) == 1, "All columns have to be of the same type"

#     # Create and explode an array of (column_name, column_value) structs
#     kvs = explode(array([
#       struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
#     ])).alias("kvs")

#     return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

# to_long(df, ["A"]).show()


In [8]:
# Make df_player stats - one row per player

In [9]:
# Match-level columns carried onto every per-player row.
match_columns = ['match_date', 'match_id', 'winner']
# Team prefixes used in the column names.
teams = ['t1', 't2']
# Player slots per team: 'player1' .. 'player5'.
players = [f'player{slot}' for slot in range(1, 6)]

In [34]:
import re


def _player_rows(team, player):
    """Project df_csgo onto a single player slot.

    Selects the match columns, literal `team` / `player` labels, and every
    df_csgo column whose name starts with `<team>_<player>_`, with that prefix
    stripped so all player slots share the same column names.
    """
    prefix = f'{team}_{player}_'
    strip_prefix = re.compile('^' + re.escape(prefix))
    stat_columns = [name for name in df_csgo.columns if name.startswith(prefix)]
    return df_csgo.select(
        *match_columns,
        F.lit(team).alias('team'),
        F.lit(player).alias('player'),
        *[F.col(name).alias(strip_prefix.sub('', name)) for name in stat_columns],
    )


# One frame per (team, player) slot, stacked into one row per player per match.
# union() matches columns by position, so this relies on every slot's stat
# columns appearing in the same order in df_csgo.
player_frames = [_player_rows(team, player) for team in teams for player in players]

df_players_long = player_frames[0]
for frame in player_frames[1:]:
    df_players_long = df_players_long.union(frame)

# 'won' is added after the renaming select. Previously the select list was
# built from the columns of the frame *before* withColumn('won', ...), which
# silently dropped the new column.
df_players = (df_players_long
    .withColumn('won',
        F.when(F.col('team') == F.col('winner'), F.lit(1))
        .otherwise(F.lit(0))
    )
    .orderBy('match_id', 'team', 'player')
)

df_players.printSchema()
print('df_players - one row per player')


root
 |-- match_date: string (nullable = true)
 |-- match_id: long (nullable = false)
 |-- winner: string (nullable = true)
 |-- team: string (nullable = false)
 |-- player: string (nullable = false)
 |-- rating: double (nullable = true)
 |-- impact: double (nullable = true)
 |-- kdr: double (nullable = true)
 |-- dmr: double (nullable = true)
 |-- kpr: double (nullable = true)
 |-- apr: double (nullable = true)
 |-- dpr: double (nullable = true)
 |-- spr: double (nullable = true)
 |-- opk_ratio: double (nullable = true)
 |-- opk_rating: double (nullable = true)
 |-- wins_perc_after_fk: double (nullable = true)
 |-- fk_perc_in_wins: double (nullable = true)
 |-- multikill_perc: double (nullable = true)
 |-- rating_at_least_one_perc: double (nullable = true)
 |-- is_sniper: boolean (nullable = true)
 |-- clutch_win_perc: double (nullable = true)
 |-- won: integer (nullable = false)

df_players - one row per player
