In [15]:
import os
import json
import numpy as np
import pickle 
import tqdm
from scipy import sparse
from collections import defaultdict
from pprint import pprint
from collections import Counter
from itertools import permutations

class PositionOptimizer:
    """Assign the heroes of each team in a match to five distinct positions.

    For every player a per-hero classifier (``self.clfs[hero_id]``) produces
    log-probabilities over five positions. The assignment that maximises the
    summed log-probability over a team, with each position used at most once,
    is found by brute force over all permutations of the positions.

    Positions are 0-based in everything this class returns; the labels in
    ``self.opendota_data["ys"]`` are 1-based (column ``k`` of a prediction is
    matched against label ``k + 1``).
    """

    # Hero ids index directly into the teammate indicator vector, so every
    # hero id must be smaller than this value.
    TEAMMATE_VECTOR_SIZE = 136
    N_POSITIONS = 5
    # Per-player stats; each contributes a one-hot "rank within team" feature.
    RANK_ATTRIBUTES = (
        "gold_per_min", "xp_per_min", "kills", "deaths",
        "assists", "last_hits", "hero_damage", "tower_damage",
    )
    # Log-probability assigned to positions a hero has too few samples for.
    UNPLAYED_LOG_PROBA = -1000.
    # Location used by _load_opendota_data when no explicit path is given.
    OPENDOTA_DATA_PATH = '../position_optimizer/data/dataset_positions_all.pkl'

    def __init__(self, clfs_path: str, hero_path: str, opendota_path: str = None) -> None:
        """
        Load classifiers, hero metadata and the position-label dataset.

        Args:
            clfs_path: pickle file holding the per-hero classifiers.
            hero_path: JSON file with hero records (each needs "id" and
                "localized_name").
            opendota_path: optional pickle file with the label dataset;
                defaults to OPENDOTA_DATA_PATH.
        """
        self.clfs_path = clfs_path
        self.hero_path = hero_path
        self.opendota_data = {}
        self.clfs = defaultdict(list)
        # Cache: hero id -> Counter of position labels (see _remove_unplayed_roles).
        self.role_counts = defaultdict(dict)
        self.hero_data = {}

        self._load_clfs(clfs_path)
        self._load_hero_data(hero_path)
        self._load_opendota_data(opendota_path)
        self.hid_to_name = {h["id"]: h["localized_name"] for h in self.hero_data}

    def find_optimal_roles(self, match):
        """
        Compute the best position assignment for both teams of a match.

        Players with ``player_slot < 128`` form team 0, the others team 1.

        Args:
            match: dict with a "players" list; every player needs
                "player_slot", "hero_id" and all RANK_ATTRIBUTES.

        Returns:
            ``{0: assignment, 1: assignment}`` where each assignment maps
            hero id -> ``(position, hero name, log-probability rounded to 2
            decimals)``.
        """
        players = match["players"]
        teams = (
            [p for p in players if p["player_slot"] < 128],
            [p for p in players if p["player_slot"] >= 128],
        )
        return {marker: self._assign_team_positions(team)
                for marker, team in enumerate(teams)}

    def _assign_team_positions(self, team):
        """Return the highest-scoring hero -> position assignment for one team."""
        hids = [p["hero_id"] for p in team]
        # Per attribute: (hero id, value) pairs sorted descending, so the index
        # of a player's pair is that player's rank within the team.
        ranks = {
            attr: sorted(((p["hero_id"], p[attr]) for p in team),
                         key=lambda pair: pair[1], reverse=True)
            for attr in self.RANK_ATTRIBUTES
        }

        log_proba = {}
        for p in team:
            hid = p["hero_id"]

            # Indicator vector of the other heroes on the team.
            teammates = np.zeros(self.TEAMMATE_VECTOR_SIZE)
            for other_hid in hids:
                if other_hid != hid:
                    teammates[other_hid] = 1.

            features = [teammates]
            for attr in self.RANK_ATTRIBUTES:
                features.append(self._get_rank(ranks[attr].index((hid, p[attr]))))

            x = np.concatenate(features)
            # NOTE(review): column k is treated as position k; this presumes
            # every classifier was fit on all five labels in order -- confirm.
            y_pred = self.clfs[hid].predict_log_proba(x.reshape(1, -1))
            log_proba[hid] = self._remove_unplayed_roles(hid, y_pred.ravel())

        # Exhaustive search: 5! = 120 candidate assignments. Only the scores
        # are evaluated inside the loop; the result dict is built once below.
        best_log_p = -np.inf
        best_perm = None
        for perm in permutations(range(self.N_POSITIONS), self.N_POSITIONS):
            log_p = np.array([log_proba[hid][perm[i]]
                              for i, hid in enumerate(hids)]).sum()
            if log_p > best_log_p:
                best_log_p = log_p
                best_perm = perm

        if best_perm is None:
            # No permutation scored above -inf (e.g. NaN scores).
            return None

        return {
            hid: (best_perm[i], self.hid_to_name[hid],
                  round(log_proba[hid][best_perm[i]], 2))
            for i, hid in enumerate(hids)
        }

    def _remove_unplayed_roles(self, hid, y_pred, threshold=200):
        """
        Mask positions for which the hero has fewer than `threshold` samples.

        Args:
            hid: hero id, used to look up labels in ``opendota_data["ys"]``.
            y_pred: 1-D array of log-probabilities, one per position.
            threshold: minimum number of labelled samples for a position to
                keep its predicted value.

        Returns:
            Array shaped like `y_pred`; masked positions hold
            UNPLAYED_LOG_PROBA, the rest are copied from `y_pred`.
        """
        counts = self.role_counts.get(hid)
        if not counts:
            # Count once per hero instead of on every prediction. A Counter
            # (unlike the plain dict used before) yields 0 for labels that
            # never occur, so a never-played position is masked rather than
            # raising KeyError.
            counts = Counter(self.opendota_data["ys"][hid])
            self.role_counts[hid] = counts

        updated_y_pred = np.zeros(y_pred.shape)
        for k in range(self.N_POSITIONS):
            # Labels are 1-based, prediction columns 0-based.
            if counts.get(k + 1, 0) < threshold:
                updated_y_pred[k] = self.UNPLAYED_LOG_PROBA
            else:
                updated_y_pred[k] = y_pred[k]
        return updated_y_pred

    def _get_rank(self, rank):
        """Return a length-5 one-hot vector with a 1 at index `rank`."""
        oh = np.zeros(5)
        oh[rank] = 1
        return oh

    def _load_clfs(self, clf_path):
        """
        Load clfs from pickle file
        """
        # NOTE: pickle.load executes arbitrary code; only use trusted files.
        with open(clf_path, 'rb') as f:
            self.clfs = pickle.load(f)

    def _load_opendota_data(self, path=None) -> None:
        """
        Load the pickled position-label dataset into ``self.opendota_data``.

        Args:
            path: pickle file to read; defaults to OPENDOTA_DATA_PATH.
        """
        if path is None:
            path = self.OPENDOTA_DATA_PATH
        # NOTE: pickle.load executes arbitrary code; only use trusted files.
        with open(path, 'rb') as f:
            self.opendota_data = pickle.load(f)

    def _load_hero_data(self, hero_path) -> None:
        """
        Load hero data from the JSON file at `hero_path`
        """
        with open(hero_path, 'r') as f:
            self.hero_data = json.load(f)

    def get_all_hids(self):
        """Return the ids of all heroes in ``self.hero_data``, in file order."""
        return [h["id"] for h in self.hero_data]


