In [None]:
# default_exp ingest.ingest

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#exporti

import re
import pymongo
import json
import sc2reader
import errno
import jsonschema
import os

import pandas as pd
import numpy as np

from typing import *
from pathlib import Path
from pprint import pprint
from jsonschema import validate
from dataclasses import dataclass, astuple, asdict, field

from sc_training.ingest import *

sc2reader.engine.register_plugin(CtrlGroupTracker())

SyntaxError: invalid syntax (ingest.py, line 168)

In [None]:
#hide 

# Locate the test replays: use the `test_replays` folder of the current
# working directory when it exists, otherwise the one two levels up.
if Path('test_replays').exists():
    test_data_path = Path.cwd() / 'test_replays'
else:
    test_data_path = Path('../../test_replays')

test_batch_path = test_data_path / "TestProfilerBatch"

# Stop here when the replay folder is missing or is not a directory.
assert test_data_path.exists()
assert test_data_path.is_dir()

# Section 1.7 - The main ingest module 

## Introduction

This section defines the ingest process' `inventory_replays` and `get_replay_indicators` functions following the design proposed in the Ingestion Sequence Diagram (see Section 1 - Data Ingestion and Clustering Process). It exports these functions into the ingest module of the ingest sub-package.

### Exportable Members
- `inventory_replays`
- `get_replay_indicators`

## Inventorying and Storing the `replays` Collection

To inventory the replays in a replay batch I can use the `get_replay_info` defined in the `summarise_rpl` module (see Section 1.1).

In the following code, I load a batch of replays and I print the summary of the first two replays using the `get_replay_info` function.

In [None]:
replay_batch = sc2reader.load_replays(str(test_batch_path))

# Preview the summary, and the summary's type, for the first two replays only.
# `range(2)` is listed first so no third replay is pulled from the batch.
for i, rpl in zip(range(2), replay_batch):
    print(get_replay_info(rpl))
    print(type(get_replay_info(rpl)))

### Storing inventory in Database

I can store the `Replay_data` objects returned by `get_replay_info` in a document-based database to illustrate how this function could store an inventory of replay information in such a format. Storing this inventory means that other processes could access it to navigate the information built by the ingest process.

In the case of `sc_training`, I use a `config.json` file stored in the working project's data folder to set up a MongoDB local client and, through it, a database. The following code loads this file and defines the loading procedure for when this module is imported. 

I will divide this loading process into three steps. First, I locate and load the `config` file. This location process allows users to create a custom `config.json` in a data directory in their projects. That file can be used to customise the name of the database, the port address and number for the MongoDB client, and the location of the replays the user wants to process. 

Apart from this option, the code also defines how the module can default to a `config` file stored in the library's data folder if the user fails to provide this file. This default file allows the system to attempt to function based on the assumption that the users are using a Windows computer and have a traditional installation of StarCraft II and MongoDB. Of course, this default set-up would not be expected to work in all cases, but it provides an option and a sample of the `config` file.

Once the file is located, I also define a procedure to ensure it contains the necessary information to connect with the MongoDB client and access the appropriate database.

In [None]:
#exporti
@dataclass
class Config_settings:
    """Settings read from `config.json`.

    Fields, in declaration order: the address and port handed to the
    MongoDB client, the name of the database, and the replay location.
    """
    port_address: str
    port_number: int
    db_name: str
    replay_path: str

    def __str__(self):
        """Return one text line per field.

        Each line is a label left-aligned in 15 characters followed by the
        field value right-aligned in 40 characters.
        """
        labels = ("Port Address: ", "Port Number: ",
                  "DB Name: ", "Replays file: ")
        report = ''
        for label, value in zip(labels, astuple(self)):
            report += f'{label:<15}{value:>40}\n'
        return report
        
