In [1]:
import os
import sys

# Point both the PySpark driver and its worker processes at the interpreter
# that is running this kernel. Using sys.executable instead of a hard-coded
# install path keeps the notebook runnable on other machines and matches the
# session-setup cell further down, which sets the same two variables.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

SageMaker job configs:
1. Set Python version to 3.11
###### Dockerfile: 
    `ENV PYSPARK_PYTHON=/usr/bin/python3.11
    ENV PYSPARK_DRIVER_PYTHON=/usr/bin/python3.11`
###### Training job definition
    Set the same two variables in the training job definition (the SageMaker SDK `Estimator` call) via its `environment={...}` dict.


Remember to optimize code:
5. Tune if needed
Long GC pauses? → drop to 4 cores/executor

Too much scheduling overhead? → bump to 6 or 8 cores/executor

Straggler tasks? → add more partitions, not more cores

In [2]:
# Before running your code, clear temp files left behind by earlier Spark runs
import os
import shutil
import tempfile

# Resolve the temp directory from the environment instead of hard-coding one
# user's profile path, so the cell also works on other machines/accounts.
temp_dir = tempfile.gettempdir()
for folder in os.listdir(temp_dir):
    # Only entries carrying these two name prefixes are removed.
    if folder.startswith(("spark-", "blockmgr-")):
        try:
            shutil.rmtree(os.path.join(temp_dir, folder))
        except OSError:
            # Best effort: some might be in use (locked) or already gone.
            # Only filesystem errors are ignored, so Ctrl-C still interrupts.
            pass

In [2]:
import os, sys, csv, re, json
from typing import Dict, List, Set, Tuple
from pathlib import Path
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql import functions as F
import logging
from enum import Enum


# Ensure Python child processes use the same interpreter as this kernel
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Stop any old session left over from a previous run of this cell.
# On a fresh kernel `spark` is not defined yet (NameError); stopping a stale
# session may also fail. Both cases are deliberately ignored, but only
# Exception subclasses are swallowed so KeyboardInterrupt / SystemExit
# still propagate.
try:
    spark.stop()
except Exception:
    pass


# Build (or reuse) the session used by every cell below.
spark = (
    SparkSession.builder
        .appName("MatchDataProcessor_Local")
        .config("spark.driver.memory", "4g")
        .config("spark.sql.adaptive.enabled", "false")
        .config("spark.local.dir", "C:/temp/spark")
        .config("spark.sql.warehouse.dir", "C:/temp/spark-warehouse")
        .config("spark.python.worker.reuse", "false")
        .config("spark.sql.execution.arrow.pyspark.enabled", "false")
        .config("spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled", "false")
        .getOrCreate()
)


# Smoke test: run a trivial one-row job to prove the session can execute work.
print("Spark up:", spark.range(1).count())

Spark up: 1


In [3]:
def get_items_dict() -> dict:
    """
    Load the item lookup stored in ``modules/item_id_tags.json``.

    The base directory is resolved in one of two ways:

    - When ``__file__`` is defined (running as a script), it is the directory
      two levels above this file.
    - Otherwise (notebook or REPL), it is the current working directory, so
      the notebook must be started from the folder that contains ``modules/``.

    Returns
    -------
    dict
        The parsed JSON document, returned as-is.

    Raises
    ------
    FileNotFoundError
        If ``modules/item_id_tags.json`` does not exist under the base directory.
    json.JSONDecodeError
        If the file is not valid JSON.
    """
    try:
        # When running as a script, __file__ is defined.
        base_dir = Path(__file__).resolve().parent.parent
    except NameError:
        # In a notebook or REPL, __file__ doesn't exist, so fall back to cwd.
        base_dir = Path.cwd().resolve()

    json_path = base_dir / "modules" / "item_id_tags.json"

    # Read explicitly as UTF-8 instead of relying on the platform default
    # encoding (cp1252 on Windows), which would garble non-ASCII content.
    with json_path.open("r", encoding="utf-8") as json_file:
        return json.load(json_file)

In [4]:
# ---------- Sample data locations ----------
# File names of the two sample extracts.
LARGE_CSV = "3gb_sample.csv"
SMALL_CSV = "3mb_sample.csv"

# Folder holding the raw samples, plus the sub-folder with the snake_case copies.
_DATA_PROCESSING_DIR = "C:/Users/17862/Desktop/SnexCode/lol-helper/data_processing"
_SNAKE_CASE_DIR = _DATA_PROCESSING_DIR + "/camel_to_snake"

SMALL_CSV_PATH = _DATA_PROCESSING_DIR + "/" + SMALL_CSV
LARGE_CSV_PATH = _DATA_PROCESSING_DIR + "/" + LARGE_CSV

SMALL_SNAKE_CSV = _SNAKE_CASE_DIR + "/small_snake_csv/3mb_sample_snake.csv"
LARGE_SNAKE_CSV = _SNAKE_CASE_DIR + "/large_snake_csv/3gb_sample_snake.csv"

# ---------- Processing defaults ----------
RANKED_SOLO_DUO_QUEUE_ID = 420
DEFAULT_PARTITIONS = 200
# Matches the placeholder text stored instead of JSON for failed downloads.
PATCH_ERROR_PATTERN = r"Patch Error: Patch \d+(\.\d+)?"

# ---------- Role specific metrics ----------
# Each dict maps an output column name to the nested "challenges" field it is read from.
JUNGLER_METRICS = {
    metric: "challenges." + metric
    for metric in (
        "jungler_kills_early_jungle",
        "kills_on_laners_early_jungle_as_jungler",
    )
}
SUPPORT_METRICS = {
    metric: "challenges." + metric
    for metric in ("complete_support_quest_in_time",)
}
# No role-only metrics are defined yet for these three roles.
TOP_METRICS, MID_METRICS, BOT_METRICS = {}, {}, {}

class TeamPosition(Enum):
    """
    Roles a participant can play, each paired with its role-only metrics.

    Every member's value is a ``(label, metrics)`` tuple, which ``__init__``
    unpacks into two attributes:

    - ``label``: the role string compared against the team-position column.
    - ``metrics``: dict mapping output column name -> challenge field path.
    """

    JUNGLE = ("JUNGLE", JUNGLER_METRICS)
    # Support is labelled "UTILITY" in the data.
    SUPPORT = ("UTILITY", SUPPORT_METRICS)
    TOP = ("TOP", TOP_METRICS)
    MID = ("MIDDLE", MID_METRICS)
    BOT = ("BOTTOM", BOT_METRICS)

    def __init__(self, label: str, metrics: dict):
        # Enum calls __init__ with the member's tuple value unpacked.
        self.label, self.metrics = label, metrics

    def __str__(self):
        """Render as ``NAME(label)`` (e.g. ``SUPPORT(UTILITY)``) for logging."""
        return "{}({})".format(self.name, self.label)
    


# Column and field name constants
# Column and field name constants shared by the processing functions
MATCH_DATA, MATCH_ID, TEAM_POSITION = "match_data", "match_id", "team_position"

# Summoner spell id (as a string key) -> lower-case spell name
SUMM_SPELLS_DICT = {
    str(spell_id): spell_name
    for spell_id, spell_name in (
        (1, "cleanse"),
        (3, "exhaust"),
        (4, "flash"),
        (6, "ghost"),
        (7, "heal"),
        (11, "smite"),
        (12, "teleport"),
        (14, "ignite"),
        (21, "barrier"),
    )
}


In [3]:
# Sanity check: run a trivial one-row job to confirm the Spark session still executes work.
print(spark.range(1).count())

1


Items and summoners functions
- For items: for now we only use the item tags in our model. In the future it may be worth adding other item information, such as cost.

In [4]:
# Load the item lookup once; later cells reuse `items_dict`.
items_dict = get_items_dict()

# Collect every distinct tag that appears on any item.
unique_tags = {tag for item_tags in items_dict.values() for tag in item_tags}

print(unique_tags)

{'spell_block', 'ability_haste', 'on_hit', 'active', 'slow', 'aura', 'damage', 'nonboots_movement', 'spell_vamp', 'cooldown_reduction', 'armor_penetration', 'critical_strike', 'health', 'stealth', 'trinket', 'gold_per', 'mana', 'attack_speed', 'life_steal', 'vision', 'magic_resist', 'boots', 'tenacity', 'mana_regen', 'consumable', 'jungle', 'spell_damage', 'lane', 'health_regen', 'armor', 'magic_penetration'}


