In [56]:
import os
import json
from itertools import combinations
from math import sqrt

import numpy as np
import tqdm

In [None]:
def remove_fields(team_data):
    """Return a slimmed-down copy of one team's snapshot.

    Only a whitelist of per-player keys is kept (keys absent from a player are
    simply skipped), each inventory item is reduced to its ``id``, each
    building is reduced to its ``health``, and a fixed set of team counters is
    copied with a default of 0. ``observerWards`` is passed through as-is
    (the same list object, or ``[]`` when absent).

    Args:
        team_data: dict describing one team at one point in time.

    Returns:
        A new dict with keys ``players``, ``buildings``, ``observerWards`` and
        the ``total*`` counters. The input dict itself is not modified.
    """
    kept_player_keys = (
        "heroId", "level", "xp", "networth", "totalGold", "currentGold",
        "lifeState", "respawnSeconds", "buybackCooldown",
        "heroDamage", "towerDamage", "damageTakenPostReduction",
        "kills", "deaths", "assists",
        "lastHits", "denies",
        "teamfightParticipation",
        "obsPlaced", "senPlaced",
        "x", "y",
    )
    team_counters = (
        "totalCampsStacked",
        "totalRunePickups",
        "totalTowersKilled",
        "totalRoshansKilled",
        "totalSmokesUsed",
    )

    slim_players = []
    for raw_player in team_data.get("players", []):
        # Whitelisted keys first (in whitelist order), inventory last.
        slim = {k: raw_player[k] for k in kept_player_keys if k in raw_player}
        slim["inventory"] = [
            {"id": slot.get("id")} for slot in raw_player.get("inventory", [])
        ]
        slim_players.append(slim)

    slim_buildings = {}
    for building_name, building in team_data.get("buildings", {}).items():
        slim_buildings[building_name] = {"health": building.get("health")}

    result = {
        "players": slim_players,
        "buildings": slim_buildings,
        "observerWards": team_data.get("observerWards", []),
    }
    for counter in team_counters:
        result[counter] = team_data.get(counter, 0)
    return result


In [None]:

# Fallback value calculate_fields reports for the hero-to-ward distances when
# a team has no living heroes or no wards.
# NOTE(review): presumably the map extent in replay coordinate units - confirm.
MAP_DIAG = 256.0


def euclid(a, b):
    """Return the straight-line distance between the 2-D points ``a`` and ``b``.

    Both arguments only need to support indexing at positions 0 and 1.
    """
    dx = a[0] - b[0]
    dy = a[1] - b[1]
    return sqrt(dx ** 2 + dy ** 2)

def _xy_array(entities):
    """Stack the (x, y) of every entity that has both coordinates into an (n, 2) float32 array."""
    coords = [
        (e["x"], e["y"])
        for e in entities
        if e.get("x") is not None and e.get("y") is not None
    ]
    # reshape keeps the (0, 2) shape when nothing qualifies.
    return np.array(coords, dtype=np.float32).reshape(-1, 2)