Annotate each hero with position

In [14]:
import os
import json
import numpy as np
import pickle
import tqdm
import gzip
from scipy import sparse
from collections import defaultdict
from pprint import pprint
from collections import Counter


def is_valid_match(match, all_hids):
    """
    Decide whether a raw match record should be used.

    A match is accepted when it has a "match_id", its "lobby_type" is one of
    {0, 6, 7}, it is not shorter than 20 minutes ("duration" is compared
    against 60 * 20), its "game_mode" is one of {1, 2, 16, 22}, and every
    player's "hero_id" is contained in `all_hids`.

    Args:
        match: dict parsed from one match record.
        all_hids: container of known hero ids (anything supporting `in`).

    Returns:
        True if all checks pass, otherwise False.
    """
    # Checks run in this order and short-circuit, so later keys are only
    # read once the earlier checks have passed.
    metadata_ok = (
        "match_id" in match
        and match["lobby_type"] in {0, 6, 7}
        and not match["duration"] < 60 * 20
        and match["game_mode"] in {1, 2, 16, 22}
    )
    if not metadata_ok:
        return False

    return all(player["hero_id"] in all_hids for player in match["players"])


# Shape of the per-match encoding: one row per hero (in po.hero_data order),
# one column per position. Cell value: +1 if the hero was played on team 0 in
# that position, -1 if on team 1, 0 otherwise.
N_HEROES = 121
N_POSITIONS = 5

# `results` and `count` are not used by this cell; they are kept in case
# another cell references them.
results = []
seen_matches = set()
invalid = 0
count = 0
po = PositionOptimizer(
    '../position_optimizer/data/clfs/logreg_clfs_all.pkl', '../data/heroes.json')

training_data = []
all_hids = po.get_all_hids()
valid_hids = set(all_hids)  # O(1) membership tests in is_valid_match
# Hero id -> row index, replacing a scan over all heroes for every player.
hero_row = {h["id"]: i for i, h in enumerate(po.hero_data)}
if len(po.hero_data) > N_HEROES:
    raise ValueError(
        "hero data has {} heroes but the encoding only has {} rows".format(
            len(po.hero_data), N_HEROES))

with gzip.open("../data/raw/dataset_batch1_900k.gz", "r") as fp:
    for line in tqdm.tqdm(fp):
        match = json.loads(line)
        # Check if match is valid
        if not is_valid_match(match, valid_hids):
            invalid += 1
            continue

        match_id = match["match_id"]
        # Duplicates are counted but not skipped, so the row order of the
        # output stays the same as the order of valid matches in the file.
        seen_matches.add(match_id)

        optimal_positions = po.find_optimal_roles(match)

        match_array = np.zeros(shape=(N_HEROES, N_POSITIONS))
        for p in match["players"]:
            # Same team split as PositionOptimizer.find_optimal_roles.
            team_marker = 0 if p["player_slot"] < 128 else 1
            hid = p["hero_id"]
            # Each team's positions must be read from that team's own
            # assignment; the previous code read team 0's entry for both.
            pos = optimal_positions[team_marker][hid][0]
            match_array[hero_row[hid], pos] = 1 if team_marker == 0 else -1

        training_data.append(match_array.ravel())

# reshape keeps the column count well-defined even if no match was valid.
x = np.array(training_data, dtype="float32").reshape(-1, N_HEROES * N_POSITIONS)

with open("../data/preprocessed/dataset_batch1_pos.npy", "wb") as f:
    np.save(f, x)


print(x.shape)
print("seen matches: {}, len samples: {}".format(
    len(seen_matches), len(training_data)))


272it [00:05, 49.42it/s]


KeyboardInterrupt: 

In [110]:

x[0]


array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])