In [5]:
def map_tags_and_summoner_spells_to_df(
    participants_df: DataFrame,
    items_dict: Dict[str, List[str]],
    summoner_spells_dict: Dict[str, str],
    spark: SparkSession
) -> Tuple[DataFrame, Set[str], Set[str]]:
    """
    Map item tags and summoner spell names to participants dataframe with feature columns.

    This function enriches the participants dataframe by mapping item IDs to their associated
    tags and summoner spell IDs to their names. It creates one count column per unique tag
    and one 0/1 indicator per summoner spell, all aimed at later aggregation.

    Parameters
    ----------
    participants_df : pyspark.sql.DataFrame
        DataFrame containing participant data with columns:
        - item0, item1, ..., item5: Item IDs for each item slot
        - summoner1_id, summoner2_id: Summoner spell IDs
    items_dict : dict[str, list[str]]
        Dictionary mapping item IDs (as strings convertible with ``int``) to lists
        of associated tags.
        Example: {"1001": ["boots"], "3078": ["health", "damage"]}
    summoner_spells_dict : dict[str, str]
        Dictionary mapping summoner spell IDs (as strings convertible with ``int``)
        to spell names.
        Example: {"4": "flash", "12": "teleport"}
    spark : pyspark.sql.SparkSession
        Active Spark session used to create the lookup DataFrames.

    Returns
    -------
    participants_df : pyspark.sql.DataFrame
        Input DataFrame with these additional columns:
        - number_of_items_completed: Number of item slots whose item has at
          least one tag in items_dict (0-6)
        - item_tags: Flattened array of all tags from equipped items
        - tag_[{tag_name}]_count: Count of each unique tag across all items
        - summoner_spells_per_game: Two-element array of summoner spell names
          ("" where the spell ID is not in summoner_spells_dict)
        - has_{spell_name}: Binary indicator (0/1) for each summoner spell
        The temporary lookup columns used by the joins are not part of the result.
    unique_item_tags : set[str]
        Set of all unique item tags found in items_dict.
    unique_summoner_spells : set[str]
        Set of all unique summoner spell names found in summoner_spells_dict.

    Raises
    ------
    ValueError
        If items_dict or summoner_spells_dict is empty.

    Examples
    --------
    >>> # Assuming spark session and data are prepared
    >>> items = {"1001": ["boots"], "3078": ["health", "damage"]}
    >>> spells = {"4": "flash", "12": "teleport"}
    >>> result_df, item_tags, spell_names = map_tags_and_summoner_spells_to_df(
    ...     participants_df, items, spells, spark
    ... )
    >>> result_df.select("tag_[boots]_count", "has_flash").show()

    Notes
    -----
    - The function assumes exactly 6 item slots (item0 through item5).
    - Item slots with no match in items_dict contribute an empty tag array.
    - The lookup DataFrames are marked for broadcast joins.
    - Tag columns are named with the tag wrapped in brackets: tag_[{name}]_count
    - Feature columns are added one withColumn call per tag / spell, in sorted
      name order so the resulting schema is the same on every run.
    - Tag names are interpolated into a SQL expression, so they must not
      contain single quotes.
    """
    NUM_ITEM_SLOTS = 6
    EMPTY_STRING = ""

    # ========== Dict Data Validation ==========
    if not items_dict:
        raise ValueError("items_dict cannot be empty")
    if not summoner_spells_dict:
        raise ValueError("summoner_spells_dict cannot be empty")

    # ========== Prepare lookup DataFrames ==========
    items_data = [(int(item_id), tags) for item_id, tags in items_dict.items()]
    items_lookup_df = F.broadcast(
        spark.createDataFrame(items_data, ["item_id", "tags"])
    )
    unique_item_tags = {tag for tags in items_dict.values() for tag in tags}

    summoner_spell_data = [
        (int(summ_spell_id), summ_spell_name)
        for summ_spell_id, summ_spell_name in summoner_spells_dict.items()
    ]
    summoner_spell_lookup_df = spark.createDataFrame(
        summoner_spell_data, ["summoner_spell_id", "summoner_spell_name"]
    )
    unique_summoner_spells = set(summoner_spells_dict.values())

    # ========== Process Item Tags ==========
    # One temporary tags_<i> column per item slot, filled by a left join.
    tag_columns = [f"tags_{i}" for i in range(NUM_ITEM_SLOTS)]
    for i, tag_column in enumerate(tag_columns):
        participants_df = (
            participants_df
            .join(
                items_lookup_df.withColumnRenamed("tags", tag_column),
                F.col(f"item{i}") == F.col("item_id"),
                "left"
            )
            # Drop the join key right away so the next join's item_id is unambiguous.
            .drop("item_id")
        )

    # 1 for every slot whose item resolved to a non-empty tag list, else 0.
    slot_flags = [
        F.when(F.size(F.col(tag_column)) > 0, 1).otherwise(0)
        for tag_column in tag_columns
    ]
    participants_df = (
        participants_df
        .withColumn("number_of_items_completed", sum(slot_flags[1:], slot_flags[0]))
        # Flatten all per-slot tag arrays into one (unmatched slots -> empty array)
        .withColumn(
            "item_tags",
            F.flatten(
                F.array(*[
                    F.coalesce(F.col(tag_column), F.array())
                    for tag_column in tag_columns
                ])
            )
        )
        # Plain string names: passing several Column objects to drop() is
        # rejected by older PySpark releases.
        .drop(*tag_columns)
    )

    # One count column per tag to facilitate later aggregation
    for tag in sorted(unique_item_tags):
        participants_df = participants_df.withColumn(
            f"tag_[{tag}]_count",
            F.size(F.expr(f"filter(item_tags, x -> x = '{tag}')"))
        )

    # ========== Process Summoner Spells ==========
    # Join the lookup once per spell slot. The lookup columns get unique,
    # slot-specific names so they can be dropped by plain name afterwards:
    # drop() with alias-qualified strings such as "s1.summoner_spell_id"
    # matches no column and would leave the lookup columns behind in duplicate.
    spell_slots = (1, 2)
    for slot in spell_slots:
        slot_lookup_df = F.broadcast(
            summoner_spell_lookup_df.select(
                F.col("summoner_spell_id").alias(f"_summoner{slot}_spell_id"),
                F.col("summoner_spell_name").alias(f"_summoner{slot}_spell_name"),
            )
        )
        participants_df = participants_df.join(
            slot_lookup_df,
            F.col(f"summoner{slot}_id") == F.col(f"_summoner{slot}_spell_id"),
            "left"
        )

    participants_df = (
        participants_df
        .withColumn(
            "summoner_spells_per_game",
            F.array(*[
                F.coalesce(F.col(f"_summoner{slot}_spell_name"), F.lit(EMPTY_STRING))
                for slot in spell_slots
            ])
        )
        .drop(
            *[f"_summoner{slot}_spell_id" for slot in spell_slots],
            *[f"_summoner{slot}_spell_name" for slot in spell_slots],
        )
    )

    # One 0/1 column per summoner spell for later aggregation
    for summoner_spell in sorted(unique_summoner_spells):
        participants_df = participants_df.withColumn(
            f"has_{summoner_spell}",
            F.array_contains("summoner_spells_per_game",
                             summoner_spell).cast("int")
        )

    return participants_df, unique_item_tags, unique_summoner_spells


Create df functions

All Patch Error samples needed to be removed from matchData. I also decided to extract a JSON sample for schema inference in plain Python (because it is easier) in a separate function and pass it to create_matches_df. In create_matches_df, all Patch Error rows are removed, then non-ranked games (rows where queueId != 420) are filtered out, and finally duplicate matchIds are dropped so that there is only one row per match.

In [6]:
def explode_and_flatten_struct(
        all_matches_df: DataFrame,
        desired_struct_name: str,
        base_path: str = "match_data_struct.info"
    ) -> DataFrame:
    """
    Explode and flatten a nested struct array from match data into separate rows.

    This function takes a DataFrame containing match data with nested struct arrays
    and explodes a specific struct field into individual rows, then flattens the
    struct columns into top-level columns.

    Parameters
    ----------
    all_matches_df : pyspark.sql.DataFrame
        DataFrame containing match data with a nested structure at
        "{base_path}.{desired_struct_name}". Must contain at least:
        - match_id: Identifier for each match
        - {base_path}.{desired_struct_name}: Array of structs to explode
    desired_struct_name : str
        Name of the struct array to explode within base_path
        (e.g., "participants", "teams", "bans").
        A single trailing "s" is removed to build the temporary column alias.
    base_path : str
        Optional configurable path of the parsed match_data struct.
        Defaults to "match_data_struct.info".

    Returns
    -------
    pyspark.sql.DataFrame
        Flattened DataFrame with one row per array element containing:
        - match_id: Original match identifier (duplicated for each exploded row)
        - All top-level fields from the exploded struct as columns

    Examples
    --------
    >>> # Assuming df has structure: match_data_struct.info.participants
    >>> participants_df = explode_and_flatten_struct(all_matches_df, "participants")
    >>> participants_df.printSchema()
    root
     |-- match_id: string (nullable = true)
     |-- team_position: string (nullable = true)
     |-- summoner1_id: integer (nullable = true)
     |-- ...

    >>> # For a match with 10 participants, this creates 10 rows
    >>> participants_df.groupBy("match_id").count().show()
    +----------+-----+
    |match_id  |count|
    +----------+-----+
    |MATCH_001 |   10|
    |MATCH_002 |   10|
    +----------+-----+

    Notes
    -----
    - The alias is only used internally; it never appears in the returned columns.
    - Each match_id will be duplicated for each element in the exploded array
    - Only the first level of the struct is promoted; fields that are themselves
      structs stay nested.
    - Rows whose array is NULL or empty produce no output rows (explode semantics).

    Raises
    ------
    AnalysisException
        If the specified struct path doesn't exist in the DataFrame
    """

    struct_path = f"{base_path}.{desired_struct_name}"

    # Strip exactly one trailing "s". str.rstrip("s") would strip *every*
    # trailing "s" and could even return an empty alias (e.g. for "s"), so fall
    # back to the full name when nothing usable is left.
    if desired_struct_name.endswith("s"):
        element_alias = desired_struct_name[:-1] or desired_struct_name
    else:
        element_alias = desired_struct_name

    return (
        all_matches_df
        # One output row per array element, kept next to its match id.
        .select(
            F.col(MATCH_ID),
            F.explode(F.col(struct_path)).alias(element_alias)
        )
        # Promote the struct's fields to top-level columns.
        .select(
            MATCH_ID,
            F.col(element_alias + ".*")
        )
    )