# JSON Schema used to validate the parsed content of `config.json`.
# Every key is listed as required because `load_configurations` reads all
# four of them by name; without `required`, a file missing a key would pass
# validation and only fail later with a bare KeyError.
Config_schema = {
    "type": "object",
    "properties":{
        "DB_NAME": {"type":"string"},
        "PORT_ADDRESS":  {"type":"string"},
        "PORT_NUMBER": {"type":"number"},
        "REPLAY_PATH": {"type":"string"}
    },
    "required": ["DB_NAME", "PORT_ADDRESS", "PORT_NUMBER", "REPLAY_PATH"]
}

def validate_config_file(file: Dict[str, Any], schema: Dict[str, Any]) -> bool:
    """Validate the parsed content of a configuration file against a schema.

    Despite its name, `file` is the already-parsed JSON content (the caller
    passes the result of `json.load`), not a path on disk.

    Args:
        file: Parsed JSON content to validate.
        schema: JSON Schema the content must conform to.

    Returns:
        True when the content conforms to the schema.

    Raises:
        jsonschema.exceptions.ValidationError: If the content does not
            conform to `schema`.
        jsonschema.exceptions.SchemaError: If `schema` itself is invalid.
    """
    try:
        validate(file, schema)
    except jsonschema.exceptions.ValidationError as err:
        print(err)
        print("config.json does not conform to the required specifications")
        # Re-raise the active exception unchanged.
        raise
    except jsonschema.exceptions.SchemaError as err:
        print(err)
        print("The Config_schema is invalid")
        raise

    return True

def open_config_file(config_file: Path) -> dict[str, Any]:
    """Load a JSON configuration file and validate it against `Config_schema`.

    Args:
        config_file: Location of the JSON configuration file.

    Returns:
        The parsed content of the file.

    Raises:
        FileNotFoundError: If `config_file` does not exist. The exception
            carries the missing path in its `filename` attribute.
        jsonschema.exceptions.ValidationError: Propagated from
            `validate_config_file` when the content does not match the schema.
    """
    try:
        if not config_file.exists():
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT),
                                    str(config_file))

        # Parse the file exactly once, inside a context manager so the handle
        # is always closed (the file used to be opened twice, and the first
        # handle was never closed).
        with config_file.open('r') as cf:
            config_dict = json.load(cf)

    except FileNotFoundError:
        print('config.json not found')
        raise

    validate_config_file(config_dict, Config_schema)
    return config_dict


All of those check and load procedures are contained within the `load_configurations` function. The following sample code shows the use of this function to set up a test database.

In [None]:
#export
def load_configurations() -> Config_settings:
    """Load the project's configuration settings from a `config.json` file.

    The file is first looked up at `data/config.json` under the current
    working directory. When it is not there, the function falls back to the
    `data/config.json` found two directories above the folder that contains
    this module.

    Returns:
        A `Config_settings` built from the `PORT_ADDRESS`, `PORT_NUMBER`,
        `DB_NAME` and `REPLAY_PATH` entries of the file.

    Raises:
        FileNotFoundError: Propagated from `open_config_file` when the
            selected file does not exist.
        KeyError: If one of the four entries is absent from the file.
    """
    local_config = Path.cwd() / 'data' / 'config.json'

    if local_config.exists():
        config_file = local_config
    else:
        # Start from the module's *directory*: stepping `..` through the
        # module file itself (`module.py/..`) is not resolvable on POSIX
        # systems, so the fallback file could never be found there.
        module_dir = Path(__file__).resolve().parent
        config_file = module_dir.joinpath('..', '..', 'data', 'config.json')

    config_dict = open_config_file(config_file)
    return Config_settings(
        config_dict['PORT_ADDRESS'],
        config_dict['PORT_NUMBER'],
        config_dict['DB_NAME'],
        config_dict['REPLAY_PATH']
    )

In [None]:
# Read the configuration, connect to the MongoDB client it describes and
# select the configured database.
db_settings = load_configurations()
mongo_client = pymongo.MongoClient(host=db_settings.port_address,
                                   port=db_settings.port_number)
worcking_bd = mongo_client[db_settings.db_name]

