In [None]:
import os
from dotenv import load_dotenv
import pandas as pd

from src.datasetgenerator import StratzQuery, DatasetGenerator

In [None]:
load_dotenv()
#querier = StratzQuery(os.getenv('STRATZ_TOKEN'))
#querier.get_match(7590822094)

csv_path = os.getenv('CSV_PATH')

generator = DatasetGenerator(os.getenv('STRATZ_TOKEN'), csv_path)

In [None]:

league = generator.get_professional_league(16842)
matches = league['matches']
len(matches)

In [None]:
matches[0].keys()

In [None]:
matches[0]['playbackData']['wardEvents']

In [None]:
def get_deaths_through_ward(match:dict):
    deaths_through_ward = {
        'time': [],
        'isPlayerRadiant': [],
        'positionX': [],
        'positionY': [],
    }
    for player in match["players"]:
        isPlayerRadiant = player['isRadiant']
        deaths_events = player['playbackData']['deathEvents']
        for d_e in deaths_events:
            if not d_e['isWardWalkThrough']:
                continue
            deaths_through_ward['time'].append(d_e['time'])
            deaths_through_ward['isPlayerRadiant'].append(isPlayerRadiant)
            deaths_through_ward['positionX'].append(d_e['positionX'])
            deaths_through_ward['positionY'].append(d_e['positionY'])
    
    df_deaths = pd.DataFrame.from_dict(deaths_through_ward)
    return df_deaths

get_deaths_through_ward(matches[0])

In [None]:
from collections import defaultdict

def get_match_wards(match:dict) -> dict:
    player_team_by_slot = {player['playerSlot']: player['isRadiant'] for player in match['players']}
    match_id = match['id']
    ward_events = match['playbackData']['wardEvents']
    wards = defaultdict(lambda: {})
    for w_e in ward_events:
        if w_e["action"] == "SPAWN":
            wards[w_e['indexId']] = {
                "id": f"{match_id}_{w_e['indexId']}",
                "match": match_id,
                "spawned_time": w_e["time"],
                "despawned_time": None,
                "positionX": w_e["positionX"],
                "positionY": w_e["positionY"],
                "wardType": w_e["wardType"],
                "isRadiant": player_team_by_slot[w_e["fromPlayer"]],
                "playerDestroyed": w_e["playerDestroyed"],
            }
            continue
        wards[w_e['indexId']]["despawned_time"] = w_e['time']
    

    # get kills around
    
    return dict(wards)

wards = get_match_wards(matches[0])
wards.keys(), len(wards.keys())

In [None]:
from collections import defaultdict

def map_death_count(df_wards, df_deaths):
    death_time = df_deaths['time'].item()
    return (df_wards['spawned_time'] < death_time) & (death_time > df_wards['despawned_time'])

def get_df_match_wards(match:dict) -> dict:
    if not match['playbackData']:
        print(f"warning: no playbackData found for match {match['id']}")
        return
    wards = get_match_wards(match)
    df_wards = defaultdict(lambda: [])
    for id, w in wards.items():
        for key, item in w.items():
            df_wards[key].append(item)
    df_wards = pd.DataFrame.from_dict(df_wards).set_index("id")

    none_despawned_mask = df_wards['despawned_time'].isna()
    df_wards.loc[none_despawned_mask, 'despawned_time'] = match['durationSeconds']

    df_deaths = get_deaths_through_ward(match)
    
    df_wards['possible_enemies_death'] = 0
    return map_death_count(df_wards, df_deaths)

    
    df_wards['possible_enemies_death'] = 0


    df_wards['spawned_time'] = df_wards['spawned_time'] // 60
    df_wards['despawned_time'] = df_wards['despawned_time'] // 60
    
    df_wards['radiantTeam'] = match['radiantTeam']['name']
    df_wards['direTeam'] = match['direTeam']['name']


    return df_wards

def get_league_df_wards(league_id):
    league = generator.get_professional_league(league_id)
    matches = league['matches']
    ward_matches_df = [get_df_match_wards(match) for match in matches]
    df_wards = pd.concat(ward_matches_df)
    df_wards['league'] = league_id
    df_wards['region'] = league['region']
    return df_wards

# df_wards = get_league_df_wards(16842)
# df_wards.isna().any()

get_df_match_wards(matches[0])

In [None]:
df_wards[['positionX', 'positionY']].describe()

In [None]:
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np

# bg_img = np.array(Image.open('assets\Game_map_7.33.webp').resize((500,500)))
# # bg_img = mpimg.imread('assets\Game_map_7.33.webp')
# print(type(bg_img), bg_img.shape)
# for name, group in df_wards.groupby('match'):

#     print(f"\n{group['match'].unique()} - {group['radiantTeam'].unique()} x {group['direTeam'].unique()}")
#     fig, ax = plt.subplots(figsize=(5, 5), dpi=100)
#     ax.imshow(bg_img, extent=[55, 200, 55, 200])  # Adjust extent to match your data range
#     ax.scatter(group['positionX'], group['positionY'], label=f'Match {name}', color=group['wardType'].map(lambda x: "yellow" if x == "OBSERVER" else "blue"))

#     # Show plot
#     plt.show()


In [None]:
def get_t_mask(df_wards, t):
    df_wards_mask = df_wards['spawned_time']

In [None]:
df_wards