In [None]:
def extract_sample_json(csv_path: str) -> str:
    """
    Grab the first non-error match_data string from a CSV file.

    Rows are read in file order; the first row whose ``match_data`` field is
    non-empty and does not start with a "Patch Error: Patch <n>" marker wins.

    Parameters
    ----------
    csv_path : str
        Path to a UTF-8 CSV file with a header row containing a
        ``match_data`` column.

    Returns
    -------
    str
        The raw ``match_data`` text of the first usable row.

    Raises
    ------
    RuntimeError
        If the file is empty or no usable ``match_data`` value exists.
    ValueError
        If the header has no ``match_data`` column.
    """
    patch_error = re.compile(r"Patch Error: Patch \d+(\.\d+)?")

    # A single match_data field can exceed the csv module's default field
    # limit (131072 characters), which would abort the read with csv.Error.
    # Raise the limit for the duration of this call only. sys.maxsize does not
    # fit a C long on some platforms (e.g. Windows), hence the shrinking loop.
    previous_limit = csv.field_size_limit()
    raised_limit = sys.maxsize
    while True:
        try:
            csv.field_size_limit(raised_limit)
            break
        except OverflowError:
            raised_limit //= 10

    try:
        with open(csv_path, newline="", encoding="utf-8") as f:
            reader = csv.reader(f)
            header = next(reader, None)
            if header is None:
                # Empty file: report it like any other "nothing usable" case
                # instead of leaking StopIteration.
                raise RuntimeError("No valid match_data sample found")
            index = header.index("match_data")
            for row in reader:
                if len(row) <= index:
                    continue  # short/ragged row without a match_data field
                value = row[index]
                # Ensure to skip "Patch Error" rows
                if value and not patch_error.match(value):
                    return value
    finally:
        # The field limit is process-wide state; always restore it.
        csv.field_size_limit(previous_limit)

    raise RuntimeError("No valid match_data sample found")

def create_matches_df(
    spark: SparkSession,
    csv_file_path: str,
    npartitions: int = DEFAULT_PARTITIONS,
    queue_id: int = RANKED_SOLO_DUO_QUEUE_ID
) -> Tuple[DataFrame, DataFrame]:
    """
    Load and process match data from CSV containing JSON strings into structured DataFrames.

    This function reads a CSV file containing match data as JSON strings, filters out
    error records, parses the JSON into structured format, and extracts participant
    and team information for matches of one queue (ranked solo/duo by default).

    Parameters
    ----------
    spark : pyspark.sql.SparkSession
        Active Spark session for DataFrame operations.
    csv_file_path : str
        Path to the CSV file containing match data. Expected columns:
        - match_id: Unique identifier for each match
        - match_data: JSON string containing full match information
    npartitions : int, default=DEFAULT_PARTITIONS (200)
        Number of partitions to use after filtering. Adjust based on data size
        and cluster resources.
    queue_id : int, default=RANKED_SOLO_DUO_QUEUE_ID (420)
        Queue identifier to keep; matches with any other queue_id are dropped.

    Returns
    -------
    participants_df : pyspark.sql.DataFrame
        DataFrame with one row per participant containing:
        - match_id: Match identifier
        - All participant fields from the JSON structure
        - Filtered to only include participants with non-null team_position
    teams_df : pyspark.sql.DataFrame
        DataFrame with one row per element of the match's "teams" array containing:
        - match_id: Match identifier
        - All team fields from the JSON structure

    Examples
    --------
    >>> spark = SparkSession.builder.appName("MatchAnalysis").getOrCreate()
    >>> participants, teams = create_matches_df(
    ...     spark,
    ...     "hdfs://data/matches.csv",
    ...     npartitions=100
    ... )
    >>> participants.groupBy("team_position").count().show()
    +-------------+-------+
    |team_position|  count|
    +-------------+-------+
    |          TOP| 123456|
    |       JUNGLE| 123456|
    |       MIDDLE| 123456|
    |       BOTTOM| 123456|
    |      UTILITY| 123456|
    +-------------+-------+

    Notes
    -----
    - The cleaned DataFrame is cached while the schema is inferred and released
      again before returning; the returned DataFrames are lazy and are computed
      from the CSV when an action runs on them.
    - Only matches whose info.queue_id equals ``queue_id`` are kept
    - Removes duplicate matches based on match_id
    - Filters out "Patch Error" records (rows whose match_data matches
      PATCH_ERROR_PATTERN) and rows with NULL match_data
    - Schema inference reads all JSON data (samplingRatio=1.0) for accuracy
    - Participants whose team_position is NULL are excluded

    Raises
    ------
    AnalysisException
        If the CSV file cannot be read or required columns are missing
    """
    struct_column = f"{MATCH_DATA}_struct"

    # ========== Extract df from CSV File ==========
    # multiLine + quote/escape both set to '"' so JSON text with embedded
    # quotes and line breaks stays inside a single CSV field.
    raw_csv_df = (
        spark.read
             .option("header", True)
             .option("multiLine", True)
             .option("escape", "\"")
             .option("quote", "\"")
             .csv(csv_file_path)
    )

    # ========== Filtering by Current Patch ==========
    # Drop NULL and "Patch Error" rows, repartition & cache
    filtered_by_patch_df = (
        raw_csv_df
            .filter(F.col(MATCH_DATA).isNotNull())
            .filter(~F.col(MATCH_DATA).rlike(PATCH_ERROR_PATTERN))
            .repartition(npartitions)
            .cache()
    )

    # ========== Full JSON Schema Inference ==========
    # Build an RDD[String] of all the match_data JSON texts
    json_rdd = (
        filtered_by_patch_df
        .select(MATCH_DATA)
        .rdd
        .map(lambda r: r[MATCH_DATA])
    )
    # Have Spark read *all* of it as JSON, sampling 100%, so it can build a complete schema
    inferred_schema = (
        spark.read
             .option("multiLine", True)
             .option("samplingRatio", 1.0)
             .json(json_rdd)
             .schema
    )

    # ========== Parsing JSON Dict ==========
    # Parse into nested struct & drop raw text
    parsed_struct_df = (
        filtered_by_patch_df
          .withColumn(struct_column, F.from_json(F.col(MATCH_DATA), inferred_schema))
          .drop(MATCH_DATA)
    )

    # ========== Filtering Distinct Ranked Games ==========
    # Keep only the requested queue, then keep one row per match id
    ranked_matches_df = (
        parsed_struct_df
            .filter(F.col(f"{struct_column}.info.queue_id") == queue_id)
            .dropDuplicates([MATCH_ID])
    )

    # ========== Explode and Flatten Structs ==========
    info_path = f"{struct_column}.info"
    participants_df = (
        explode_and_flatten_struct(ranked_matches_df, "participants", info_path)
        # NOTE(review): only NULL positions are removed; if the data marks a
        # missing position with an empty string those rows are kept — confirm.
        .filter(F.col(TEAM_POSITION).isNotNull())
    )
    teams_df = explode_and_flatten_struct(ranked_matches_df, "teams", info_path)

    # NOTE(review): the cache is released before any action has run on the
    # returned DataFrames, so it only serves the schema-inference pass above.
    # Confirm whether the cache should outlive this function instead.
    filtered_by_patch_df.unpersist()

    return participants_df, teams_df

In [None]:
"""TESTING ABOVE"""
# Smoke test: build both frames from the large snake_case sample, then preview each.
df_participants, df_teams = create_matches_df(spark, LARGE_SNAKE_CSV)

for preview_df in (df_participants, df_teams):
    preview_df.show(5, truncate=False)
###################

In [8]:
def derive_participant_dragon_stats(df_participants: DataFrame) -> DataFrame:
    """
    Helper function to create dragon statistics columns.

    Reads ``challenges.earliest_dragon_takedown`` (compared against second
    thresholds below) and adds five integer flag columns:

    - had_dragon_takedown: 1 when the value is present, else 0
    - first_drag_takedown_min_5_to_7:   1 when 300 < t <= 420
    - first_drag_takedown_min_7_to_11:  1 when 420 < t <= 660
    - first_drag_takedown_min_11_to_15: 1 when 660 < t <= 900
    - first_drag_takedown_min_15+:      1 when t > 900

    The four time-window flags are 0 when the takedown falls outside the
    window and NULL when the participant had no dragon takedown at all.
    Spark aggregates such as sum/avg skip NULLs, so the NULLs are kept as-is.

    Parameters
    ----------
    df_participants : pyspark.sql.DataFrame
        Participant rows containing the nested ``challenges`` struct.

    Returns
    -------
    pyspark.sql.DataFrame
        The input DataFrame with the five flag columns appended.
    """
    drag_takedown = F.col("challenges.earliest_dragon_takedown")

    def _window_flag(after_seconds: int, up_to_seconds: int):
        # Half-open window (after, up_to]. For whole-second values this equals
        # between(after + 1, up_to), but a fractional value such as 420.5 now
        # lands in a bucket instead of falling between two of them.
        return (
            (drag_takedown > after_seconds) & (drag_takedown <= up_to_seconds)
        ).cast("int")

    df_participants_with_dragon_stats = (
        df_participants
        .withColumn("had_dragon_takedown", drag_takedown.isNotNull().cast("int"))
        .withColumn("first_drag_takedown_min_5_to_7", _window_flag(300, 420))
        .withColumn("first_drag_takedown_min_7_to_11", _window_flag(420, 660))
        .withColumn("first_drag_takedown_min_11_to_15", _window_flag(660, 900))
        # Same 0/1/NULL convention as the windows above (previously this flag
        # was NULL, not 0, for takedowns at or before 900 seconds).
        .withColumn("first_drag_takedown_min_15+", (drag_takedown > 900).cast("int"))
    )

    return df_participants_with_dragon_stats