In [None]:
# Each heading is printed on its own line, followed by the object it names.
print('config.json content:', db_settings, sep='\n')
print('Local Mongo DB Client', mongo_client, sep='\n', end='\n\n')
print('Database: ', worcking_bd, sep='\n')


Once the database is set, I can loop through the replays in a file inserting the return values of the functions from the ingest subpackage's different modules into collections within the database. 

According to the Ingest and Clustering Sequence Diagram (see Section 1 Intro), I need to define a function that takes a batch of replays and inserts the indexing data into the `replays` collection. The following code shows the use of a loop that would do precisely that. 

> Note: in the example, I set up a new database called `sample_db` that I will use in this notebook for illustration purposes. 

In [None]:
# Select the `sample_db` database and its `replays` collection from the client.
sample_db = mongo_client['sample_db']
collection = sample_db['replays']

# Load the replays found at the location named in the configuration file.
replay_batch = sc2reader.load_replays(db_settings.replay_path)

# NOTE(review): the insertion loop below is commented out, so running this
# cell as-is inserts nothing into `collection`.
# for rpl in replay_batch:
#     if not collection.count_documents({'replay_name': rpl.filename}, 
#                                       limit = 1):
#         # print(f'Adding {Path(rpl.filename).name} to replays collection.')
#         collection.insert_one(asdict(get_replay_info(rpl)))
#     else:
#         print(rpl.filename, "already exists in the replay_info collection.")

Once the loop finishes, the collection has been created in the database. The following snippet shows that this collection is now composed of several documents containing the indexing data for each replay. Additionally, I added a conditional statement within the loop to check if the collection already contains a replay. In that case, the loop will print a warning and avoid inserting repeated documents into the database.

In [None]:
# Report how many documents the `replays` collection holds.
document_total = collection.estimated_document_count()
print(f'The collection now contains: {document_total} documents')

## Building the `indicators` collection

Similarly, I can loop through every replay in a batch, extracting the performance indicators for each replay's players. Moreover, I can store these indicators as separate documents in a second collection (i.e. rpl_indicators). This two-collection approach was defined in Section 1's introduction.

In the following code, I illustrate how such a loop would apply all the functions defined in the `ingest` subpackage to build the indicators of two players in a sample replay. Each player's indicators are stored in a single flat dictionary, i.e. a dictionary that has no nested data structures as values.  

In [None]:
#exporti
def flatten_indicators(nested_value: dict) -> dict[str, Any]:
    """Flatten a one-level nested indicator dictionary.

    The shape of the input is decided by its first value:

    - dict values: every inner item becomes `'<key>_<inner key>': value`.
    - tuple values: the first two elements of each tuple become
      `'first_<key>'` and `'second_<key>'`, stored as strings.
    - anything else: the dictionary is already flat and a shallow copy of
      it is returned.

    Args:
        nested_value: Dictionary to flatten; may be empty.

    Returns:
        A new flat dictionary. An empty input yields an empty dictionary.
    """
    # An empty input has no first value to inspect.
    if not nested_value:
        return {}

    first_value = next(iter(nested_value.values()))

    if isinstance(first_value, dict):
        return {f'{key}_{inner_key}': value
                for key, inner in nested_value.items()
                for inner_key, value in inner.items()}

    if isinstance(first_value, tuple):
        positions = ('first', 'second')
        return {f'{position}_{key}': str(value)
                for key, pair in nested_value.items()
                for position, value in zip(positions, pair)}

    # Always hand back a dict so callers can pass the result straight to
    # `dict.update`, which rejects None.
    return dict(nested_value)


In [None]:

indicators = sample_db['indicators']

# Build the replay path from `test_data_path` instead of a hard-coded,
# backslash-separated string: the literal contained the invalid escape `\J`,
# only worked on Windows, and ignored the fallback test folder.
sample_replay = [sc2reader.load_replay(
    str(test_data_path / 'Jagannatha LE.SC2Replay'))]

