In [1]:
from awpy import DemoParser
from awpy.analytics.states import generate_vector_state
from tqdm.notebook import tqdm
import pandas as pd
import numpy as np
import patoolib
import os
import shutil
from sqlalchemy import create_engine
import sys
import logging
import warnings
# Suppress every Python warning for the rest of the session.
warnings.filterwarnings("ignore")
# Do not truncate the column list when a DataFrame is displayed.
pd.options.display.max_columns = None
# Thread limit handed to numexpr through the environment.
# NOTE(review): this is set after pandas/numpy have been imported — confirm numexpr still honours it.
os.environ['NUMEXPR_MAX_THREADS'] = '12'

In [2]:
# Route log records to stdout so they appear in the cell output; the level
# comes from the LOGLEVEL environment variable and falls back to INFO.
logging.basicConfig(
    stream=sys.stdout,
    level=os.environ.get("LOGLEVEL", "INFO"),
    format='%(asctime)s - %(name)s - [%(levelname)s]: %(message)s',
)
# Logger shared by every class defined in this notebook.
log = logging.getLogger(__name__)

In [3]:
class parser:
    """Extract demo archives and store every contained demo as a parsed JSON file.

    Each archive in ``dir_demo`` is unpacked into ``dir_temp``; every extracted
    demo is run through ``DemoParser`` and the JSON file it leaves in the current
    working directory is moved to ``<dir_store>/<archive name>/<demo name>.json``.
    """

    def __init__(self, dir_demo = r'F:\hltv', dir_store = r'F:\csgoanalysis', dir_temp = r'csgo_tmp', parse_rate = 32, 
                 trade_time = 5, buy_style = "hltv"):
        """Remember the directory layout and the options forwarded to ``DemoParser``.

        Args:
            dir_demo: Directory holding the demo archives.
            dir_store: Directory that receives one sub-directory of JSON files per archive.
            dir_temp: Scratch directory the archives are extracted into.
            parse_rate: Forwarded to ``DemoParser`` unchanged.
            trade_time: Forwarded to ``DemoParser`` unchanged.
            buy_style: Forwarded to ``DemoParser`` unchanged.
        """
        self.dir_demo = dir_demo
        self.dir_store = dir_store
        self.dir_temp = dir_temp
        self.parse_rate = parse_rate
        self.trade_time = trade_time
        self.buy_style = buy_style
    
    def parse_all(self):
        """Run ``parse`` on every entry of ``dir_demo``, showing a progress bar."""
        log.info(f"Parsing all files in directory: \"{self.dir_demo}\"")
        for file in tqdm(os.listdir(self.dir_demo), desc="Match", leave=True, position=0):
            self.parse(file)
    
    def parse(self, file):
        """Extract one archive and parse every file it contains.

        Args:
            file: Archive file name, relative to ``dir_demo``.
        """
        log.info(f"Parsing file: \"{file}\"")
        # Make sure the extraction target exists on the very first run.
        os.makedirs(self.dir_temp, exist_ok=True)
        try:
            patoolib.extract_archive(os.path.join(self.dir_demo, file), outdir=self.dir_temp)
            for demo in os.listdir(self.dir_temp):
                self.parse_demo(file, demo)
        finally:
            # Always empty the scratch directory: demos left behind by a failed run
            # would otherwise be parsed again under the next archive's name.
            self.reset_temp_dir()
    
    def parse_demo(self, file, demo_file):
        """Parse one extracted demo and move its JSON into the archive's store directory.

        Args:
            file: Name of the archive the demo came from; names the target sub-directory.
            demo_file: Demo file name inside ``dir_temp``.
        """
        log.info(f"Parsing demo: \"{demo_file}\"")
        # splitext strips the extension whatever its length (the old [:-4] slice
        # was only right for three-letter extensions).
        demo_id = os.path.splitext(demo_file)[0]
        match_dir = os.path.join(self.dir_store, os.path.splitext(file)[0])
        demo_parser = DemoParser(
            demofile = os.path.join(self.dir_temp, demo_file), 
            demo_id = demo_id,
            parse_rate = self.parse_rate, 
            trade_time = self.trade_time, 
            buy_style = self.buy_style
        )
        demo_parser.parse()
        # makedirs also creates a missing store root and tolerates an existing target.
        os.makedirs(match_dir, exist_ok=True)
        json_name = demo_id + ".json"
        shutil.move(json_name, os.path.join(match_dir, json_name))
    
    def reset_temp_dir(self):
        """Leave ``dir_temp`` as an existing, empty directory."""
        log.info(f"Resetting directory: {self.dir_temp}")
        if os.path.exists(self.dir_temp):
            shutil.rmtree(self.dir_temp)
        # Recreate unconditionally so the next extraction always has a target.
        os.makedirs(self.dir_temp, exist_ok=True)