In [None]:
def filter_position_specific_metrics(participants_df: DataFrame) -> DataFrame:
    """
    Expose role-only challenge metrics as top-level columns.

    For every role in ``TeamPosition`` and every entry in that role's
    ``metrics`` mapping, a column named after the metric is added. The column
    holds the challenge value for rows whose team position equals the role's
    label and NULL for every other row.

    Parameters
    ----------
    participants_df : pyspark.sql.DataFrame
        Participant rows containing:
        - team_position: role label of the player
        - the challenge fields referenced by the role metric mappings

    Returns
    -------
    pyspark.sql.DataFrame
        The input DataFrame plus one column per configured role metric.
        Roles with an empty metrics mapping add nothing.

    Raises
    ------
    ValueError
        If the team_position column is missing from the input DataFrame.

    Examples
    --------
    >>> df_result = filter_position_specific_metrics(participants_df)
    >>> # Verify jungler metrics only appear for junglers
    >>> df_result.groupBy("team_position").agg(
    ...     F.count(F.when(F.col("jungler_kills_early_jungle").isNotNull(), 1))
    ... ).show()
    """
    # ---------- Guard: the role column must be present ----------
    absent_columns = {TEAM_POSITION}.difference(participants_df.columns)
    if absent_columns:
        raise ValueError(f"Missing required columns: {absent_columns}")

    # ---------- Describe every role-gated column up front ----------
    gated_columns = [
        (
            metric_name,
            F.when(
                F.col(TEAM_POSITION) == position.label, F.col(challenge_path)
            ).otherwise(None),
        )
        for position in TeamPosition
        for metric_name, challenge_path in position.metrics.items()
    ]

    # ---------- Attach them in declaration order ----------
    enriched_df = participants_df
    for metric_name, gated_value in gated_columns:
        enriched_df = enriched_df.withColumn(metric_name, gated_value)

    return enriched_df

In [None]:
"""Need further investigating"""
# Scratch notes: each line below only builds an aggregate Column expression and
# discards it, so this cell has no effect on any DataFrame. The expressions are
# kept as candidates to move into a real .agg(...) call once validated.
F.avg("challenges.void_monster_kill").alias("avg_individual_void_monster_kills"), # DATA VALIDATION: Void grubs? I have seen this have a value of 7 when the champion did NOT have a rift herald takedown
F.avg("challenges.legendary_count").alias("avg_legendary_count"), # Extra digging: seems to be the legendary killing spree count; does 2 mean you were legendary, died, and then became legendary again? Confirmed non-zero


"""Potential additional analysis opportunity"""
F.mode("individual_position").alias("mode_individual_position"), # Best guess for which position the player actually played in isolation of anything else
F.mode("lane").alias("mode_lane"), # Gives a slightly different string than above; might have something to do with where the champ spent most of the time
F.mode("role").alias("mode_role"),
F.mode("team_position").alias("mode_team_position"), # The teamPosition is the best guess for which position the player actually played if we add the constraint that each team must have one top player, one jungle, one middle, etc
F.sum(F.col("challenges.played_champ_select_position").cast("int")).alias("pct_of_games_played_champ_select_position") # only present if no roleswap - seems useless

In [None]:
#F.avg("challenges.killed_champ_took_full_team_damage_survived").alias("avg_times_killed_champ_took_full_team_damage_and_survived"), # DATA VALIDATION: check if always zero
#F.avg(F.col("challenges.early_laning_phase_gold_exp_advantage") * 100).alias("pct_of_games_with_early_lane_phase_gold_exp_adv"), # DATA VALIDATION: Seems to only give "1" to a single player per match
#F.avg(F.col("challenges.laning_phase_gold_exp_advantage") * 100).alias("pct_of_games_with_lanephase_gold_exp_adv"), # DATA VALIDATION: Seems to only give "1" to a single player per match, also check if repeat of above
#F.sum(F.col("challenges.fastest_legendary").isNotNull().cast("int")).alias("total_games_fastest_item_completion"), # DATA VALIDATION: seems to be 1 per team, check
#F.avg("challenges.jungler_kills_early_jungle").alias("avg_jungler_kills_early_jungle"), # DATA VALIDATION: what is this
#F.avg("challenges.kills_on_laners_early_jungle_as_jungler").alias("avg_jungler_early_kills_on_laners"), # DATA VALIDATION: what is this
#F.avg("challenges.get_takedowns_in_all_lanes_early_jungle_as_laner").alias("avg_times_had_early_takedowns_in_all_lanes_as_laner"), # DATA VALIDATION: CHECK what this is and if it is actually boolean
#F.avg("wards_placed").alias("avg_wards_placed"), # DATA VALIDATION: Need to check if duplicate of above
#F.avg("challenges.max_kill_deficit").alias("avg_max_kill_deficit"), # DATA VALIDATION: CHECK ? see if always zero values
#F.avg(F.col("challenges.quick_first_turret").cast("int") * 100).alias("pct_of_games_individual_took_1st_tower_quick"), # DATA VALIDATION: Boolean value, we don't know what quick means here in terms of time
#F.avg("challenges.turret_takedowns").alias("avg_individual_tower_takedowns2"), # DATA VALIDATION: Compare with above
#F.avg("challenges.void_monster_kill").alias("avg_individual_void_monster_kills"), # DATA VALIDATION: Void grubs? I have seen this have a value of 7 when the champion did NOT have a rift herald takedown
#F.avg("challenges.epic_monster_steals").alias("avg_epic_monster_steals"), # DATA VALIDATION: Check if duplicate of the one above
#F.avg("challenges.game_length").alias("avg_game_length"), # DATA VALIDATION: Compare with above and delete one
#F.avg("challenges.killing_sprees").alias("avg_killing_sprees2"), # DATA VALIDATION: Check if equal and delete one
#F.avg("challenges.legendary_count").alias("avg_legendary_count"), # DATA VALIDATION: Not too sure what this is, my current guess is how many times champ was legendary (do they need to die and get leg again or it counts kills above leg?)
#F.avg("unreal_kills").alias("avg_unreal_kills"), # DATA VALIDATION: Might be a zero stat, need to check
#F.avg("challenges.mejais_full_stack_in_time").alias("avg_mejai_full_stack_time"), # DATA VALIDATION: is this a Boolean? Is this time?
#########################################################################################################################################


In [62]:
def data_validation(df_merged):
    """Print exploratory sanity checks for columns whose meaning is still unverified.

    Two result sets are built and printed with ``.show()``:

    1. One row per ``(match_id, team_id)`` with, for ``challenges.fastest_legendary``
       and ``challenges.jungler_kills_early_jungle``, the number of non-NULL rows and
       a 0/1 flag that is 1 when that number is not exactly 1.
    2. A single global row with the distinct match count, sums/comparisons of the
       laning-phase advantage columns, "is it ever non-zero" / "is it ever above 1"
       counts for several columns, and mismatch counts for pairs of columns that
       are suspected to be duplicates of each other.

    Args:
        df_merged: Spark DataFrame containing the columns referenced below
            (``match_id``, ``team_id``, a ``challenges`` struct, ``unreal_kills``,
            ``wards_placed``, ``turret_takedowns``, ``objectives_stolen``,
            ``time_played``, ``killing_sprees``).

    Returns:
        None. The results are only displayed.
    """

    def _presence_checks(column_name, number_alias, flag_alias):
        # Count of non-NULL rows, plus a 0/1 flag set when that count is not exactly 1.
        present = F.when(F.col(column_name).isNotNull(), 1)
        return [
            F.sum(present).alias(number_alias),
            (F.sum(present.otherwise(0)) != 1).cast("int").alias(flag_alias),
        ]

    def _count_where(condition, alias):
        # F.when without otherwise() yields NULL for non-matching rows and
        # F.count skips NULLs, so this counts the rows satisfying `condition`.
        return F.count(F.when(condition, 1)).alias(alias)

    def _zero_and_bool_checks(column_name):
        # "> 0" tells whether the column is ever non-null and non-zero;
        # "> 1" tells whether it is a count rather than a boolean-like 0/1 flag.
        value = F.col(column_name)
        label = column_name.split(".")[-1]
        return [
            _count_where(value > 0, f"count_non_zero_{label}"),
            _count_where(value > 1, f"count_non_bool_{label}"),
        ]

    # 1) Per match validation
    per_team_checks = [
        # Check how many times a team has no fastest legendary
        *_presence_checks(
            "challenges.fastest_legendary",
            "number_of_fastest_legendary",
            "count_no_fastest_legendary",
        ),
        # Check how many times a team has all NULLS for jungler_kills_early_jungle - extra check, not too important
        *_presence_checks(
            "challenges.jungler_kills_early_jungle",
            "number_of_jungler_kills_early_jungle",
            "count_missing_jungler_kills_early_jungle",
        ),
    ]
    per_team_per_match_df = df_merged.groupBy("match_id", "team_id").agg(*per_team_checks)

    # 2) Global Validations
    early_adv = F.col("challenges.early_laning_phase_gold_exp_advantage")
    lane_adv = F.col("challenges.laning_phase_gold_exp_advantage")
    mejais = F.col("challenges.mejais_full_stack_in_time")

    global_checks = [
        # count how many distinct match_ids in the entire DataFrame:
        F.countDistinct("match_id").alias("count_total_matches"),
        # Compare with number of matches to verify if it is one per match
        F.sum(early_adv).alias("sum_early_laning_phase_gold_exp_advantage"),
        F.sum(lane_adv).alias("sum_laning_phase_gold_exp_advantage"),
        _count_where(early_adv != lane_adv, "count_different_laning_phase_gold_exp_adv"),
        # checking if all values are zero
        _count_where(
            F.col("challenges.killed_champ_took_full_team_damage_survived") != 0,
            "non_zero_check_killed_champ_took_full_team_damage_survived",
        ),
    ]

    # Check if these ever have a value and, if so, whether they are a Boolean or a count.
    # The list order fixes the column order of the printed row.
    zero_and_bool_columns = [
        "challenges.jungler_kills_early_jungle",
        "challenges.kills_on_laners_early_jungle_as_jungler",
        "challenges.get_takedowns_in_all_lanes_early_jungle_as_laner",
        "challenges.max_kill_deficit",
        "challenges.epic_monster_stolen_without_smite",
        "challenges.epic_monster_kills_near_enemy_jungler",
        "unreal_kills",
        "challenges.mejais_full_stack_in_time",
    ]
    for column_name in zero_and_bool_columns:
        global_checks.extend(_zero_and_bool_checks(column_name))

    global_checks.extend([
        # Extra check on mejais: avg value over the rows where it is non null and non zero
        F.avg(F.when(mejais > 0, mejais)).alias("avg_non_zero_mejais_full_stack_in_time"),
        # Only bool check (can do a little extra digging)
        _count_where(F.col("challenges.legendary_count") > 1, "count_non_bool_legendary_count"),
        # Check if duplicates: each value is the number of rows where the two columns differ
        _count_where(F.col("wards_placed") != F.col("challenges.stealth_wards_placed"), "count_distinct_wards_placed"),
        _count_where(F.col("challenges.turret_takedowns") != F.col("turret_takedowns"), "count_distinct_turret_takedowns"),
        _count_where(F.col("challenges.epic_monster_steals") != F.col("objectives_stolen"), "count_distinct_epic_monster_steals"),
        _count_where(F.col("challenges.game_length") != F.col("time_played"), "count_distinct_game_length"),
        _count_where(F.col("challenges.killing_sprees") != F.col("killing_sprees"), "count_distinct_killing_sprees"),
    ])

    global_df = df_merged.agg(*global_checks)

    per_team_per_match_df.show()
    global_df.show()