# Functions called as `func(rpl, pid)` whose result is merged as returned.
simple_functions = [get_player_macro_econ_stats,
                    get_expan_times,
                    get_expan_counts,
                    calc_attack_ratio,
                    calc_ctrlg_ratio,
                    count_max_active_groups,
                    calc_get_ctrl_grp_ratio,
                    calc_select_ratio,
                    list_player_upgrades,
                    calc_spe_abil_ratios]

# Functions called as `func(rpl, pid, flag)`, once with each boolean flag,
# whose result is flattened before being merged.
double_functions = [count_composition,
                    count_started]

indicators_list = []
for rpl in sample_replay:
    print(f'Processing: {rpl.filename}')
    for pid, player in rpl.player.items():
        print(f'Processing player: pid:{pid} {player}')
        rpl_indicators = {}

        for func in simple_functions:
            rpl_indicators.update(func(rpl, pid))

        v = get_prefered_spec_abil(rpl, pid)
        rpl_indicators.update(flatten_indicators(v))

        for func in double_functions:
            for flag in [True, False]:
                rpl_indicators.update(flatten_indicators(func(rpl, pid, flag)))

        # One flat dictionary per player.
        indicators_list.append(rpl_indicators)


As a result of the code above, `indicators_list` now contains two dictionaries that store the indicators for each player. 

> Tip: In some cases, each player's indicators lists would contain a different number of indicators. This difference exists because the list stores separate counts for the players' units and buildings according to their race during the match. Remember that each one of the game races has a different number of available units. 

In [None]:
print('Number of players evaluated: ', len(indicators_list))

# Same two-line report for the first and the second set of indicators.
for idx, ordinal in enumerate(('First', 'Second')):
    player_indicators = indicators_list[idx]
    print(f'{ordinal} set of indicators belongs to:',
          player_indicators['player_username'])
    print(f'{ordinal} set of indicators contains: ',
          len(player_indicators), 'indicators.')

## Exportable functions 
This section defines `inventory_replays`. With this function, users can provide a directory containing multiple `.SC2Replay` files. The function will build four collections in the database specified in the `config.json` file. 

Thus, I have condensed two initially proposed functions in the ingestion process diagram, i.e. `inventory_replays` and `get_replay_indicators`, to run in one loop. I also split the indicators by play race into separate collections. This separation will make it easier to compare and condense these replays into the players' race profiles.

Internally, the function uses three helper functions:

- `set_up_db`: connects to the MongoDB client and loads the working database.
- `verify_replays_path`: makes sure that the path passed by the user is valid.
- `build_indicators`: runs through the indicator extraction loop and stores the results in the indicators collections.

In [None]:
#exporti
def set_up_db():
    """Connect to MongoDB and return the database named in the configuration.

    The client address, client port and database name all come from
    `load_configurations`.
    """
    settings = load_configurations()
    client = pymongo.MongoClient(settings.port_address, settings.port_number)
    return client[settings.db_name]

In [None]:
#exporti
def verify_replays_path(rpl_path: Any) -> Path:
    """Check that a replay location is usable and return it as a `Path`.

    Args:
        rpl_path: Location of the replays, as a `str` or a `Path`.

    Returns:
        The location as a `Path` object.

    Raises:
        TypeError: If `rpl_path` is neither a `str` nor a `Path`.
        ValueError: If the location does not exist.
    """
    if isinstance(rpl_path, str):
        path = Path(rpl_path)
    elif isinstance(rpl_path, Path):
        path = rpl_path
    else:
        # Report the type of the argument itself; the message used to read
        # an unrelated name (`replay_batch`) that is not defined here.
        string = 'replay_path must be of type str or Path, not '
        raise TypeError(string + str(type(rpl_path)))

    if not path.exists():
        raise ValueError(f'{path} is not valid location')

    return path