In [4]:
# Example usage (kept commented out so that running this cell does not start parsing):
# p = parser(dir_demo = r'F:\hltv\blast')
# p.parse_all()

In [5]:
class match_etl:
    """Drive ``map_etl`` over a whole directory of matches.

    ``parse_all`` works on demo archives (extract, parse, load), while
    ``parse_all_json`` works on a directory tree of already parsed JSON files.
    """

    def __init__(self, directory = r'F:\hltv', temp_directory = r'csgo_tmp', parse_rate = 32, 
                 db_con_str = r'mysql+mysqlconnector://root:mysqlroot@localhost/CSGOAnalysis?allow_local_infile=1', 
                 trade_time = 5, buy_style = "hltv"):
        """Store the locations and the options handed to every ``map_etl``.

        Args:
            directory: Directory with archives (``parse_all``) or with one
                sub-directory of JSON files per match (``parse_all_json``).
            temp_directory: Scratch directory the archives are extracted into.
            parse_rate: Forwarded to ``map_etl``.
            db_con_str: Database URL forwarded to ``map_etl``.
            trade_time: Forwarded to ``map_etl``.
            buy_style: Forwarded to ``map_etl``.
        """
        # NOTE(review): the default db_con_str embeds a user name and password;
        # prefer passing a URL built from environment variables.
        self.directory = directory
        self.temp_directory = temp_directory
        self.parse_rate = parse_rate
        self.db_con_str = db_con_str
        self.trade_time = trade_time
        self.buy_style = buy_style
    
    def parse_all_json(self):
        """Load every file of every sub-directory of ``directory`` through ``map_etl.etl_json``."""
        log.info(f"Parsing all files in directory: \"{self.directory}\"")
        for file_dir in tqdm(os.listdir(self.directory), desc="Match", leave=True, position=0):
            log.info(f"Parsing file: \"{file_dir}\"")
            match_dir = os.path.join(self.directory, file_dir)
            for demo in os.listdir(match_dir):
                # The sub-directory name doubles as the match name.
                demo_etl = map_etl(match_dir, demo, file_dir, self.parse_rate, 
                                   self.db_con_str, trade_time = self.trade_time, buy_style = self.buy_style)
                demo_etl.etl_json()
    
    def parse_all(self):
        """Run ``parse`` on every entry of ``directory``, showing a progress bar."""
        log.info(f"Parsing all files in directory: \"{self.directory}\"")
        for file in tqdm(os.listdir(self.directory), desc="Match", leave=True, position=0):
            self.parse(file)
    
    def parse(self, file):
        """Extract one archive and run ``map_etl.etl`` on every file it contains.

        Args:
            file: Archive file name, relative to ``directory``.
        """
        log.info(f"Parsing file: \"{file}\"")
        # splitext strips the extension whatever its length (the old [:-4] slice
        # was only right for three-letter extensions).
        match_name = os.path.splitext(file)[0]
        # Make sure the extraction target exists on the very first run.
        os.makedirs(self.temp_directory, exist_ok=True)
        try:
            patoolib.extract_archive(os.path.join(self.directory, file), outdir=self.temp_directory)
            for demo in os.listdir(self.temp_directory):
                demo_etl = map_etl(self.temp_directory, demo, match_name, self.parse_rate, self.db_con_str, 
                                   trade_time = self.trade_time, buy_style = self.buy_style)
                demo_etl.etl()
        finally:
            # Always empty the scratch directory: demos left behind by a failed run
            # would otherwise be loaded again under the next archive's name.
            self.reset_temp_dir()
        
    def reset_temp_dir(self):
        """Leave ``temp_directory`` as an existing, empty directory."""
        log.info(f"Resetting directory: {self.temp_directory}")
        if os.path.exists(self.temp_directory):
            shutil.rmtree(self.temp_directory)
        # Recreate unconditionally so the next extraction always has a target.
        os.makedirs(self.temp_directory, exist_ok=True)