In [10]:
def aggregate_champion_data(df_merged, all_item_tags, all_summoner_spells):
    """Aggregate per-participant match rows into one row per champion and position.

    Rows are grouped by ``(champion_id, champion_name, team_position)``. Most
    outputs are plain averages; boolean-like columns are averaged after scaling
    by 100 so they read as percentages. A few columns are first summed and then
    divided by the number of games in the role, because their source columns
    contain NULLs that ``avg`` would skip.

    Args:
        df_merged: Spark DataFrame with one row per participant per match,
            carrying the participant, ``challenges`` and ``objectives`` columns
            referenced below.
        all_item_tags: Iterable of item tag names; for each tag the column
            ``tag_[<tag>]_count`` is read.
        all_summoner_spells: Iterable of summoner spell names; for each spell
            the column ``has_<spell>`` is read.

    Returns:
        DataFrame with one row per champion/position, restricted to positions
        in which the champion plays at least ``MIN_PLAY_RATE`` percent of its
        games.
    """
    window = Window.partitionBy("champion_id")
    MIN_PLAY_RATE = 5  # Minimum share (in %) of a champion's games that must be in the role

    return (
        df_merged
        .withColumn("total_games_per_champion", F.count("*").over(window))

        .groupBy("champion_id", "champion_name","team_position")
        .agg(
            F.count("*").alias("total_games_played_in_role"),
            F.first("total_games_per_champion").alias("total_games_per_champion"), # Used for filtering minimum games in role
            
            # Core stats (KDA and number of games)
            F.avg("kills").alias("avg_kills"),
            F.avg("deaths").alias("avg_deaths"),
            F.avg("challenges.deaths_by_enemy_champs").alias("avg_deaths_by_enemy_champs"),
            F.avg("assists").alias("avg_assists"),
            F.avg("challenges.kill_participation").alias("avg_kill_participation"),
            F.avg("challenges.takedowns").alias("avg_takedowns"),
            # Original column is boolean, need to cast to int before summing.
            # Kept as a plain count of wins: the percentage scaling happens once, in "win_rate" below.
            F.sum(F.col("win").cast("int")).alias("total_wins"),

            # Damage dealt stats
            F.sum("challenges.highest_champion_damage").alias("pct_of_games_with_highest_damage_dealt"), # Need to stay as sum and derive later due to NULLS
            F.avg("challenges.team_damage_percentage").alias("avg_pct_damage_dealt_in_team"),
            F.avg("challenges.damage_per_minute").alias("average_damage_per_minute"),
            F.avg("total_damage_dealt_to_champions").alias("avg_damage_dealt_to_champions"),
            F.avg("total_damage_dealt").alias("avg_total_damage_dealt"),
            F.avg("magic_damage_dealt_to_champions").alias("avg_magic_damage_dealt_to_champions"),
            F.avg("magic_damage_dealt").alias("avg_total_magic_damage_dealt"),
            F.avg("physical_damage_dealt_to_champions").alias("avg_physical_damage_dealt_to_champions"),
            F.avg("physical_damage_dealt").alias("avg_total_physical_damage_dealt"),
            F.avg("true_damage_dealt_to_champions").alias("avg_true_damage_dealt_to_champions"),
            F.avg("true_damage_dealt").alias("avg_total_true_damage_dealt"),
            F.avg("largest_critical_strike").alias("avg_largest_critical_strike"),

            # Damage Taken
            F.avg("challenges.damage_taken_on_team_percentage").alias("avg_pct_damage_taken_in_team"),
            F.avg("total_damage_taken").alias("avg_damage_taken"),
            F.avg("magic_damage_taken").alias("avg_magic_damage_taken"),
            F.avg("physical_damage_taken").alias("avg_physical_damage_taken"),
            F.avg("true_damage_taken").alias("avg_true_damage_taken"),
            F.avg("damage_self_mitigated").alias("avg_damage_self_mitigated"),
            # Situational Damage Taken (e.g. in teamfights)
            F.avg("challenges.killed_champ_took_full_team_damage_survived").alias("avg_times_killed_champ_took_full_team_damage_and_survived"), # DATA VALIDATION: check if always zero
            F.avg("challenges.survived_single_digit_hp_count").alias("avg_times_survived_single_digit_hp"),
            F.avg("challenges.survived_three_immobilizes_in_fight").alias("avg_times_survived_three_immobilizes_in_fight"),
            F.avg("challenges.took_large_damage_survived").alias("avg_times_took_large_damage_survived"),

            # Crowd Control         
            F.sum("challenges.highest_crowd_control_score").alias("pct_of_games_with_highest_crowd_control_score"), # Need to stay as sum and derive later due to NULLS
            F.avg("time_c_cing_others").alias("avg_time_ccing_others"), # Measure of time
            F.avg("total_time_cc_dealt").alias("avg_times_applied_cc_on_others"), # Number of times cc'd others
            F.avg("challenges.enemy_champion_immobilizations").alias("avg_enemy_champion_immobilizations"),

            # Healing and Shielding + Support
            F.avg("total_heal").alias("avg_total_healing"),
            F.avg("total_heals_on_teammates").alias("avg_heals_on_teammate"),
            F.avg("total_units_healed").alias("avg_total_units_healed"),
            F.avg('total_damage_shielded_on_teammates').alias("avg_dmg_shielded_on_team"),
            F.avg("challenges.effective_heal_and_shielding").alias("avg_effective_heal_and_shield"),
            F.sum(F.col("challenges.faster_support_quest_completion").cast("int")).alias("total_games_completed_supp_quest_first"), # Need to stay as sum and derive later due to NULLS
            F.avg("complete_support_quest_in_time").alias("avg_supp_quest_completion_time"),

            # Misc
            F.avg("longest_time_spent_living").alias("avg_longest_time_spent_alive"),
            F.avg("total_time_spent_dead").alias("avg_time_spent_dead"),

            # Spell Casts
            F.avg("spell1_casts").alias("avg_spell1_casts"),
            F.avg("spell2_casts").alias("avg_spell2_casts"),
            F.avg("spell3_casts").alias("avg_spell3_casts"),
            F.avg("spell4_casts").alias("avg_spell4_casts"),
            F.avg("challenges.ability_uses").alias("avg_ability_uses"),

            # Skillshot related (dodging and hitting)
            F.avg("challenges.dodge_skill_shots_small_window").alias("avg_times_dodged_skillshot_in_small_window"),
            F.avg("challenges.skillshots_dodged").alias("avg_skillshots_dodged"),
            F.avg("challenges.land_skill_shots_early_game").alias("avg_skillshots_landed_early_game"),
            F.avg("challenges.skillshots_hit").alias("avg_skillshots_hit"),

            # Picks
            F.avg("challenges.immobilize_and_kill_with_ally").alias("avg_times_immobilize_and_kill_with_ally"),
            F.avg("challenges.kill_after_hidden_with_ally").alias("avg_times_got_kill_after_hidden_with_ally"),
            F.avg("challenges.pick_kill_with_ally").alias("avg_times_pick_kill_with_ally"),
            F.avg("challenges.knock_enemy_into_team_and_kill").alias("avg_times_knock_enemy_into_team_and_kill"),

            # Kills under or near turret
            F.avg("challenges.kills_near_enemy_turret").alias("avg_kills_near_enemy_turret"),
            F.avg("challenges.kills_under_own_turret").alias("avg_kills_under_own_turret"),

            # Misc mechanics
            F.avg("challenges.multikills_after_aggressive_flash").alias("avg_multikills_after_aggressive_flash"),
            F.avg("challenges.outnumbered_kills").alias("avg_outnumbered_kills"),
            F.avg("challenges.outnumbered_nexus_kill").alias("avg_times_outnumbered_nexus_kill"),
            F.avg("challenges.quick_cleanse").alias("avg_times_quick_cleanse"),

            # Misc laning
                # Kills, takedowns and plays 
            F.avg("challenges.quick_solo_kills").alias("avg_quick_solo_kills"),
            F.avg("challenges.solo_kills").alias("avg_solo_kills"),
            F.avg("challenges.takedowns_after_gaining_level_advantage").alias("avg_takedowns_after_gaining_lvl_advantage"),
            # NOTE(review): the source column says "as_laner" but the output name says "as_jungler" - confirm which is intended
            F.avg("challenges.kills_on_other_lanes_early_jungle_as_laner").alias("avg_kills_on_other_lanes_early_as_jungler"),
            F.avg("challenges.save_ally_from_death").alias("avg_times_save_ally_from_death"),
            F.avg("challenges.takedowns_in_alcove").alias("avg_takedowns_in_alcove"),
                # First blood and early kills
            F.avg(F.col("first_blood_kill").cast("int") * 100).alias("pct_of_games_first_blood_kill"),
            F.avg(F.col("first_blood_assist").cast("int") * 100).alias("pct_of_games_first_blood_assist"),
            F.avg("challenges.takedowns_before_jungle_minion_spawn").alias("avg_takedowns_before_jungle_camps_spawn"),
            F.avg("challenges.takedowns_first_x_minutes").alias("avg_first_takedown_time"),

            # Summoner Spells
            F.avg("summoner1_casts").alias("avg_summoner_spell1_casts_per_game"),
            F.avg("summoner2_casts").alias("avg_summoner_spell2_casts_per_game"),
            *[(F.avg(f"has_{summoner_spell}") * 100).alias(f"pct_of_matches_with_{summoner_spell}") for summoner_spell in all_summoner_spells],

            # Experience and Gold
            F.avg("champ_experience").alias("avg_champ_exp_at_game_end"),
            F.avg("champ_level").alias("avg_champ_level_at_game_end"),
            F.avg("gold_earned").alias("avg_gold_earned_per_game"),
            F.avg("challenges.gold_per_minute").alias("avg_gold_per_minute"),
            F.avg("gold_spent").alias("avg_gold_spent"),
            F.avg("bounty_level").alias("avg_bounty_lvl"),
            F.avg("challenges.bounty_gold").alias("avg_bounty_gold"),
                # Laning Specific   
            F.avg(F.col("challenges.early_laning_phase_gold_exp_advantage") * 100).alias("pct_of_games_with_early_lane_phase_gold_exp_adv"), # DATA VALIDATION: Seems to only give "1" to a single player per match
            F.avg(F.col("challenges.laning_phase_gold_exp_advantage") * 100).alias("pct_of_games_with_lanephase_gold_exp_adv"), # DATA VALIDATION: Seems to only give "1" to a single player per match, also check if repeat of above
            F.avg("challenges.max_level_lead_lane_opponent").alias("avg_max_level_lead_over_lane_opp"),
                # Minions Specific
            F.avg("total_minions_killed").alias("avg_minions_killed"),
            F.avg("challenges.lane_minions_first10_minutes").alias("avg_minions_killed_by_10_mins"),
            F.avg("challenges.max_cs_advantage_on_lane_opponent").alias("avg_max_cs_lead_over_lane_opponent"),
                # Item Purchases
            F.avg("consumables_purchased").alias("avg_consumables_purchased"),
            F.avg("items_purchased").alias("avg_number_of_items_purchased"),
            F.sum(F.col("challenges.fastest_legendary").isNotNull().cast("int")).alias("total_games_fastest_item_completion"), # DATA VALIDATION: Maybe do NULLS for non supports? - likely earlier in code / also, seems to be 1 per team, check

            # Item Tags
            F.avg("number_of_items_completed").alias("avg_items_completed"),
            # The denominator repeats the aggregate explicitly instead of referring to the
            # "avg_items_completed" alias, so it does not depend on alias resolution inside agg().
            *[((F.avg(f"tag_[{item_tag}]_count") * 100) / F.avg("number_of_items_completed")).alias(f"pct_of_matches_with_{item_tag}") for item_tag in all_item_tags],

            # Jungle related stats
                # Jungle farm
            F.avg("total_ally_jungle_minions_killed").alias("avg_ally_jungle_minions_killed"), # Don't necessarily add up to total CS (might be counting a buff as 1 cs for example), use as standalone jungle farm distribution stat (ally jg vs invade, etc)
            F.avg("total_enemy_jungle_minions_killed").alias("avg_enemy_jungle_minions_killed"),
            F.avg("neutral_minions_killed").alias("avg_jungle_monsters_cs"), # Jungle monsters/farm - note that this shows for all players
            F.avg("challenges.buffs_stolen").alias("avg_buffs_stolen"),
            F.avg("challenges.initial_buff_count").alias("avg_initial_buff_count"), # Decided not to add NULLs to non junglers for now
            F.avg("challenges.epic_monster_kills_within30_seconds_of_spawn").alias("avg_epic_monster_kills_within_30s_of_spawn"),
            F.avg("challenges.initial_crab_count").alias("avg_initial_crab_count"), # Decided not to add NULLs to non junglers for now
            F.avg("challenges.scuttle_crab_kills").alias("avg_crabs_per_game"),
            F.avg("challenges.jungle_cs_before10_minutes").alias("avg_jg_cs_before_10m"), # Decided not to add NULLs to non junglers for now
                # Jungle Combat
            F.avg("challenges.jungler_kills_early_jungle").alias("avg_jungler_kills_early_jungle"), # DATA VALIDATION: what is this
            F.avg("challenges.kills_on_laners_early_jungle_as_jungler").alias("avg_jungler_early_kills_on_laners"), # DATA VALIDATION: what is this
            F.avg("challenges.get_takedowns_in_all_lanes_early_jungle_as_laner").alias("avg_times_had_early_takedowns_in_all_lanes_as_laner"), # DATA VALIDATION: CHECK what this is and if it is actually boolean
            F.avg("challenges.jungler_takedowns_near_damaged_epic_monster").alias("avg_jungler_takedowns_near_damaged_epic_monsters"),
            F.avg("challenges.kills_with_help_from_epic_monster").alias("avg_kills_with_help_from_epic_monster"),

            # Vision Stats
                # Vision score and wards placed + unseen recalls
            F.avg("vision_score").alias("avg_vision_score"),
            F.avg("challenges.vision_score_per_minute").alias("avg_vision_score_per_min"),
            F.avg("challenges.vision_score_advantage_lane_opponent").alias("avg_vision_score_advantage_over_lane_opponent"),
            F.avg("challenges.stealth_wards_placed").alias("avg_stealth_wards_placed"),
            F.avg("wards_placed").alias("avg_wards_placed"), # DATA VALIDATION: Need to check if duplicate of above
            F.avg("challenges.wards_guarded").alias("avg_wards_guarded"),
            F.avg("detector_wards_placed").alias("avg_control_wards_placed"), # Same as control wards
            F.avg("challenges.control_ward_time_coverage_in_river_or_enemy_half").alias("avg_control_ward_time_coverage_in_river_or_enemy_half"),
            F.avg("challenges.unseen_recalls").alias("avg_unseen_recalls"),
                # Wards killed
            F.sum(F.col("challenges.highest_ward_kills").cast("int")).alias("pct_of_games_with_highest_wards_killed"), # Need to stay as sum and derive later due to NULLS
            F.avg("wards_killed").alias("avg_wards_killed"),
            F.avg("challenges.ward_takedowns").alias("avg_ward_takedowns"),
            F.avg("challenges.ward_takedowns_before20_m").alias("avg_ward_takedowns_before_20m"),
            F.avg("challenges.two_wards_one_sweeper_count").alias("avg_times_2_wards_killed_with_1_sweeper"),
            F.avg("vision_wards_bought_in_game").alias("avg_control_wards_bought"),

            # Teamwide stats (mostly from df_teams with some from df_participants)
                # First objective rates
            F.avg(F.col("objectives.baron.first").cast("int") * 100).alias("pct_of_games_team_took_first_baron"),
            F.avg("challenges.earliest_baron").alias("avg_earliest_baron_by_team_time"),
            F.avg(F.col("objectives.dragon.first").cast("int") * 100).alias("pct_of_games_team_took_first_dragon"),
            F.avg(F.col("objectives.inhibitor.first").cast("int") * 100).alias("pct_of_games_team_took_first_inhib"),
            F.avg(F.col("objectives.rift_herald.first").cast("int") * 100).alias("pct_of_games_team_took_first_herald"),
            F.avg(F.col("objectives.tower.first").cast("int") * 100).alias("pct_of_games_team_took_first_turret"),
                # Team Objectives
            F.avg("objectives.baron.kills").alias("avg_baron_kills_by_team"),
            F.avg("objectives.rift_herald.kills").alias("avg_herald_kills_by_team"),
            F.avg("objectives.dragon.kills").alias("avg_dragon_kills_by_team"),
            F.avg(F.col("challenges.perfect_dragon_souls_taken") * 100).alias("pct_of_games_with_perfect_drag_soul_taken"),
            F.avg("challenges.team_elder_dragon_kills").alias("avg_elder_dragon_kills_by_team"),
            F.avg("challenges.elder_dragon_kills_with_opposing_soul").alias("avg_elder_dragon_kills_w_opposing_soul"),
                # Team Structures
            F.avg("objectives.inhibitor.kills").alias("avg_inhib_kills_by_team"),
            F.avg("objectives.tower.kills").alias("avg_tower_kills_by_team"),
            F.avg("inhibitors_lost").alias("avg_inhibs_lost_by_team"),
            F.avg(F.col("nexus_lost") * 100).alias("pct_of_games_with_nexus_lost_by_team"),
            F.avg("turrets_lost").alias("avg_turrets_lost_by_team"),
            F.avg(F.col("challenges.first_turret_killed").cast("int") * 100).alias("pct_of_games_first_turret_taken_by_team"),
            F.avg("challenges.first_turret_killed_time").alias("avg_first_turret_kill_time_by_team"),
                # Team Kills
            F.avg("objectives.champion.kills").alias("avg_total_team_champ_kills"),
            F.avg("challenges.aces_before15_minutes").alias("avg_team_aces_before_15_by_team"),
            F.avg("challenges.flawless_aces").alias("avg_flawless_aces_by_team"),
            F.avg("challenges.shortest_time_to_ace_from_first_takedown").alias("avg_shortest_time_to_ace_from_1st_takedown"),
            F.avg("challenges.max_kill_deficit").alias("avg_max_kill_deficit"), # DATA VALIDATION: CHECK ? see if always zero values
            F.avg(F.col("challenges.perfect_game").cast("int") * 100).alias("pct_of_games_that_are_perfect_games"),

            # Individual participant damage to structures
                # Damage dealt to structures
            F.avg("damage_dealt_to_buildings").alias("avg_indiv_dmg_dealt_to_buildings"),
            F.avg("damage_dealt_to_turrets").alias("avg_indiv_dmg_dealth_to_turrets"),
            F.avg("challenges.turret_plates_taken").alias("avg_indiv_turret_plates_taken"),
                # First tower
            F.avg(F.col("first_tower_kill").cast("int") * 100).alias("pct_of_games_indiv_killed_1st_tower"), # Boolean for champ who took first tower
            F.avg(F.col("challenges.takedown_on_first_turret").cast("int") * 100).alias("pct_of_games_individual_takedown_1st_tower"), # Boolean value, states whether participant had takedown on first turret
            F.avg(F.col("challenges.quick_first_turret").cast("int") * 100).alias("pct_of_games_individual_took_1st_tower_quick"), # DATA VALIDATION: Boolean value, we don't know what quick means here in terms of time
            F.avg(F.col("first_tower_assist").cast("int") * 100).alias("pct_of_games_individual_had_1st_turret_assist"),
                # Turrets kills/takedowns
            F.avg("challenges.k_turrets_destroyed_before_plates_fall").alias("avg_turrets_killed_before_plates_fell"),
            F.avg("turret_kills").alias("avg_individual_tower_kills"),
            F.avg("turret_takedowns").alias("avg_individual_tower_takedowns"),
            F.avg("challenges.turret_takedowns").alias("avg_individual_tower_takedowns2"), # DATA VALIDATION: Compare with above
            F.avg("challenges.solo_turrets_lategame").alias("avg_individual_solo_towers_kills_late_game"),
            F.avg("challenges.turrets_taken_with_rift_herald").alias("avg_indiv_towers_taken_w_rift_herald"),
            F.avg("challenges.multi_turret_rift_herald_count").alias("avg_indiv_multi_towers_taken_w_rift_herald"),
                # Inhibitor and nexus kills/takedowns + misc
            F.avg("inhibitor_kills").alias("avg_individual_inhibitor_kills"),
            F.avg("inhibitor_takedowns").alias("avg_individual_inhibitor_takedowns"),
            F.avg(F.col("nexus_kills") * 100).alias("pct_of_games_individual_killed_nexus"),
            F.avg(F.col("nexus_takedowns") * 100).alias("avg_individual_nexus_takedowns"),
            F.avg(F.col("challenges.had_open_nexus") * 100).alias("pct_of_games_with_open_nexus"),

            # Individual participant objectives
                # Objective kills/takedowns
            F.avg("damage_dealt_to_objectives").alias("avg_individual_dmg_dealt_to_objectives"),
            F.avg("baron_kills").alias("avg_individual_baron_kills"),
            F.avg("challenges.solo_baron_kills").alias("avg_individual_solo_baron_kills"),
            F.avg("challenges.baron_takedowns").alias("avg_individual_baron_takedowns"),
            F.avg("dragon_kills").alias("avg_individual_dragon_kills"),
            F.avg("challenges.dragon_takedowns").alias("avg_individual_dragon_takedowns"),
            F.avg("challenges.rift_herald_takedowns").alias("avg_individual_rift_herald_takedowns"),
            F.avg("challenges.void_monster_kill").alias("avg_individual_void_monster_kills"), # DATA VALIDATION: Void grubs? I have seen this have a value of 7 when the champion did NOT have a rift herald takedown
                # Objective steals
            F.avg("objectives_stolen").alias("avg_objectives_stolen"),
            F.avg("objectives_stolen_assists").alias("avg_objectives_stolen_assists"),
            F.avg("challenges.epic_monster_steals").alias("avg_epic_monster_steals"), # DATA VALIDATION: Check if duplicate of the one above
            F.avg("challenges.epic_monster_stolen_without_smite").alias("avg_epic_monster_steals_without_smite"),
            F.avg("challenges.epic_monster_kills_near_enemy_jungler").alias("avg_epic_monsters_killed_near_enemy_jgler"),
                # Earliest dragon takedown stats (used for derived stats)
            F.avg("challenges.earliest_dragon_takedown").alias("avg_earliest_drag_takedown"),
            F.avg(F.col("had_dragon_takedown") * 100).alias("pct_of_games_had_drag_takedown"),
            F.avg(F.col("first_drag_takedown_min_5_to_7") * 100).alias("pct_of_games_had_drag_takedown_min_5_to_7"), 
            F.avg(F.col("first_drag_takedown_min_7_to_11") * 100).alias("pct_of_games_had_drag_takedown_min_7_to_11"),
            F.avg(F.col("first_drag_takedown_min_11_to_15") * 100).alias("pct_of_games_had_drag_takedown_min_11_to_15"),
            F.avg(F.col("first_drag_takedown_min_15+") * 100).alias("pct_of_games_had_drag_takedown_min_15_plus"),

            # Game length related
            (F.avg("time_played") / 60).alias("avg_time_played_per_game_minutes"),
            F.avg("challenges.game_length").alias("avg_game_length"), # DATA VALIDATION: Compare with above and delete one
            F.avg(F.col("game_ended_in_early_surrender").cast("int") * 100).alias("pct_of_games_ended_in_early_ff"),
            F.avg(F.col("game_ended_in_surrender").cast("int") * 100).alias("pct_of_games_ended_in_ff"),
            F.avg(F.col("team_early_surrendered").cast("int") * 100).alias("pct_of_games_team_ffd"),

            # Multikill and killing spree stats
                # Multikills
            F.avg("double_kills").alias("avg_doublekills"),
            F.avg("triple_kills").alias("avg_triplekills"),
            F.avg("quadra_kills").alias("avg_quadrakills"),
            F.avg("penta_kills").alias("avg_pentakills"),
            F.avg("largest_multi_kill").alias("avg_largest_multikill"),
            F.avg("challenges.multikills").alias("avg_number_of_multikills"),
            F.avg("challenges.multi_kill_one_spell").alias("avg_multikills_with_one_spell"),
                # Killing sprees stats
            F.avg("killing_sprees").alias("avg_killing_sprees"),
            F.avg("challenges.killing_sprees").alias("avg_killing_sprees2"), # DATA VALIDATION: Check if equal and delete one
            F.avg("challenges.legendary_count").alias("avg_legendary_count"), # DATA VALIDATION: Not too sure what this is, my current guess is how many times champ was legendary (do they need to die and get leg again or it counts kills above leg?)
            F.avg("largest_killing_spree").alias("avg_largest_killing_spee"),
                # Misc
            F.avg("unreal_kills").alias("avg_unreal_kills"), # DATA VALIDATION: Might be a zero stat, need to check
            F.avg("challenges.12_assist_streak_count").alias("avg_12_assist_streaks"),
            F.avg("challenges.elder_dragon_multikills").alias("avg_elder_drag_multikills"),
            F.avg("challenges.full_team_takedown").alias("avg_full_team_takedowns"),

            # Items - decide whether to use data from match or from timeline and make sure to include item tags in df 

            # Misc
            F.avg("challenges.blast_cone_opposite_opponent_count").alias("avg_times_blast_cone_enemy"),
            F.avg(F.col("challenges.danced_with_rift_herald") * 100).alias("pct_of_games_danced_with_rift_herald"),
            F.avg("challenges.double_aces").alias("avg_double_aces"),
            F.avg("challenges.fist_bump_participation").alias("avg_fist_bump_participations"),
            F.avg("challenges.mejais_full_stack_in_time").alias("avg_mejai_full_stack_time"), # DATA VALIDATION: is this a Boolean? Is this time?
            F.avg("challenges.outer_turret_executes_before10_minutes").alias("avg_outer_turret_executes_before_10m"),
            F.avg("challenges.takedowns_in_enemy_fountain").alias("avg_takedowns_in_enemy_fountain"),

            # Position/Role - find aggregation key word for most common string -> DATA VALIDATION
            F.mode("individual_position").alias("mode_individual_position"), # Best guess for which position the player actually played in isolation of anything else
            F.mode("lane").alias("mode_lane"), # Gives slightly different string than above, might have something to do with where champ spent most of the time
            F.mode("role").alias("mode_role"),
            F.mode("team_position").alias("mode_team_position"), # The teamPosition is the best guess for which position the player actually played if we add the constraint that each team must have one top player, one jungle, one middle, etc
            F.sum(F.col("challenges.played_champ_select_position").cast("int")).alias("pct_of_games_played_champ_select_position") # only present if no roleswap - seems useless; stays as sum here and is derived below due to NULLS
        )
        .withColumn(
            "role_play_rate",
            (F.col("total_games_played_in_role") * 100 / F.col("total_games_per_champion"))
        )
        # Converting "sum" columns into percentages (couldn't do it in one step due to NULLS)
        .withColumn(
            "pct_of_games_with_highest_damage_dealt",
            (F.col("pct_of_games_with_highest_damage_dealt") * 100 / F.col("total_games_played_in_role"))
        )
        .withColumn(
            "pct_of_games_with_highest_crowd_control_score",
            (F.col("pct_of_games_with_highest_crowd_control_score") * 100 / F.col("total_games_played_in_role"))
        )
        .withColumn(
            "total_games_completed_supp_quest_first",
            (F.col("total_games_completed_supp_quest_first")* 100 / F.col("total_games_played_in_role"))
        )
        .withColumn(
            "pct_of_games_with_highest_wards_killed",
            (F.col("pct_of_games_with_highest_wards_killed")* 100 / F.col("total_games_played_in_role"))
        )
        # Aggregated as a sum like the columns above, so it needs the same conversion
        # for the value to match its "pct_of_games_" name.
        .withColumn(
            "pct_of_games_played_champ_select_position",
            (F.col("pct_of_games_played_champ_select_position") * 100 / F.col("total_games_played_in_role"))
        )
        # Core derived stats
        .withColumn(
            "kda",
            ((F.col("avg_kills") + F.col("avg_assists")) / F.col("avg_deaths"))
        )
        # total_wins is a plain win count, so this is a percentage in the 0-100 range
        .withColumn(
            "win_rate",
            (F.col("total_wins") * 100 / F.col("total_games_played_in_role"))
        )
        .withColumn(
            "avg_cs",
            (F.col("avg_minions_killed") + F.col("avg_jungle_monsters_cs"))
        )
        .withColumn(
            "avg_cs_per_minute",
            (F.col("avg_cs") / F.col("avg_time_played_per_game_minutes"))
        )
        # Misc?
        .withColumn(
            "pct_games_first_to_complete_item",
            (F.col("total_games_fastest_item_completion") * 100 / F.col("total_games_played_in_role"))
        )
        # Drop champion/position combinations that are too rarely played
        .filter(F.col("role_play_rate") >= MIN_PLAY_RATE)
    )

