<h1 style="color:#ff5500; font-family:Play; font-size:3em; margin:auto 32px;align:center">Part I - Build Dataset</h1>

---


This document is a part of the FACEIT Predictor Data Science Workflow.

In this notebook the collected data (stored in the local MongoDB database) is processed in order to create a dataset.


# Imports


In [None]:
import json
import pandas as pd
import pickle

from pymongo import MongoClient, DESCENDING
from pymongo.errors import PyMongoError
from tqdm import tqdm
from collections import defaultdict
from pprint import pprint

# enable imports from parent directory
import sys
from pathlib import Path
sys.path.append(str(Path.cwd().parent))

# local modules
from src.db.config import read_config
from src.data.build_dataset import create_all_lifetime_stats
from src.utils.dirs import INTERIM_DATA_DIR

from IPython import get_ipython

ipython = get_ipython()

# autoreload extension
if "autoreload" not in ipython.extension_manager.loaded:
    %load_ext autoreload

# autoreload python modules
%autoreload 2

# Database Connection


In [None]:
db_cfg = read_config("local.ingestorDB")

In [None]:
client = MongoClient(**db_cfg)
db = client['faceit_imported']

In [None]:
# Connect to the collections inside the ingestor database
players_coll = db['player']
matches_coll = db['match']
lifetime_stats_coll = db['player_lifetime_stats']

db_colls = {"player": players_coll, "match": matches_coll, "player_lifetime_stats": lifetime_stats_coll}

In [None]:
print("Number of player documents", players_coll.estimated_document_count())
print("Number of matches documents", matches_coll.estimated_document_count())
print("Number of lifetime stats documents", lifetime_stats_coll.estimated_document_count())

# Sample Documents

## Player *Schema*

<center>

| Field Name   |      Type      |  Description |
|----------|:-------------:|------|
| _id |  str | FACEIT account ID formatted as a UUID|
| activatedAt |    str  |   Date of activation of the FACEIT account |
| steamCreatedAt| datetime |   Date of creation of the Steam account linked to the FACEIT account |
| updatedAt | datetime |    Date of the last FACEIT profile update |
| csgoId | str |    Steam64ID of the Steam account linked to the FACEIT account |
| verified | bool |    Whether or not it is a verified FACEIT account.<br/>(usually reserved for high-profile players, including pros and streamers) |
| mapStats | dict |    Dictionary where keys are the map names and the values are a dictionary<br/>containing the *Lifetime Stats* of the player on the corresponding map.<br/>The stats for each map include the following fields:<ul style="margin:8px"><li>kills</li><li>deaths</li><li>assists</li><li>matches</li><li>wins</li><li>rounds</li><li>headshots</li><li>mvps</li><li>tripleKills</li><li>quadraKills</li><li>pentaKills</li></ul>|
| updatedAtIngestor | int |    Unix timestamp of the last time the player was processed in the FACEIT Ingestor service |
| matchHistory | list |    List of the player's matches, each entry containing the match ID and its start time.<br/>It is ordered from the most recent to the oldest match.|

</center>


In [None]:
sample_player = players_coll.find_one({})
pprint(sample_player)

## Match *Schema*

<center>

| Field Name   |      Type      |  Description |
|----------|:-------------:|------|
| _id |  str | FACEIT match ID |
| entity |    str  |   The kind of match it belongs to: matchmaking, hub or championship (tournament). |
| entityName| str |   The name of the entity in which the match was played.<br/>For the _matchmaking_ entity there are two queues: Free and Premium.<br/>In the other cases it is the name of the hub or tournament.|
| mapPlayed | str |    The CS:GO map where the match was played. |
| parties | dict |    Dictionary where the keys are the party IDs and the values are<br/> the lists of player IDs in each party. |
| score | str |    The score of the match: rounds won by team A followed by rounds won by team B. |
| startTime | int |     Unix timestamp of the match start time. |
| teamA | list |    List of players in team A. For each player the following fields are collected:<ul style="margin:8px"><li>elo</li><li>id</li><li>membership</li><li>playerStats</li><ul style="margin:8px"><li>kills</li><li>deaths</li><li>assists</li><li>headshots</li><li>mvps</li><li>tripleKills</li><li>quadraKills</li><li>pentaKills</li></ul></ul>|
| teamB | list |    Equivalent of teamA but for the players in team B. |
| teams | list |    List composed of teamA and teamB fields.|

</center>

In [None]:
sample_match = matches_coll.find_one({})
pprint(sample_match)

## Lifetime Stats *Schema*

<center>

| Field Name   |      Type      |  Description |
|----------|:-------------:|------|
| _id |  str | MongoDB Document ObjectID|
| mapStats | dict |    Dictionary where keys are the map names and the values are a dictionary<br/>containing the *Lifetime Stats* of the player on the corresponding map.<br/>The data refers to the lifetime stats right before the start of the match with `_id = matchId`.<br/>The stats for each map include the following fields:<ul style="margin:8px"><li>kills</li><li>deaths</li><li>assists</li><li>matches</li><li>wins</li><li>rounds</li><li>headshots</li><li>mvps</li><li>tripleKills</li><li>quadraKills</li><li>pentaKills</li></ul>|
| matchId | str |    FACEIT match ID |
| playerId | str |   FACEIT account ID formatted as a UUID|
| startTime | int |    Unix timestamp of the match start time.|

</center>

In [None]:
sample_lifetime_stats = lifetime_stats_coll.find_one({})
pprint(sample_lifetime_stats)

# Create Lifetime Stats


Initially only the most recent lifetime stats are stored in the DB (`player.mapStats`). To have consistent player lifetime stats for each match without recomputing them every time, the lifetime stats are processed once and stored in the DB.

To do so, one must work backwards from the most recent match and repeatedly subtract the player's stats in each match from the lifetime stats.
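
The backward pass can be sketched as follows. This is only an illustration, not the actual `create_all_lifetime_stats` implementation, whose internals are not shown in this notebook. The helper name `rollback_lifetime_stats` is hypothetical, and so is the assumption that the per-match counters map one-to-one onto the lifetime counters.

```python
from typing import Dict

# Counters assumed to exist in both the lifetime stats and the per-match stats.
COUNTERS = ("kills", "deaths", "assists", "headshots", "mvps")


def rollback_lifetime_stats(lifetime: Dict[str, int],
                            match_stats: Dict[str, int],
                            rounds: int,
                            won: bool) -> Dict[str, int]:
    """Return the lifetime stats as they were right before the given match."""
    before = dict(lifetime)  # copy, so the input is not mutated
    for key in COUNTERS:
        before[key] = lifetime.get(key, 0) - match_stats.get(key, 0)
    before["matches"] = lifetime.get("matches", 0) - 1
    before["rounds"] = lifetime.get("rounds", 0) - rounds
    before["wins"] = lifetime.get("wins", 0) - (1 if won else 0)
    return before


# Made-up numbers: roll the current stats back by the most recent match.
current = {"kills": 50, "deaths": 40, "assists": 10, "headshots": 20,
           "mvps": 5, "matches": 2, "rounds": 50, "wins": 1}
latest_match = {"kills": 30, "deaths": 15, "assists": 4, "headshots": 12, "mvps": 4}
before_latest = rollback_lifetime_stats(current, latest_match, rounds=26, won=True)
print(before_latest)
```

Applying the same step to each older match in turn yields one snapshot of lifetime stats per match, which is why the matches are read in descending order of `startTime`.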

In [None]:
matches_cursor = matches_coll.find({}).sort("startTime", DESCENDING)

In [None]:
for m in tqdm(matches_cursor, total=matches_coll.estimated_document_count()):
    create_all_lifetime_stats(m, db_colls)

# Get Complete & Processable Matches


Even though there are more than one million matches stored in the DB, not all of them qualify to be part of the dataset.

A valid processable match must have complete player data for every participant: the lifetime stats right before the beginning of the match as well as the player's *10* previous matches. Two counters track this for each match:

- `match_history`: the match id is in the player's match history and the previous 10 matches are in the DB
- `lifetime_stats`: the lifetime stats of the player regarding the match are available

Both counters are initially set to 0 and incremented each time the corresponding condition is met for one player.
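
The `match_history` condition can be illustrated in isolation with a minimal sketch. It tracks the length of the current unbroken run of stored matches, which is a different formulation from the decay counter used in the cell below but selects the same matches. The function name `eligible_matches` and the sample ids are hypothetical.

```python
from typing import List, Set

HISTORY_LENGTH = 10  # number of previous matches required


def eligible_matches(history_ids: List[str], ids_in_db: Set[str],
                     history_length: int = HISTORY_LENGTH) -> List[str]:
    """Return the ids whose `history_length` preceding matches are all in the DB.

    `history_ids` must be ordered from the oldest to the most recent match.
    """
    eligible = []
    consecutive_present = 0  # stored matches seen in a row before the current one
    for match_id in history_ids:
        if match_id not in ids_in_db:
            consecutive_present = 0  # a gap invalidates the matches that follow
            continue
        if consecutive_present >= history_length:
            eligible.append(match_id)
        consecutive_present += 1
    return eligible


# Toy history of 8 matches where "m1" is missing, requiring 3 previous matches.
history = [f"m{i}" for i in range(8)]
stored = set(history) - {"m1"}
print(eligible_matches(history, stored, history_length=3))
```

In this toy run only `m5`, `m6` and `m7` qualify: `m2` to `m4` are too close to the missing `m1` to have three stored predecessors.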


In [None]:
matches_ready = defaultdict(lambda: {"match_history": 0, "lifetime_stats": 0})

## Match History

In [None]:
all_players = players_coll.find({})

In [None]:
for p in tqdm(all_players, total=players_coll.estimated_document_count()):
    # order the player's matches from the oldest to the most recent
    prev_matches = sorted(p["matchHistory"], key=lambda x: x["startTime"])
    prev_matches_ids = [m["id"] for m in prev_matches]

    matches_ids_in_db = set(matches_coll.distinct(
        "_id", {"_id": {"$in": prev_matches_ids}}))
    # number of upcoming matches still invalidated by a match missing from the DB
    missing_decay = 0
    for index, match_id in enumerate(prev_matches_ids):
        if match_id not in matches_ids_in_db:
            missing_decay = 10
            continue

        if missing_decay > 0:
            missing_decay -= 1
        elif index > 9:
            matches_ready[match_id]["match_history"] += 1

## Lifetime Stats

In [None]:
all_lifetime_stats = lifetime_stats_coll.find({})

In [None]:
for l in tqdm(all_lifetime_stats, total=lifetime_stats_coll.estimated_document_count()):
    matches_ready[l["matchId"]]["lifetime_stats"] += 1

## Filter Match Ids
Finally, the match ids are filtered to include only those that have full data for all ten players.

In [None]:
match_ids = [m_id for m_id, rd in matches_ready.items() if rd["match_history"] == 10 and rd["lifetime_stats"] == 10]

In [None]:
# build the path with pathlib so that it does not depend on the OS path separator
match_ids_filename = str(Path(INTERIM_DATA_DIR) / "processable_match_ids.data")

In [None]:
if match_ids:
    with open(match_ids_filename, 'wb') as f:
        # store the data as binary data stream
        pickle.dump(match_ids, f)

In [None]:
with open(match_ids_filename, 'rb') as f:
    # read the data as binary data stream
    match_ids = pickle.load(f)

# Build Dataset


In [41]:
num_matches = len(match_ids)
print("Number of processable matches", num_matches)

Number of processable matches 68917


The dataset is stored in batch files of 2000 matches each, in CSV format.
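
As a quick check of the batch arithmetic, the sketch below splits a list into consecutive batches and computes how many batches the 68917 processable matches need. The helper `iter_batches` is hypothetical and is not used elsewhere in this notebook.

```python
import math
from typing import Iterator, List, Sequence


def iter_batches(items: Sequence[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of `items` with at most `size` elements each."""
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


total = 68917
size = 2000
n_batches = math.ceil(total / size)
print(n_batches)  # 35: 34 full batches plus one with the remaining 917 matches

print(list(iter_batches(["a", "b", "c", "d", "e"], 2)))
```

Using a ceiling division rather than `total // size + 1` avoids an empty trailing batch when the total is an exact multiple of the batch size.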

In [None]:
BATCH_SIZE = 2000
# ceiling division, so that an exact multiple does not produce an empty last batch
num_batches = (num_matches + BATCH_SIZE - 1) // BATCH_SIZE

## Raw Dataset

In [None]:
# Store raw data
for batch_index in tqdm(range(num_batches), desc=f"Processing batches of {BATCH_SIZE}"):
    # sort by _id so that skip/limit pages over a stable order
    matches_to_process = list(
        matches_coll
        .find({"_id": {"$in": match_ids}}, {"teams": 0})
        .sort("_id", 1)
        .skip(batch_index * BATCH_SIZE)
        .limit(BATCH_SIZE))

    pd.DataFrame(matches_to_process).to_csv(f'../data/raw/batch_{batch_index}.csv')


## Interim Dataset

Iterate over matches and for each player get the lifetime stats before the beginning of the match as well as the match ids of the 10 previous matches.

Store the data of every player on each element of the teamA and teamB lists.
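
Retrieving the previous matches comes down to a slice of the player's match history. The sketch below assumes the history is ordered from the most recent to the oldest match, as described in the Player schema. The function name `previous_match_ids` and the sample history are hypothetical.

```python
from typing import Dict, List

N_PREVIOUS = 10


def previous_match_ids(match_history: List[Dict], match_id: str,
                       n_previous: int = N_PREVIOUS) -> List[str]:
    """Return the ids of up to `n_previous` matches played right before `match_id`.

    `match_history` is assumed to be ordered from the most recent to the oldest match.
    """
    ids = [m["id"] for m in match_history]
    position = ids.index(match_id)  # raises ValueError if the match is not in the history
    # older matches come after the current one in the list
    return ids[position + 1:position + 1 + n_previous]


# Toy history with made-up ids and timestamps, most recent first.
sample_history = [{"id": f"m{i}", "startTime": 100 * i} for i in range(4, -1, -1)]
print(previous_match_ids(sample_history, "m3", n_previous=2))
```

For `m3` with two previous matches requested, the result is `['m2', 'm1']`, the two matches played immediately before it.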

In [None]:
players_data_fields = ['activatedAt', 'steamCreatedAt',
                       'updatedAt', 'csgoId', 'verified']

In [None]:
matches_to_process = matches_coll.find({"_id": {"$in": match_ids}})

In [None]:
#Store interim data
matches_processed = []
players_data = {}


for index, match in enumerate(tqdm(matches_to_process, total=len(match_ids))):
    # Get all ids of the players in the match
    players_ids = {player['id'] for team in match['teams'] for player in team}

    lifetime_stats = lifetime_stats_coll.find({
        "matchId": match['_id'],
        "playerId": {"$in": list(players_ids)}})

    lifetime_data = {}
    for lt in lifetime_stats:
        lifetime_data[lt["playerId"]] = lt

    for team in ["teamA", "teamB"]:
        for player in match[team]:
            player_id = player["id"]

            # check if player already in data, if not retrieve from DB and store
            player_data = players_data.get(player_id, None)
            if not player_data:
                player_data = players_coll.find_one({"_id": player_id})
                players_data[player_data["_id"]] = player_data

            for player_field in players_data_fields:
                player[player_field] = players_data[player_id].get(
                    player_field, None)

            player["mapStats"] = lifetime_data[player_id]["mapStats"]

            player_match_history = player_data["matchHistory"]
            match_history_ids = [m['id'] for m in player_match_history] 
            match_index = match_history_ids.index(match['_id'])
            player["previousMatches"] = match_history_ids[match_index +
                                                          1:match_index+1+10]
    match.pop("teams")
    matches_processed.append(match)

    # flush a full batch of BATCH_SIZE matches to disk
    if (index + 1) % BATCH_SIZE == 0:
        pd.DataFrame(matches_processed).to_csv(
            f'../data/interim/batch_{index // BATCH_SIZE}.csv')
        matches_processed.clear()


# store the remaining matches, if any, in a final (smaller) batch
if matches_processed:
    batch_number = index // BATCH_SIZE
    pd.DataFrame(matches_processed).to_csv(
        f'../data/interim/batch_{batch_number}.csv')