In [6]:
class map_etl:
    def __init__(self, directory, demo_file, match_name, parse_rate, 
                 db_con_str = r'mysql+mysqlconnector://root:mysqlroot@localhost/CSGOAnalysis?allow_local_infile=1', 
                 trade_time = 5, buy_style = "hltv"):
        self.directory = directory
        self.demo_file = demo_file
        self.match_name = match_name
        self.parse_rate = parse_rate
        self.db_con_str = db_con_str
        self.trade_time = trade_time
        self.buy_style = buy_style
    
    def etl_json(self):
        log.info(f"Openning connection to database: \"{self.db_con_str}\"")
        self.db_con = create_engine(self.db_con_str)
        self.parse_json()
        self.make_tables()
        self.save_to_db()
        self.db_con.dispose()
    
    def etl(self):
        log.info(f"Openning connection to database: \"{self.db_con_str}\"")
        self.db_con = create_engine(self.db_con_str)
        self.parse()
        self.make_tables()
        self.save_to_db()
        self.db_con.dispose()
    
    def parse_json(self):
        """Read a previously parsed demo from its JSON file.

        Fills ``self.parsed_demo`` with the value returned by ``read_json`` and
        ``self.df_demo`` with the value returned by ``parse_json_to_df``.
        """
        log.info(f"Parsing json demo: \"{self.demo_file}\"")
        json_reader = DemoParser()
        json_path = os.path.join(self.directory, self.demo_file)
        self.parsed_demo = json_reader.read_json(json_path)
        self.df_demo = json_reader.parse_json_to_df()
    
    def parse(self):
        """Parse the raw demo file with ``DemoParser``.

        Fills ``self.df_demo`` with the value returned by ``parse(return_type="df")``
        and ``self.parsed_demo`` with the parser's ``json`` attribute, then deletes
        the ``<demo id>.json`` file from the current working directory.
        """
        log.info(f"Parsing demo: \"{self.demo_file}\"")
        # splitext strips the extension whatever its length (the old [:-4] slice
        # was only right for three-letter extensions such as ".dem").
        demo_id = os.path.splitext(self.demo_file)[0]
        demo_parser = DemoParser(
            demofile = os.path.join(self.directory, self.demo_file), 
            demo_id = demo_id,
            parse_rate = self.parse_rate, 
            trade_time = self.trade_time, 
            buy_style = self.buy_style
        )
        self.df_demo = demo_parser.parse(return_type="df")
        self.parsed_demo = demo_parser.json
        # The JSON is only an intermediate artefact here; the data goes to the database.
        os.remove(demo_id + ".json")
    
    def make_tables(self):
        """Build every table that ``save_to_db`` writes, as attributes on ``self``.

        The statements depend on each other and must stay in this order; the
        comments below name what each step needs from the previous ones.
        """
        log.info("Creating all tables")
        # Game row first: get_match_id reads self.match_info.
        self.match_info = self.get_match_info()
        self.match_id = self.get_match_id()
        # get_player_mapping reads self.players_df.
        self.players_df = self.get_player_data()
        self.player_id_mapping = self.get_player_mapping()
        # The frame table needs self.player_id_mapping and self.match_id.
        self.match_df = self.get_match_data()
        self.player_name_mapping = self.get_player_name_mapping()
        # Everything below goes through get_dataframe, which reads self.match_df,
        # self.player_id_mapping and self.match_id.
        self.kills = self.get_kills(self.df_demo["kills"], ["attacker", "victim", "assister", "flashThrower", "playerTraded"])
        self.damages = self.get_damages(self.df_demo["damages"], ["attacker", "victim"])
        # get_dataframe works on a "tick" column, so throwTick is renamed for the
        # call and renamed back afterwards.
        self.grenades = self.get_dataframe(self.df_demo["grenades"].rename(columns={"throwTick": "tick"}), ["thrower"]
                                          ).rename(columns={"tick": "throwTick"})
        # The source frame's own "matchId" column is dropped; get_dataframe added "matchID".
        self.flashes = self.get_dataframe(self.df_demo["flashes"], ["attacker", "player"]).drop("matchId", axis=1)
        self.weaponFires = self.get_dataframe(self.df_demo["weaponFires"], ["player"])
        # The running "ID" column added by get_dataframe is not kept for bomb events.
        self.bombEvents = self.get_dataframe(self.df_demo["bombEvents"], ["player"]).drop("ID", axis=1)
        self.rounds = self.get_rounds(self.df_demo["rounds"])
    
    def save_to_db(self):
        log.info("Saving all tables to database")
        self.save_to_db_table(self.rounds, "round")
        self.save_to_db_frame()
        self.save_to_db_table(self.kills, "elimination")
        self.save_to_db_table(self.damages, "damage")
        self.save_to_db_table(self.grenades, "grenade")
        self.save_to_db_table(self.flashes, "flash")
        self.save_to_db_table(self.weaponFires, "weaponFire")
        self.save_to_db_table(self.bombEvents, "bombEvent")
    
    def save_to_db_frame(self):
        log.debug(f"Saving frame table to database")
        self.match_df.to_csv(os.path.join(self.directory, "frame.csv"), sep = ";", encoding = 'utf-8', 
                             index = False, header = False)
        with self.db_con.begin() as connection:
            connection.execute("SET FOREIGN_KEY_CHECKS = 0;")
            connection.execute(r"LOAD DATA LOCAL INFILE '"+self.directory.replace('\\', '/')+r"/frame.csv' INTO TABLE frame CHARACTER SET utf8 FIELDS TERMINATED BY ';' ESCAPED BY '\\';")
        os.remove(os.path.join(self.directory, "frame.csv"))
    
    def save_to_db_table(self, df, name):
        log.debug(f"Saving {name} table to database")
        with self.db_con.connect() as connection:
            result = connection.execute("SET FOREIGN_KEY_CHECKS = 0;")
            df.to_sql(name = name, con = connection, if_exists = "append", index = False)
    
    def get_team_data(self, frame, team, mapping):
        team_frame = frame[team]
        team_data = {}
        team_data[team + 'Name'] = team_frame['teamName']
        team_data[team + 'EqVal'] = team_frame['teamEqVal']
        team_data[team + 'AlivePlayers'] = team_frame['alivePlayers']
        team_data[team + 'TotalUtility'] = team_frame['totalUtility']

        for player in team_frame['players']:
            mapped_player = mapping[player['steamID']]
            team_data[f"{team}{mapped_player}_ID"] = self.player_id_mapping[str(player['steamID'])]
            for key_player in player:
                if key_player not in ['inventory', 'steamID', 'name', 'team', 'side', 'flashGrenades', 'smokeGrenades', 
                                     'heGrenades', 'fireGrenades', 'totalUtility']:
                    team_data[f'{team}{mapped_player}_{key_player}'] = player[key_player]
                elif key_player == 'inventory':
                    team_data[f"{team}{mapped_player}_SmokeGrenade"] = 0
                    team_data[f"{team}{mapped_player}_Flashbang"] = 0
                    team_data[f"{team}{mapped_player}_DecoyGrenade"] = 0
                    team_data[f"{team}{mapped_player}_fireGrenades"] = 0
                    team_data[f"{team}{mapped_player}_HEGrenade"] = 0
                    if player[key_player] is None:
                        team_data[f'{team}{mapped_player}_mainWeapon'] = ''
                        team_data[f'{team}{mapped_player}_secondaryWeapon'] = ''
                    else:
                        for weapon in player[key_player]:
                            if weapon['weaponClass'] == 'Pistols':
                                team_data[f'{team}{mapped_player}_secondaryWeapon'] = weapon['weaponName']
                            elif weapon['weaponClass'] == 'Grenade':
                                if weapon['weaponName'] in {"Molotov", "Incendiary Grenade"}:
                                    team_data[f"{team}{mapped_player}_fireGrenades"] = weapon['ammoInMagazine'] +\
                                        weapon['ammoInReserve']
                                else:
                                    team_data[f"{team}{mapped_player}_{weapon['weaponName'].replace(' ', '')}"] =\
                                        weapon['ammoInMagazine'] + weapon['ammoInReserve']
                            else:
                                team_data[f'{team}{mapped_player}_mainWeapon'] = weapon['weaponName']
                        if f'{team}{mapped_player}_mainWeapon' not in team_data and\
                                f'{team}{mapped_player}_secondaryWeapon' not in team_data:
                            team_data[f'{team}{mapped_player}_mainWeapon'] = ''
                        elif f'{team}{mapped_player}_mainWeapon' not in team_data:
                            team_data[f'{team}{mapped_player}_mainWeapon'] =\
                                team_data[f'{team}{mapped_player}_secondaryWeapon']
        return team_data

    def get_frame_data(self, frame, mapping):
        frame_data = {**self.get_team_data(frame, 'ct', mapping), 
                      **self.get_team_data(frame, 't', mapping)}
        frame_data['bombPlanted'] = frame['bombPlanted']
        frame_data['bombsite'] = frame['bombsite']
        frame_data['tick'] = frame['tick']
        frame_data['seconds'] = frame['seconds']
        frame_data['clockTime'] = frame['clockTime']
        bomb_data = frame['bomb']
        for key in bomb_data:
            frame_data[f"bomb_{key}"] = bomb_data[key]
        return frame_data

    def create_mapping(self, round_):
        """Assign every player of a round a slot label ``Player_<n>``.

        Numbering starts at 1 on each side, following the order of
        ``round_['ctSide']['players']`` and ``round_['tSide']['players']``.

        Returns:
            dict mapping Steam ID to slot label.
        """
        slot_by_steam_id = {}
        for side_key in ('ctSide', 'tSide'):
            for slot, player in enumerate(round_[side_key]['players'], start=1):
                slot_by_steam_id[player['steamID']] = f'Player_{slot}'
        return slot_by_steam_id

    def get_match_data(self):
        log.debug("Creating frame table")
        data = self.parsed_demo
        data_list = []
        mapping = self.create_mapping(data['gameRounds'][0])
        for round_ in data['gameRounds']:
            for frame in round_['frames']:
                converted_vector = self.get_frame_data(frame, mapping)
                converted_vector['roundNum'] = round_['roundNum']
                data_list.append(converted_vector)
            last_tick = round_['endTick']
            
        frame_columns = []
        with open('frame_columns', 'r') as f:
            for line in f.readlines():
                frame_columns.append(line[:-1])
        res = pd.DataFrame(data_list, columns = frame_columns)
        res.fillna(method='ffill', inplace=True)
        for col in res.columns:
            if type(res[col][0]) == list:
                res[col] = res[col].astype('str')
        res["matchID"] = self.match_id
        res = res.sort_index(axis=1)
        for col in res.columns:
            if res[col].dtypes == "bool":
                res[col] = res[col].replace({False: "", True: "1"})
        return res

    def get_player_data(self):
        log.debug("Creating player table")
        players_df = self.df_demo["playerFrames"][["steamID", "name", "teamName"]]
        return players_df.drop_duplicates()

    def get_player_id_from_db(self, row):
        """Look up the ``player`` table ID for one player row.

        Args:
            row: Mapping with ``steamID``, ``name`` and ``teamName``.

        Returns:
            pandas.DataFrame with an ``ID`` column; empty when no row matches.
        """
        # Local import: the notebook's import cell only brings in create_engine.
        from sqlalchemy import text

        # Player and team names come straight from the demo and may contain
        # quotes or backslashes, so they are bound as parameters rather than
        # pasted into the statement.
        query = text("SELECT ID FROM player "
                     "WHERE steamID = :steam_id AND name = :name AND teamName = :team_name")
        # str() keeps the old comparison semantics (values were compared as quoted
        # strings) and avoids handing numpy scalars to the database driver.
        parameters = {
            "steam_id": str(row["steamID"]),
            "name": str(row["name"]),
            "team_name": str(row["teamName"]),
        }
        return pd.read_sql(sql = query, con = self.db_con, params = parameters)

    def get_player_id(self, row):
        log.debug(f'Getting player id for player: {row["name"]}')
        player_id = self.get_player_id_from_db(row)
        if player_id.empty:
            row.to_frame().T.to_sql(name = "player", con = self.db_con, if_exists = "append", index = False)
            player_id = self.get_player_id_from_db(row)
        return player_id.iloc[0,0]

    def get_player_mapping(self):
        mapping = dict(zip(self.players_df["steamID"].astype('str'), 
                           self.players_df.apply(lambda x: self.get_player_id(x), axis=1)))
        mapping["None"] = np.nan
        mapping["<NA>"] = np.nan
        mapping[""] = np.nan
        return mapping

    def get_player_name_mapping(self):
        return dict(zip(self.players_df["name"], self.players_df["steamID"].astype("str")))

    def get_match_info(self):
        log.debug("Creating game table")
        df_demo = self.df_demo
        return pd.DataFrame(dict(zip(['fileName', 'parsedMatchID', 'clientName', 'mapName', 'tickRate', 'playbackTicks', 
                                      'parseRate'], 
                                     [self.match_name, df_demo['matchID'], df_demo['clientName'], df_demo['mapName'],
                                      df_demo['tickRate'], df_demo['playbackTicks'], self.parse_rate])), index=[0])

    def get_match_id_from_db(self):
        """Look up the ``game`` table ID that matches ``self.match_info``.

        Returns:
            pandas.DataFrame with an ``ID`` column; empty when no row matches.
        """
        # Local import: the notebook's import cell only brings in create_engine.
        from sqlalchemy import text

        match_info = self.match_info
        # File and client names come from the file system and the demo, so they
        # are bound as parameters rather than pasted into the statement.
        query = text("SELECT ID FROM game "
                     "WHERE fileName = :file_name AND parsedMatchID = :parsed_match_id "
                     "AND clientName = :client_name AND parseRate = :parse_rate")
        # str() keeps the old comparison semantics (values were compared as quoted
        # strings) and avoids handing numpy scalars to the database driver.
        parameters = {
            "file_name": str(match_info["fileName"][0]),
            "parsed_match_id": str(match_info["parsedMatchID"][0]),
            "client_name": str(match_info["clientName"][0]),
            "parse_rate": str(match_info["parseRate"][0]),
        }
        return pd.read_sql(sql = query, con = self.db_con, params = parameters)

    def get_match_id(self):
        log.debug("Getting game id")
        match_id = self.get_match_id_from_db()
        if match_id.empty:
            self.match_info.to_sql(name = "game", con = self.db_con, if_exists = "append", index = False)
            match_id = self.get_match_id_from_db()
        return match_id.iloc[0,0]

    def get_dataframe(self, df, player_types):
        log.debug("Creating basic table")
        ticks = self.match_df["tick"]
        df = df.drop("mapName", axis=1)
        columns = ["Name", "Team"]
        to_drop = [player + col for player in player_types for col in columns]
        df = df.drop(to_drop, axis=1)
        for player in player_types:
            df[player + "SteamID"] = df[player + "SteamID"].astype("str")
            df = df.replace({player + "SteamID": self.player_id_mapping})\
                .rename(columns = {player + "SteamID": player + "ID"})
            df[player + "ID"] = df[player + "ID"].astype("Int64")
        df["tick_parsed"] = df["tick"].apply(lambda x: ticks[np.argmin(np.abs(ticks-x))])
        df["matchID"] = self.match_id
        df["ID"] = range(1, df.shape[0]+1)
        return df

    def get_kills(self, df, player_types):
        log.debug("Creating elimination table")
        for player in ["attacker", "assister", "flashThrower", "playerTraded"]:
            df = df.replace({player + "Name": self.player_name_mapping})
            df[player + "SteamID"] = df[player + "Name"]
        return self.get_dataframe(df, player_types)

    def get_damages(self, df, player_types):
        log.debug("Creating damage table")
        df = df.replace({"attackerName": self.player_name_mapping})
        df["attackerSteamID"] = df["attackerName"]
        df["zoomLevel"] = df["zoomLevel"].fillna(0).astype("Int64")
        return self.get_dataframe(df, player_types)

    def get_rounds(self, df):
        log.debug("Creating round table")
        df["matchID"] = self.match_id
        df.loc[df['startTick'] < 0, 'startTick'] = 0
        endT = self.parsed_demo['matchPhases']['roundEnded']
        if len(endT) != df.shape[0]:
            ind = [None]*len(endT)
            for i, tick in enumerate(endT):
                ind[i] = (df['endTick'] - tick).abs().min()
            ind = pd.Series(ind)
            ind = list(ind.nlargest(abs(len(endT) - df.shape[0])).index.sort_values(ascending=False))
            for i in ind:
                del endT[i]
        df['endTickCorrect'] = endT
        return df.drop("mapName", axis=1)

In [7]:
# Example usage (kept commented out so that running this cell does not start loading):
# etl = match_etl(directory = r"F:\csgoanalysis")
# etl.parse_all_json()