In [11]:
"""
Main aggregating functions, uses helpers as needed, idea is to pull all wanted keys from match_data struct into columns (some keys will be modified, derived)
Finally, match_data will be dropped
"""
def main(
    spark: SparkSession,
    csv_file_path: str,
) -> DataFrame:
    """
    Run the full champion-aggregation pipeline for one match CSV.

    Steps, in order:
      1. create_matches_df                    -> participants frame + teams frame
      2. derive_participant_dragon_stats      -> dragon takedown columns
      3. map_tags_and_summoner_spells_to_df   -> participants frame plus the item-tag
                                                 and summoner-spell collections
      4. filter_position_specific_metrics
      5. left join of the teams frame on (match_id, team_id)
      6. aggregate_champion_data

    Args:
        spark: Active SparkSession, forwarded to the helpers that need it.
        csv_file_path: Path of the match CSV handed to create_matches_df.

    Returns:
        The DataFrame produced by aggregate_champion_data. Its first 10 rows
        are also printed via show(10) as a side effect.
    """
    participants_raw, teams = create_matches_df(spark, csv_file_path)

    # Indicator column for games where the champion had a dragon takedown,
    # plus columns with the timing of the first dragon takedown
    participants_with_dragons = derive_participant_dragon_stats(participants_raw)

    participants_tagged, item_tags, summoner_spells = map_tags_and_summoner_spells_to_df(
        participants_with_dragons,
        get_items_dict(),
        SUMM_SPELLS_DICT,
        spark,
    )

    participants_filtered = filter_position_specific_metrics(participants_tagged)

    # "win" appears in both DataFrames, so drop the teams copy before joining
    teams_without_win = teams.drop("win")
    participants_with_teams = participants_filtered.join(
        teams_without_win,
        on=["match_id", "team_id"],
        how="left",
    )

    champion_stats = aggregate_champion_data(participants_with_teams, item_tags, summoner_spells)
    champion_stats.show(10)
    return champion_stats

