# Basic CSGO Analysis
#### *Last Updated: August 25, 2021*
The csgo package was developed with easy analysis in mind. To that end, the parsed data goes directly into Pandas DataFrames, as shown in the first example notebook, [Parsing a CSGO demofile](https://github.com/pnxenopoulos/csgo/blob/master/examples/00_Parsing_a_CSGO_Demofile.ipynb). To efficiently calculate aggregate statistics from these DataFrames, the package contains `calc_stats()`, which filters, groups, and aggregates data based on user input. Furthermore, the package contains thirteen functions derived from `calc_stats()` to calculate standard CSGO aggregate statistics.

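To start, we reference the [demofile](https://www.hltv.org/matches/2349180/gambit-vs-natus-vincere-blast-premier-spring-final-2021) for a match between Astralis and Team Liquid, where we look at the second map of the series, `nuke`. Note that the example outputs shown in this notebook were generated from a locally stored demofile (`demo_files/5346930_5102434.dem`), so the team and player names in the tables below differ from that series.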

In [10]:
import operator
from typing import Dict, List, Tuple, Union

import pandas as pd

from csgo.parser import DemoParser

# Create the parser object for the local demofile. The demo_id is used for the
# parse output name (the log below shows it written to first_parsed.json).
# NOTE(review): the original comment says to pass log=True to the constructor
# to produce a logfile for the parser -- confirm against DemoParser's signature.
demo_parser = DemoParser(demofile = 'demo_files/5346930_5102434.dem', demo_id = 'first_parsed', parse_rate = 128)

# Parse the demofile twice: once with the default return type (a dictionary)
# and once with return_type="df", whose result is indexed by table name below
# (e.g. data_df["Kills"]).
# Each parse() call re-runs the Golang parser (see the two "Running Golang
# parser" entries in the log output), so this cell pays the parse cost twice.
data = demo_parser.parse()
data_df = demo_parser.parse(return_type="df")

15:00:22 [INFO] Go version>=1.14.0
15:00:22 [INFO] Initialized CSGODemoParser with demofile C:\Users\Maksim Komatovskiy\Desktop\clean_pars\demo_files\5346930_5102434.dem
15:00:22 [INFO] Setting demo id to first_parsed
15:00:22 [INFO] Setting parse rate to 128
15:00:22 [INFO] Running Golang parser from C:\Users\Maksim Komatovskiy\Desktop\clean_pars\csgo\parser\
15:00:22 [INFO] Looking for file at C:\Users\Maksim Komatovskiy\Desktop\clean_pars\demo_files\5346930_5102434.dem
15:01:05 [INFO] Wrote demo parse output to first_parsed.json
15:01:05 [INFO] Reading in JSON from first_parsed.json
15:01:05 [INFO] JSON data loaded, available in the `json` attribute to parser
15:01:05 [INFO] Successfully parsed JSON output
15:01:05 [INFO] Successfully returned JSON output
15:01:05 [INFO] Running Golang parser from C:\Users\Maksim Komatovskiy\Desktop\clean_pars\csgo\parser\
15:01:05 [INFO] Looking for file at C:\Users\Maksim Komatovskiy\Desktop\clean_pars\demo_files\5346930_5102434.dem
15:01:47 [INFO

## `calc_stats()` 
`calc_stats()` can be used to calculate aggregate statistics from any of the Pandas DataFrames containing event data. It also allows the user to pass column filters. For example, we can use the function to calculate each player's headshot kills in the first half.

In [11]:
# Helper functions for calc_stats()
def extract_num_filters(
    filters: Dict[str, Union[List[bool], List[str]]], key: str
) -> Tuple[List[str], List[float]]:
    """Split the numeric filter strings of one column into operators and values.

    Each filter is a string made of a comparison operator immediately
    followed by a number, e.g. ``"<16"``, ``">=0.5"`` or ``"!=-3"``.

    Args:
        filters: Mapping of column name to its list of filter conditions.
        key: Column whose conditions should be parsed.

    Returns:
        A ``(operators, values)`` tuple of two equally long lists, in the
        order the conditions appear in ``filters[key]``.

    Raises:
        ValueError: If a condition is not a string.
        Exception: If a condition does not start with one of
            ``==, !=, <=, >=, <, >`` or if the remainder cannot be converted
            with ``float()``.
    """
    # Two-character operators come first so "<=" is never read as "<".
    operators = ("==", "!=", "<=", ">=", "<", ">")
    sign_list = []
    val_list = []
    for condition in filters[key]:
        if not isinstance(condition, str):
            raise ValueError(
                f'Filter(s) for column "{key}" must be of type ' f"string."
            )
        # Match the operator as a prefix instead of scanning up to the first
        # digit: a scan treats the "-" of "<-5" or the "." of ">.5" as part
        # of the operator and wrongly rejects the filter.
        sign = next((op for op in operators if condition.startswith(op)), None)
        if sign is None:
            raise Exception(
                f'Invalid logical operator in filters for "{key}"' f" column."
            )
        try:
            value = float(condition[len(sign):])
        except ValueError as ve:
            raise Exception(
                f'Invalid numerical value in filters for "{key}" ' f"column."
            ) from ve
        sign_list.append(sign)
        val_list.append(value)
    return sign_list, val_list


def check_filters(df: pd.DataFrame, filters: Dict[str, Union[List[bool], List[str]]]):
    """Validate that every filter value fits the dtype of the column it targets.

    Boolean columns accept only ``bool`` values and object columns accept
    only ``str`` values. Every other column is treated as numeric, and its
    conditions are validated by parsing them with ``extract_num_filters``.

    Args:
        df: DataFrame the filters will be applied to.
        filters: Mapping of column name to its list of filter conditions.

    Raises:
        ValueError: If a bool/object column receives a value of the wrong
            type. Errors raised by ``extract_num_filters`` propagate as-is.
    """
    for column, allowed in filters.items():
        column_dtype = df.dtypes[column]
        if column_dtype == "bool":
            if any(not isinstance(value, bool) for value in allowed):
                raise ValueError(
                    f'Filter(s) for column "{column}" must be of type boolean'
                )
        elif column_dtype == "O":
            if not all(isinstance(value, str) for value in allowed):
                raise ValueError(
                    f'Filter(s) for column "{column}" must be of type string'
                )
        else:
            # Parsing is the validation: it raises on a malformed condition.
            extract_num_filters(filters, column)

def num_filter_df(df: pd.DataFrame, col: str, sign: str, val: float) -> pd.DataFrame:
    """Keep the rows of ``df`` whose ``col`` value satisfies ``<col> <sign> <val>``.

    Args:
        df: DataFrame to filter.
        col: Name of the column to compare.
        sign: One of ``"=="``, ``"!="``, ``"<="``, ``">="``, ``"<"``, ``">"``.
        val: Value the column is compared against.

    Returns:
        The matching rows of ``df``, with their original index labels.

    Raises:
        KeyError: If ``sign`` is not one of the six operators or ``col`` is
            not a column of ``df``.
    """
    comparisons = {
        "==": operator.eq,
        "!=": operator.ne,
        "<=": operator.le,
        ">=": operator.ge,
        "<": operator.lt,
        ">": operator.gt,
    }
    # Evaluate only the requested comparison. Building all six masks up front
    # does six times the work and can raise on an ordering comparison even
    # when the caller only asked for "==" or "!=".
    mask = comparisons[sign](df[col], val)
    return df.loc[mask]


def filter_df(
    df: pd.DataFrame, filters: Dict[str, Union[List[bool], List[str]]]
) -> pd.DataFrame:
    """Return a filtered copy of ``df``.

    For bool and object columns a row is kept when its value is one of the
    listed filter values. For every other column each condition string
    (e.g. ``"<16"``) is applied in turn, so a row must satisfy all of them.

    Args:
        df: DataFrame to filter; it is never modified.
        filters: Mapping of column name to its list of filter conditions.

    Returns:
        A new DataFrame (a copy even when ``filters`` is empty), so callers
        may safely modify the result.

    Raises:
        ValueError, Exception: Propagated from ``check_filters`` when a
            filter does not fit its column.
    """
    # Validate first so an invalid filter fails before any copying.
    check_filters(df, filters)
    filtered = df.copy()
    for column, conditions in filters.items():
        column_dtype = filtered.dtypes[column]
        if column_dtype == "bool" or column_dtype == "O":
            filtered = filtered.loc[filtered[column].isin(conditions)]
            continue
        # Parse the numeric conditions once per column rather than
        # re-parsing the whole list for every single condition.
        signs, values = extract_num_filters(filters, column)
        for sign, value in zip(signs, values):
            filtered = num_filter_df(filtered, column, sign, value)
    return filtered

In [12]:
def calc_stats(
    df: pd.DataFrame,
    filters: Dict[str, Union[List[bool], List[str]]],
    col_to_groupby: List[str],
    col_to_agg: List[str],
    agg: List[List[str]],
    col_names: List[str],
) -> pd.DataFrame:
    """Filter, group, and aggregate ``df``, then rename the resulting columns.

    Args:
        df: Source DataFrame; it is filtered through ``filter_df``.
        filters: Mapping of column name to its list of filter conditions.
        col_to_groupby: Columns to group by.
        col_to_agg: Columns to aggregate. When empty, no grouping happens and
            the filtered rows are returned with only the columns renamed.
        agg: Aggregation function names, one list per entry of
            ``col_to_agg`` (paired by position).
        col_names: New names for all columns of the result, in order.

    Returns:
        The filtered (and, if requested, aggregated) DataFrame with its
        columns set to ``col_names``.
    """
    result = filter_df(df, filters)
    # Pair each aggregated column with its list of aggregation functions.
    aggregations = {column: funcs for column, funcs in zip(col_to_agg, agg)}
    if col_to_agg:
        grouped = result.groupby(col_to_groupby)
        result = grouped.agg(aggregations).reset_index()
    result.columns = col_names
    return result

Below, we pass the `Kills` DataFrame and filter it to the rows where `IsHeadshot` is True and `RoundNum` is less than 16. The data is then grouped by `AttackerName`, the `AttackerName` column is aggregated with `size` (the number of rows in each group), and the resulting columns are renamed to `Player` and `1st Half HS`.

In [13]:
# First-half headshot kills per player: keep Kills rows where IsHeadshot is
# True and RoundNum < 16, group by AttackerName, count the rows of each group
# with "size", and name the two output columns "Player" and "1st Half HS".
calc_stats(data_df["Kills"], {"IsHeadshot":[True], "RoundNum":["<16"]},
           ["AttackerName"], ["AttackerName"], [["size"]], 
           ["Player", "1st Half HS"])

Unnamed: 0,Player,1st Half HS
0,.Yagami Layt,4
1,Perfect Nikita,4
2,Pix_Official,4
3,_Troin,3
4,ayetatari228133,3
5,cart111,2
6,cyxap1k,3
7,lacost3r,9
8,tr4p1chh,7


As mentioned earlier, the package contains thirteen functions derived from `calc_stats()` to efficiently calculate popular CSGO aggregate statistics. Unlike `calc_stats()`, the columns to group and aggregate the data by, the aggregation functions, and the column names do not need to be passed to these functions, only the data and column filters need to be passed.

## `adr()`
`adr()` takes in damage data, round data, a boolean specifying whether to calculate statistics for each player or for each team, and filters for each group of data, and returns a DataFrame with normalized and raw ADR.

In [24]:
def adr(
    damage_data: pd.DataFrame,
    round_data: pd.DataFrame,
    team: bool = False,
    damage_filters: Dict[str, Union[List[bool], List[str]]] = {},
    round_filters: Dict[str, Union[List[bool], List[str]]] = {},
) -> pd.DataFrame:
    """Compute average damage per round (ADR) per player or per team.

    Damage rows where attacker and victim are on the same team are ignored.
    "Norm ADR" is the summed ``HpDamageTaken`` and "Raw ADR" the summed
    ``HpDamage``, each divided by the number of rounds that remain after
    ``round_filters`` is applied.

    Args:
        damage_data: Damage events; must contain ``AttackerTeam``,
            ``VictimTeam``, ``HpDamageTaken``, ``HpDamage`` and the grouping
            column (``AttackerName`` or ``AttackerTeam``).
        round_data: One row per round; only its filtered row count is used.
        team: Group by ``AttackerTeam`` (column "Team") when True, otherwise
            by ``AttackerName`` (column "Player").
        damage_filters: Filters applied to ``damage_data``.
        round_filters: Filters applied to ``round_data``.

    Returns:
        DataFrame with columns "Player"/"Team", "Norm ADR" and "Raw ADR",
        sorted by "Norm ADR" in descending order with a fresh index. No
        check is made for a round count of zero.
    """
    # The shared ``{}`` defaults are only ever read, never modified.
    group_col, label = ("AttackerTeam", "Team") if team else ("AttackerName", "Player")
    enemy_damage = damage_data.loc[
        damage_data["AttackerTeam"] != damage_data["VictimTeam"]
    ]
    totals = calc_stats(
        enemy_damage,
        damage_filters,
        [group_col],
        ["HpDamageTaken", "HpDamage"],
        [["sum"], ["sum"]],
        [label, "Norm ADR", "Raw ADR"],
    )
    # Count the filtered rounds once; both ADR columns share the divisor.
    round_count = len(
        calc_stats(round_data, round_filters, [], [], [], round_data.columns)
    )
    totals["Norm ADR"] = totals["Norm ADR"] / round_count
    totals["Raw ADR"] = totals["Raw ADR"] / round_count
    totals = totals.sort_values(by="Norm ADR", ascending=False)
    return totals.reset_index(drop=True)

# Per-player ADR (team defaults to False) with no damage or round filters.
adr(data_df["Damages"], data_df["Rounds"])

15:03:56 [INFO] NumExpr defaulting to 8 threads.


Unnamed: 0,Player,Norm ADR,Raw ADR
0,Pix_Official,94.829268,143.634146
1,cyxap1k,92.073171,144.512195
2,Perfect Nikita,91.463415,116.560976
3,tr4p1chh,89.926829,120.02439
4,_Troin,88.585366,112.609756
5,.Yagami Layt,78.463415,136.536585
6,lacost3r,66.634146,86.04878
7,ayetatari228133,66.487805,91.609756
8,4umakoff,59.536585,71.707317
9,cart111,54.439024,96.658537


## `flash_stats()`
`flash_stats()` takes in flash data, grenade data, kill data, a boolean specifying whether to calculate statistics for each player or for each team, and filters for each group of data, and returns a DataFrame with enemy flashes, flash assists, enemy blind time, team flashes, flashes thrown, enemy flashes per throw, and enemy blind time per enemy.

## `bomb_stats()`
`bomb_stats()` takes in bomb data and bomb data filters, and returns a DataFrame with bomb plants, defuses, and defuse percentage, by team and bombsite.

In [27]:
def bomb_stats(
    bomb_data: pd.DataFrame, bomb_filters: Dict[str, Union[List[bool], List[str]]] = {},
) -> pd.DataFrame:
    """Summarize bomb plants and defuses per bombsite for the two teams.

    The teams are the first two distinct values of ``PlayerTeam``. A team's
    defuse percentage is its defuses divided by the other team's plants,
    expressed as a fraction (not multiplied by 100).

    Args:
        bomb_data: Bomb events with ``PlayerTeam``, ``BombAction`` (values
            ``"plant"`` / ``"defuse"``) and ``BombSite`` columns.
        bomb_filters: Filters applied to each team/action subset.

    Returns:
        One row per bombsite plus a final "A and B" total row, with columns
        ``Bombsite``, ``<team_one> Plants``, ``<team_two> Defuses``,
        ``<team_two> Defuse %``, ``<team_two> Plants``,
        ``<team_one> Defuses`` and ``<team_one> Defuse %``. Count columns
        are integers.

    Raises:
        IndexError: If ``PlayerTeam`` holds fewer than two distinct values.
    """
    teams = bomb_data["PlayerTeam"].unique()
    team_one, team_two = teams[0], teams[1]

    def _site_counts(action: str, team_name: str, label: str) -> pd.DataFrame:
        """Count one team's events of one action type per bombsite."""
        events = bomb_data.loc[
            (bomb_data["BombAction"] == action)
            & (bomb_data["PlayerTeam"] == team_name)
        ]
        return calc_stats(
            events,
            bomb_filters,
            ["BombSite"],
            ["BombSite"],
            [["size"]],
            ["Bombsite", label],
        )

    one_plants = f"{team_one} Plants"
    two_plants = f"{team_two} Plants"
    one_defuses = f"{team_one} Defuses"
    two_defuses = f"{team_two} Defuses"
    one_pct = f"{team_one} Defuse %"
    two_pct = f"{team_two} Defuse %"

    # The merges join on the shared "Bombsite" column; "outer" keeps a site
    # that only appears on one side, and the missing count becomes 0.
    table = (
        _site_counts("plant", team_one, one_plants)
        .merge(_site_counts("defuse", team_two, two_defuses), how="outer")
        .fillna(0)
    )
    table[two_pct] = table[two_defuses] / table[one_plants]
    table = table.merge(
        _site_counts("plant", team_two, two_plants), how="outer"
    ).fillna(0)
    table = table.merge(
        _site_counts("defuse", team_one, one_defuses), how="outer"
    ).fillna(0)
    table[one_pct] = table[one_defuses] / table[two_plants]

    total_one_plants = table[one_plants].sum()
    total_two_defuses = table[two_defuses].sum()
    total_two_plants = table[two_plants].sum()
    total_one_defuses = table[one_defuses].sum()
    # Append the total row after the last site row. A fixed label of 2 would
    # overwrite a real row whenever three or more site rows are present.
    table.loc[len(table)] = [
        "A and B",
        total_one_plants,
        total_two_defuses,
        total_two_defuses / total_one_plants,
        total_two_plants,
        total_one_defuses,
        total_one_defuses / total_two_plants,
    ]
    # 0 / 0 percentages come out as NaN; report them as 0.
    table = table.fillna(0)
    # Cast by column name with astype: assigning through .iloc writes into
    # the existing float columns on pandas >= 2.0 and leaves them float.
    count_cols = [one_plants, two_defuses, two_plants, one_defuses]
    table = table.astype({col: int for col in count_cols})
    return table

# Plant and defuse counts for all bomb events (no bomb filters).
bomb_stats(data_df["BombEvents"])

Unnamed: 0,Bombsite,Team MINT Plants,TeamCocks Defuses,TeamCocks Defuse %,TeamCocks Plants,Team MINT Defuses,Team MINT Defuse %
0,A,8,0,0.0,4,0,0.0
1,B,5,2,0.4,8,4,0.5
2,A and B,13,2,0.153846,12,4,0.333333


## `kill_breakdown()`
`kill_breakdown()` takes in kill data, a boolean specifying whether to calculate statistics for each player or for each team, and kill data filters, and returns a DataFrame with kills by weapon type.

In [29]:
# Helper function for kill_breakdown()
def weapon_type(weapon: str) -> str:
    """Map a weapon name to the kill-category label used by ``kill_breakdown``.

    Matching is exact and case-sensitive. Any name that is not listed below,
    including a listed weapon spelled differently, is reported as
    "Utility Kills".

    Args:
        weapon: Weapon name as it appears in the kill data.

    Returns:
        One of "Melee Kills", "Pistol Kills", "Shotgun Kills", "SMG Kills",
        "Assault Rifle Kills", "Machine Gun Kills", "Sniper Rifle Kills" or
        "Utility Kills".
    """
    categories = (
        ("Melee Kills", ("Knife",)),
        (
            "Pistol Kills",
            (
                "CZ-75 Auto",
                "Desert Eagle",
                "Dual Berettas",
                "Five-SeveN",
                "Glock-18",
                "P2000",
                "P250",
                "R8 Revolver",
                "Tec-9",
                "USP-S",
            ),
        ),
        ("Shotgun Kills", ("MAG-7", "Nova", "Sawed-Off", "XM1014")),
        (
            "SMG Kills",
            ("MAC-10", "MP5-SD", "MP7", "MP9", "P90", "PP-Bizon", "UMP-45"),
        ),
        (
            "Assault Rifle Kills",
            ("AK-47", "AUG", "FAMAS", "Galil AR", "M4A1-S", "M4A4", "SG 553"),
        ),
        ("Machine Gun Kills", ("M249", "Negev")),
        ("Sniper Rifle Kills", ("AWP", "G3SG1", "SCAR-20", "SSG 08")),
    )
    for label, names in categories:
        if weapon in names:
            return label
    # Everything else (grenades, fire, unknown names) counts as utility.
    return "Utility Kills"


def kill_breakdown(
    kill_data: pd.DataFrame,
    team: bool = False,
    kill_filters: Dict[str, Union[List[bool], List[str]]] = {},
) -> pd.DataFrame:
    """Count kills by weapon category per player or per team.

    Kills where attacker and victim are on the same team are ignored. Each
    remaining kill is categorized with ``weapon_type`` from its ``Weapon``
    value.

    Args:
        kill_data: Kill events with ``AttackerTeam``, ``VictimTeam``,
            ``Weapon`` and the grouping column (``AttackerName`` or
            ``AttackerTeam``).
        team: Group by ``AttackerTeam`` (column "Team") when True, otherwise
            by ``AttackerName`` (column "Player").
        kill_filters: Filters applied to the kill data. The added
            "Kills Type" column is available to them as well.

    Returns:
        DataFrame with "Player"/"Team", one integer column per kill category
        (always all eight, in a fixed order) and "Total Kills", sorted by
        "Total Kills" in descending order with a fresh index.
    """
    kill_types = [
        "Melee Kills",
        "Pistol Kills",
        "Shotgun Kills",
        "SMG Kills",
        "Assault Rifle Kills",
        "Machine Gun Kills",
        "Sniper Rifle Kills",
        "Utility Kills",
    ]
    group_col, label = ("AttackerTeam", "Team") if team else ("AttackerName", "Player")
    enemy_kills = kill_data.loc[
        kill_data["AttackerTeam"] != kill_data["VictimTeam"]
    ].copy()
    # Map the single column instead of applying a lambda to every row.
    enemy_kills["Kills Type"] = enemy_kills["Weapon"].map(weapon_type)
    counts = calc_stats(
        enemy_kills,
        kill_filters,
        [group_col, "Kills Type"],
        [group_col],
        [["size"]],
        [label, "Kills Type", "Kills"],
    )
    table = counts.pivot(index=label, columns="Kills Type", values="Kills")
    # reindex adds any category with no kills and fixes the column order;
    # fillna covers categories a particular player/team has no kills in.
    # This avoids a chained ``df[col].fillna(..., inplace=True)``, which does
    # not update the frame under pandas Copy-on-Write.
    table = table.reindex(columns=kill_types).fillna(0).astype(int)
    table["Total Kills"] = table.sum(axis=1)
    table = table.reset_index().rename_axis(None, axis=1)
    table = table.sort_values(by="Total Kills", ascending=False)
    return table.reset_index(drop=True)

# Per-player kill breakdown (team defaults to False) with no kill filters.
kill_breakdown(data_df["Kills"])

Unnamed: 0,Player,Melee Kills,Pistol Kills,Shotgun Kills,SMG Kills,Assault Rifle Kills,Machine Gun Kills,Sniper Rifle Kills,Utility Kills,Total Kills
0,tr4p1chh,1,3,0,1,33,0,0,1,39
1,_Troin,0,8,0,0,28,0,0,2,38
2,cyxap1k,0,4,0,2,17,0,12,1,36
3,Perfect Nikita,0,3,0,0,22,0,9,1,35
4,Pix_Official,0,0,0,1,18,0,10,6,35
5,ayetatari228133,0,2,0,1,14,0,8,1,26
6,lacost3r,0,7,0,1,16,0,2,0,26
7,.Yagami Layt,0,3,0,0,3,0,17,1,24
8,4umakoff,0,2,0,0,17,0,0,0,19
9,cart111,0,0,0,0,4,0,13,0,17


## `win_breakdown()`
`win_breakdown()` takes in round data and round data filters, and returns a DataFrame with win type by team.

In [34]:
def win_breakdown(
    round_data: pd.DataFrame,
    round_filters: Dict[str, Union[List[bool], List[str]]] = {},
) -> pd.DataFrame:
    """Count round wins per team, broken down by how the round ended.

    The raw end-reason values are replaced with readable labels before the
    filters run, so a filter on ``RoundEndReason`` must use the new labels
    (e.g. "CT Bomb Defusal Wins").

    Args:
        round_data: One row per round with ``WinningTeam`` and
            ``RoundEndReason`` columns; it is not modified.
        round_filters: Filters applied to the relabeled round data.

    Returns:
        DataFrame with a "Team" column, one integer column per end reason
        that occurs in the data, and "Total CT Wins", "Total T Wins" and
        "Total Wins".
    """
    ct_reasons = {"CT Bomb Defusal Wins", "CT T Elim Wins", "CT Time Expired Wins"}
    t_reasons = {"T Bomb Detonation Wins", "T CT Elim Wins"}
    # Whole-frame value replacement; returns a new frame.
    relabeled = round_data.replace(
        {
            "BombDefused": "CT Bomb Defusal Wins",
            "CTWin": "CT T Elim Wins",
            "TargetBombed": "T Bomb Detonation Wins",
            "TargetSaved": "CT Time Expired Wins",
            "TerroristsWin": "T CT Elim Wins",
        }
    )
    counts = calc_stats(
        relabeled,
        round_filters,
        ["WinningTeam", "RoundEndReason"],
        ["RoundEndReason"],
        [["size"]],
        ["Team", "RoundEndReason", "Count"],
    )
    table = counts.pivot(
        index="Team", columns="RoundEndReason", values="Count"
    ).fillna(0)
    table = table.reset_index().rename_axis(None, axis=1)
    # Capture the per-reason columns before any total column is added.
    reason_cols = [col for col in table.columns if col != "Team"]
    table["Total CT Wins"] = table[
        [col for col in reason_cols if col in ct_reasons]
    ].sum(axis=1)
    table["Total T Wins"] = table[
        [col for col in reason_cols if col in t_reasons]
    ].sum(axis=1)
    # Sum only the numeric reason columns: a positional slice starting at
    # column 0 would include the string "Team" column, which raises on
    # pandas >= 2.0.
    table["Total Wins"] = table[reason_cols].sum(axis=1)
    # Cast with astype: assigning through .iloc writes into the existing
    # float columns on pandas >= 2.0 and leaves them float.
    table = table.astype({col: int for col in table.columns[1:]})
    return table

# Win breakdown over all rounds (no round filters).
win_breakdown(data_df["Rounds"])

Unnamed: 0,Team,CT Bomb Defusal Wins,CT T Elim Wins,CT Time Expired Wins,T Bomb Detonation Wins,T CT Elim Wins,Total CT Wins,Total T Wins,Total Wins
0,Team MINT,4,7,0,3,8,11,11,22
1,TeamCocks,2,6,2,2,7,10,9,19