def calculate_fields(team_data):
    """Replace raw positions in one team's snapshot with aggregate spatial features.

    ``team_data`` is modified in place and also returned:

    * every player's ``lifeState`` becomes 0 (it was 0) or 1 (anything else)
      and ``respawnSeconds`` is clamped to be non-negative;
    * ``observerWards`` and each player's ``x`` / ``y`` are removed;
    * the following keys are added, all distances rounded to 2 decimals:

      - ``numWards``: number of entries in ``observerWards``;
      - ``pdistMean`` / ``pdistMax`` / ``pdistStd``: statistics of the pairwise
        distances between alive players (0.0 with fewer than two);
      - ``centroidWardMean``: mean distance from the alive players' centroid
        to the wards (0.0 when there is no alive player or no ward);
      - ``hwMean`` / ``hwMin`` / ``hwMax``: statistics over each alive player's
        distance to its nearest ward (``MAP_DIAG`` when there is no alive
        player or no ward).

    Players or wards without both ``x`` and ``y`` are ignored for the
    positional statistics instead of raising ``KeyError``.

    Args:
        team_data: dict with optional ``players`` and ``observerWards`` lists.

    Returns:
        The same ``team_data`` dict, updated as described above.
    """
    players = team_data.get("players", [])
    wards = team_data.get("observerWards", [])

    for p in players:
        p["lifeState"] = 0 if p.get("lifeState", 0) == 0 else 1
        p["respawnSeconds"] = max(0, p.get("respawnSeconds", 0))

    alive_xy = _xy_array(p for p in players if p["lifeState"] == 0)
    ward_xy = _xy_array(wards)
    have_alive = len(alive_xy) > 0
    have_wards = len(ward_xy) > 0

    # Spread of the alive players: needs at least one pair.
    if len(alive_xy) >= 2:
        dists = [euclid(a, b) for a, b in combinations(alive_xy, 2)]
        team_pdist_mean = float(np.mean(dists))
        team_pdist_max = float(np.max(dists))
        team_pdist_std = float(np.std(dists, ddof=0))
    else:
        team_pdist_mean = team_pdist_max = team_pdist_std = 0.0

    # The centroid is undefined when nobody is alive. Falling back to the map
    # origin would report the wards' distance from (0, 0) as if it were a real
    # measurement, so use the same 0.0 fallback as the "no wards" case.
    if have_alive and have_wards:
        centroid = alive_xy.mean(axis=0)
        centroid_ward_mean = float(np.mean(np.linalg.norm(ward_xy - centroid, axis=1)))
    else:
        centroid_ward_mean = 0.0

    if have_alive and have_wards:
        # pairwise[i, j] = distance from alive player i to ward j.
        pairwise = np.linalg.norm(alive_xy[:, None, :] - ward_xy[None, :, :], axis=2)
        nearest_per_hero = pairwise.min(axis=1)
        hw_mean = float(nearest_per_hero.mean())
        hw_min = float(nearest_per_hero.min())
        hw_max = float(nearest_per_hero.max())
    else:
        hw_mean = hw_min = hw_max = MAP_DIAG

    # Raw positions are dropped once the aggregates have been computed.
    team_data.pop("observerWards", None)
    for p in players:
        p.pop("x", None)
        p.pop("y", None)

    team_data["numWards"] = len(wards)
    team_data["pdistMean"] = round(team_pdist_mean, 2)
    team_data["pdistMax"] = round(team_pdist_max, 2)
    team_data["pdistStd"] = round(team_pdist_std, 2)
    team_data["centroidWardMean"] = round(centroid_ward_mean, 2)
    team_data["hwMean"] = round(hw_mean, 2)
    team_data["hwMin"] = round(hw_min, 2)
    team_data["hwMax"] = round(hw_max, 2)

    return team_data


In [None]:
def process_slice(ts):
    """Run the field-removal and feature-calculation steps on both teams of one slice.

    The ``'radiant'`` and ``'dire'`` entries of ``ts`` are each replaced by
    ``calculate_fields(remove_fields(...))``; any other keys are left alone.
    ``ts`` is updated in place and returned.
    """
    for side in ("radiant", "dire"):
        ts[side] = calculate_fields(remove_fields(ts[side]))
    return ts

In [61]:
# Batch driver: read every parsed replay, process each slice, and write the
# result under the same file name in save_dir.
replay_dir = "../data/parsed_replays"
save_dir = "../data/processed_replays"
os.makedirs(save_dir, exist_ok=True)

# os.listdir also returns sub-directories and dotfiles (for example
# .ipynb_checkpoints or .DS_Store); open()/json.load on those would abort the
# whole batch part-way through. Keep only regular, non-hidden files, sorted so
# the processing order is the same on every run.
replay_files = sorted(
    name
    for name in os.listdir(replay_dir)
    if not name.startswith(".") and os.path.isfile(os.path.join(replay_dir, name))
)

for parsed_replay in tqdm.tqdm(replay_files):
    with open(os.path.join(replay_dir, parsed_replay), 'r', encoding='utf-8') as f:
        replay_data = json.load(f)
    # Values are reassigned for existing keys only, so the dict's key set does
    # not change while it is being iterated.
    for time in replay_data:
        ts = replay_data[time]
        replay_data[time] = process_slice(ts)
    with open(os.path.join(save_dir, parsed_replay), 'w', encoding='utf-8') as f:
        json.dump(replay_data, f, indent=2)

100%|██████████| 11267/11267 [18:29<00:00, 10.16it/s]