In [None]:
#exporti
def build_indicators(rpl: sc2reader.resources.Replay,
                    working_db: pymongo.database.Database) -> None:
    """Extract every player's indicators from a replay and store them.

    For each player in `rpl.player`, the results of all indicator functions
    are merged into one flat dictionary, which is inserted as a single
    document into the `indicators_<play_race>` collection of `working_db`.

    Args:
        rpl: Replay to process.
        working_db: Database that receives the indicator documents.
    """
    # Functions called as `func(rpl, pid)` whose result is merged as returned.
    simple_functions = [get_player_macro_econ_stats,
                        get_expan_times,
                        get_expan_counts,
                        calc_attack_ratio,
                        calc_ctrlg_ratio,
                        count_max_active_groups,
                        calc_get_ctrl_grp_ratio,
                        calc_select_ratio,
                        list_player_upgrades,
                        calc_spe_abil_ratios
                        ]

    # Functions called as `func(rpl, pid, flag)`, once with each boolean
    # flag, whose result is flattened before being merged.
    double_functions = [count_composition,
                        count_started]

    for pid, player in rpl.player.items():
        # Use the database passed in by the caller. The body used to read a
        # misspelled module-level name, `worcking_bd`, which only exists
        # while the notebook's demo cells have been run.
        indi_collect = working_db[f'indicators_{player.play_race}']

        rpl_indicators = {}
        for func in simple_functions:
            rpl_indicators.update(func(rpl, pid))

        v = get_prefered_spec_abil(rpl, pid)
        rpl_indicators.update(flatten_indicators(v))

        for func in double_functions:
            for flag in [True, False]:
                rpl_indicators.update(flatten_indicators(func(rpl, pid, flag)))

        # Turn NumPy int64/float64 scalars into plain floats before storing
        # (presumably because the database driver cannot encode NumPy
        # scalars - confirm).
        rpl_ind = {k: float(v) if isinstance(v, (np.int64, np.float64)) else v
                   for k, v in rpl_indicators.items()}

        indi_collect.insert_one(rpl_ind)

In [None]:
#export
def inventory_replays(replay_batch: Any) -> None:
    """Build the replay inventory and indicator collections for a batch.

    Works on the database specified in the config.json file and fills the
    following collections:

    - `replays`
        Stores the metadata of the replays, can be used for indexing and
        for finding the other replays.
    - `indicators_<race>`, one per race played (e.g. `indicators_Protoss`,
      `indicators_Terran`, `indicators_Zerg`)
        Store the indicators for each performance of every player,
        separated by the race they play with in every match.

    Replays whose `type` is not "1v1" are skipped, and so are replays whose
    file name is already present in the `replays` collection.

    Args:
        replay_batch: Location of the replays, as a `str` or a `Path`.

    Raises:
        TypeError: Propagated from `verify_replays_path` when
            `replay_batch` is neither a `str` nor a `Path`.
        ValueError: Propagated from `verify_replays_path` when the
            location does not exist.
    """
    working_db = set_up_db()
    rpls_collect = working_db['replays']
    path = verify_replays_path(replay_batch)

    replays = sc2reader.load_replays(str(path))

    load_count = 0
    for rpl in replays:
        if rpl.type != "1v1":
            print(f'{rpl.filename} is not ladder or practice')
            continue

        # `limit=1` stops the count at the first matching document.
        already_stored = rpls_collect.count_documents(
            {'replay_name': rpl.filename}, limit=1)

        if not already_stored:
            print(f'Processing {rpl.filename}')
            rpls_collect.insert_one(asdict(get_replay_info(rpl)))
            # Pass the database opened above. The call used to pass
            # `worcking_bd`, a name that is not defined in this function.
            build_indicators(rpl, working_db)
            load_count += 1
        else:
            print(rpl.filename, "already exists in the replay_info rpls_collect.")

    print(f'Load complete. {load_count} files loaded')
        

In [None]:
# inventory_replays(test_batch_path)

In [None]:
#hide
# Import and run nbdev's `notebook2script`; presumably this writes the cells
# flagged `#export` / `#exporti` into the library module - confirm in the
# nbdev documentation.
from nbdev.export import notebook2script
notebook2script()