In [2]:
from lolapi import RiotUnofficialApi
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

DATA_LOCATION = "./data/riot/"

In [3]:
api = RiotUnofficialApi(
        api_key="0TvQnueqKa5mxJntVWt0w4LpLfEkrV1Ta8rQBb9Z",
        lang="en-US"
    )

In [4]:
leagues = api.get_leagues()
leagues_df = spark.createDataFrame(leagues)
(
    leagues_df
    .write
    .parquet(DATA_LOCATION + 'leagues/', mode='overwrite')
)

23/08/05 10:20:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/08/05 10:20:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/08/05 10:20:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
23/08/05 10:20:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
23/08/05 10:20:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
23/08/05 10:20:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 56.31% for 12 writers
23/08/05 10:20:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,

In [5]:
tournament_list = []
for league in leagues:
    tournament_list.extend(api.get_tournaments(league_id=league['id']))

tournament_df = spark.createDataFrame(tournament_list)

(
    tournament_df
    .write
    .mode('overwrite')
    .parquet(DATA_LOCATION + 'tournaments')
)

23/08/05 10:21:22 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import explode, col


result_schema = StructType([
    StructField("outcome", StringType(), nullable=True),
    StructField("gameWins", IntegerType(), nullable=True)
])

team_schema = StructType([
    StructField("id", StringType(), nullable=True),
    StructField("slug", StringType(), nullable=True),
    StructField("name", StringType(), nullable=True),
    StructField("code", StringType(), nullable=True),
    StructField("image", StringType(), nullable=True),
    StructField("result", result_schema, nullable=True)
])

match_schema = StructType([
    StructField("id", StringType(), nullable=True),
    StructField("state", StringType(), nullable=True),
    StructField("previousMatchIds", StringType(), nullable=True),
    StructField("flags", ArrayType(StringType()), nullable=True),
    StructField("teams", ArrayType(team_schema), nullable=True)
])

rankings_schema = StructType([
    StructField("ordinal", IntegerType(), nullable=True),
    StructField("teams", ArrayType(team_schema), nullable=True)
])

sections_schema = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("matches", ArrayType(match_schema), nullable=True),
    StructField("rankings", ArrayType(rankings_schema), nullable=True)
])

stages_schema = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("type", StringType(), nullable=True),
    StructField("slug", StringType(), nullable=True),
    StructField("sections", ArrayType(sections_schema), nullable=True)
])

In [7]:
from pyspark.sql import DataFrame

def explode_column(df: DataFrame, column_name: str) -> DataFrame:
    """
    Explode a specified column in a DataFrame and transform each record from the array into a new line.
    
    Parameters:
        df (DataFrame): The input DataFrame.
        column_name (str): The name of the column to explode.
        
    Returns:
        DataFrame: The DataFrame with the specified column exploded.
    """
    # Explode the specified column
    
    exploded_df = df.selectExpr("*", f"explode({column_name}) as {column_name}_exploded")
    
    # Explode the exploded struct column into individual columns with the parent column name as prefix
    # final_df = exploded_df.selectExpr("*", f"{column_name}_exploded.*").drop(f"{column_name}_exploded")
    for col_name in exploded_df.schema[column_name + "_exploded"].dataType.names:
        exploded_df = exploded_df.withColumn(f"{column_name}_{col_name}", col(f"{column_name}_exploded.{col_name}"))
    
    exploded_df = exploded_df.drop(f"{column_name}_exploded")
    exploded_df = exploded_df.drop(f"{column_name}")
    
    return exploded_df


In [8]:
stages_list = []
for tournament in tournament_list:
    standings = api.get_standings(tournament_list[0]['id'])
    stages_list.extend(standings[0]['stages'])

In [9]:
standings_df = spark.createDataFrame(stages_list, schema=stages_schema)


In [10]:
standings_df_final = (
    standings_df
    .transform(explode_column, "sections")
    .transform(explode_column, "sections_matches")
    .transform(explode_column, "sections_rankings")
    .transform(explode_column, "sections_matches_teams")
    .transform(explode_column, "sections_rankings_teams")
)

In [11]:
standings_df_final.show(3)

+--------------+----+--------------+--------------+-------------------+----------------------+---------------------------------+----------------------+-------------------------+-------------------------+---------------------------+---------------------------+---------------------------+----------------------------+-----------------------------+--------------------------+----------------------------+----------------------------+----------------------------+-----------------------------+------------------------------+
|          name|type|          slug| sections_name|sections_matches_id|sections_matches_state|sections_matches_previousMatchIds|sections_matches_flags|sections_rankings_ordinal|sections_matches_teams_id|sections_matches_teams_slug|sections_matches_teams_name|sections_matches_teams_code|sections_matches_teams_image|sections_matches_teams_result|sections_rankings_teams_id|sections_rankings_teams_slug|sections_rankings_teams_name|sections_rankings_teams_code|sections_rankings_

In [12]:
(
    standings_df_final
    .write
    .mode("overwrite")
    .parquet(DATA_LOCATION + "standings/")
)

23/08/05 10:23:05 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/08/05 10:23:05 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/08/05 10:23:05 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
23/08/05 10:23:05 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
23/08/05 10:23:05 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
23/08/05 10:23:05 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 56.31% for 12 writers
23/08/05 10:23:06 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,

In [13]:
teams_list = api.get_teams()

In [14]:
teams_df = spark.createDataFrame(teams_list)

(
    teams_df
    .write
    .mode("overwrite")
    .parquet(DATA_LOCATION + "teams/")
)
teams_df.show(3)

23/08/05 10:23:08 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/08/05 10:23:08 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/08/05 10:23:08 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/08/05 10:23:08 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/08/05 10:23:08 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


+--------------------+---------------+----+--------------------+------------------+--------------------+----+-------+----+--------+
|    alternativeImage|backgroundImage|code|          homeLeague|                id|               image|name|players|slug|  status|
+--------------------+---------------+----+--------------------+------------------+--------------------+----+-------+----+--------+
|https://lolstatic...|           null|TBDD|{name -> MSI, reg...|100205572995797818|https://lolstatic...| TBD|     []| tbd|archived|
|https://lolstatic...|           null|TBDA|{name -> MSI, reg...|100205572997632804|https://lolstatic...| TBD|     []| tbd|archived|
|https://lolstatic...|           null|TBDC|{name -> MSI, reg...|100205572999591608|https://lolstatic...| TBD|     []| tbd|archived|
+--------------------+---------------+----+--------------------+------------------+--------------------+----+-------+----+--------+
only showing top 3 rows



In [261]:
match_details_list = []
for match_reg in standings_df_final.groupBy('sections_matches_id').count().collect():
    match_details = api.get_match_details(match_id=match_reg['sections_matches_id'])
    if match_details:
        match_details_list.append(match_details)

# match_details = api.get_match_details(match_id=standings_df_final.collect()[-1]['sections_matches_id'])

In [264]:
match_details_df = spark.createDataFrame(match_details_list)

In [280]:
[f"blue_team.{k}" for k  in match_details_list[0]['blue_team'].keys()]

['blue_team.players',
 'blue_team.esportsTeamId',
 'blue_team.totalGold',
 'blue_team.inhibitors',
 'blue_team.towers',
 'blue_team.barons',
 'blue_team.totalKills',
 'blue_team.dragons']

In [271]:
match_details_df.printSchema()


root
 |-- blue_team: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: long (valueContainsNull = true)
 |-- red_team: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: long (valueContainsNull = true)
 |-- timestamp: string (nullable = true)