In [12]:
"""TESTING ABOVE"""
# Disabled smoke test on the small CSV.
# NOTE(review): main() calls create_matches_df (plural) — confirm the helper name below before re-enabling.
#df_participants, df_teams = create_match_df(spark, SMALL_SNAKE_CSV)
# Full pipeline run on the large CSV. main() prints the first 10 rows itself; because this call is
# the cell's last expression, the returned DataFrame's repr is displayed as well.
main(spark, LARGE_SNAKE_CSV)

###################

+-----------+-------------+-------------+--------------------------+------------------------+------------------+------------------+--------------------------+------------------+----------------------+------------------+----------+--------------------------------------+----------------------------+-------------------------+-----------------------------+----------------------+-----------------------------------+----------------------------+--------------------------------------+-------------------------------+----------------------------------+---------------------------+---------------------------+----------------------------+------------------+----------------------+-------------------------+---------------------+-------------------------+---------------------------------------------------------+----------------------------------+---------------------------------------------+------------------------------------+---------------------------------------------+---------------------+-------

DataFrame[champion_id: bigint, champion_name: string, team_position: string, total_games_played_in_role: bigint, total_games_per_champion: bigint, avg_kills: double, avg_deaths: double, avg_deaths_by_enemy_champs: double, avg_assists: double, avg_kill_participation: double, avg_takedowns: double, total_wins: bigint, pct_of_games_with_highest_damage_dealt: double, avg_pct_damage_dealt_in_team: double, average_damage_per_minute: double, avg_damage_dealt_to_champions: double, avg_total_damage_dealt: double, avg_magic_damage_dealt_to_champions: double, avg_total_magic_damage_dealt: double, avg_physical_damage_dealt_to_champions: double, avg_total_physical_damage_dealt: double, avg_true_damage_dealt_to_champions: double, avg_total_true_damage_dealt: double, avg_largest_critical_strike: double, avg_pct_damage_taken_in_team: double, avg_damage_taken: double, avg_magic_damage_taken: double, avg_physical_damage_taken: double, avg_true_damage_taken: double, avg_damage_self_mitigated: double, avg

In [None]:
"""Data Validation"""

def compile_and_validate_data(
    spark: SparkSession,
    csv_file_path: str,
) -> None:
    """
    Build the merged participant/team DataFrame and hand it to data_validation.

    Runs the same preparation steps as main() (load, dragon stats, tag and
    summoner-spell mapping, position-specific filtering, left join with the
    teams frame) but stops before aggregation and calls data_validation on the
    merged frame instead.

    Args:
        spark: Active SparkSession, forwarded to the helpers that need it.
        csv_file_path: Path of the match CSV handed to create_matches_df.

    Returns:
        None. The result of data_validation (if any) is not propagated, so the
        return annotation is None rather than DataFrame.
    """
    df_participants, df_teams = create_matches_df(spark, csv_file_path)

    # Create an indicator column for games where champion had a dragon takedown, and subsequent columns with the timing of first dragon takedown
    df_participants = derive_participant_dragon_stats(df_participants)

    # The item-tag and summoner-spell collections are returned by the helper but not needed for validation
    df_participants, all_item_tags, all_summoner_spells = map_tags_and_summoner_spells_to_df(df_participants, get_items_dict(), SUMM_SPELLS_DICT, spark)

    df_participants = filter_position_specific_metrics(df_participants)

    df_merged = df_participants.join(
        df_teams.drop("win"), # Remove column that appears in both DataFrames
        on = ["match_id", "team_id"],
        how = "left"
    )

    data_validation(df_merged)

In [21]:
"""TESTING ABOVE"""
# Run the validation pipeline on the large CSV. compile_and_validate_data has no return statement,
# so anything displayed by this cell is printed from inside the pipeline (presumably by data_validation).
compile_and_validate_data(spark, LARGE_SNAKE_CSV)

+--------------+-------+---------------------------+--------------------------+------------------------------------+----------------------------------------+
|      match_id|team_id|number_of_fastest_legendary|count_no_fastest_legendary|number_of_jungler_kills_early_jungle|count_missing_jungler_kills_early_jungle|
+--------------+-------+---------------------------+--------------------------+------------------------------------+----------------------------------------+
|NA1_5250177985|    100|                       NULL|                         1|                                   1|                                       0|
|NA1_5250181690|    200|                       NULL|                         1|                                   1|                                       0|
|NA1_5250187534|    100|                          1|                         0|                                   1|                                       0|
|NA1_5250217052|    100|                       NULL|

Final step: stop the Spark session (the cells below run a quick sanity check and then shut Spark down).

In [24]:
# Sanity check on the Spark session: build a single-column frame with ids 0-9 and print it.
df = spark.range(10)
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [21]:
# Shut Spark down. The cache must be cleared BEFORE stop(): once the session is
# stopped, catalog calls target a session whose SparkContext no longer exists.
spark.catalog.clearCache()
spark.stop()

565