# Scraper

__The purpose of this notebook is to design and implement a scraper for Kaggle Halite episode/game replays.__

## Design Scraper

### Endpoints

- there are two endpoints:
    - one (GetEpisodeReplay) that provides episode/game replays
    - the other (ListEpisodes) that provides episode, team and agent metadata

### Design

- we need datasets to make data-driven decisions when building our halite agent
    - we can fulfill that data-dependency by obtaining historical gameplay
        - we can use the GetEpisodeReplay Endpoint to obtain game replays
- we want to pull a diverse set of game replays from good and bad bots
    - we can rank and sort the gameplays by their scores
        - we can obtain scores and metadata for episodes from the ListEpisodes Endpoint
- the ListEpisodes Endpoint can be queried by TeamId, SubmissionId, EpisodeIds
    - team ids are likely the fewest, yet every episode is tied to teams
        - we can crawl through an arbitrary team's opponents and build out a set of teams
            - for each team we can download their episodes based on some episode priority

### Design Requirements

- persist enumerated teamIds
- persist downloaded episode ids
- persist enumerated episode ids
- rate limited
    - 60 requests per minute max
    - 1000 requests per day max (~16.7 mins @ 60 reqs/min)

## Implement Scraper

In [1]:
from src.api.KaggleClient import KaggleClient
from pathlib import Path
import logging
import time
import json

In [2]:
def synchronize_disk(filepath, data, overwrite=False):
    """Merge ``data`` with the copy persisted at ``filepath`` and write the result back.

    The on-disk format depends on the type of ``data``:

    * ``list`` / ``set``: one value per line. Lines read from disk are parsed
      with ``int`` and merged into ``data`` (blank lines are ignored; a list
      only receives values it does not already contain).
    * ``dict``: a single JSON object, merged via ``dict.update`` so entries on
      disk override in-memory entries with the same key. Note that JSON object
      keys always come back as strings.

    Any other type is returned untouched and nothing is read or written.

    Args:
        filepath: path of the persistence file.
        data: container to synchronize; it is mutated in place.
        overwrite: when True, skip reading from disk and just persist ``data``.

    Returns:
        The same ``data`` object after merging.

    Failures are best-effort: a missing file, or a dict file that is not valid
    JSON, is logged as a warning; write errors are logged with a traceback
    instead of being raised.
    """
    synchronized_data = data
    if isinstance(synchronized_data, (list, set)):
        if not overwrite:
            try:
                with open(filepath, 'r') as f:
                    # int() tolerates the trailing newline but not an empty line.
                    persisted = [int(line) for line in f if line.strip()]
            except FileNotFoundError:
                logging.warning('Failed to synchronize from disk. File {} does not exist.'.format(filepath))
            else:
                if isinstance(synchronized_data, set):
                    synchronized_data.update(persisted)
                else:
                    # Lists have no update(); append unseen values to mimic set semantics.
                    for value in persisted:
                        if value not in synchronized_data:
                            synchronized_data.append(value)
        try:
            with open(filepath, 'w') as f:
                for x in synchronized_data:
                    f.write('{}\n'.format(x))
        except Exception:
            logging.exception('Failed to synchronize to disk.')
    elif isinstance(synchronized_data, dict):
        if not overwrite:
            try:
                with open(filepath, 'r') as f:
                    persisted = json.load(f)
            except FileNotFoundError:
                logging.warning('Failed to synchronize from disk. File {} does not exist.'.format(filepath))
            except json.JSONDecodeError:
                # e.g. an empty or truncated file; keep the in-memory data and rewrite it.
                logging.warning('Failed to synchronize from disk. File {} is not valid JSON.'.format(filepath))
            else:
                synchronized_data.update(persisted)
        try:
            with open(filepath, 'w') as f:
                json.dump(synchronized_data, f)
        except Exception:
            logging.exception('Failed to synchronize to disk.')

    return synchronized_data

In [3]:
def get_team_ids(episode_metadata):
    """Collect every team id mentioned in a ListEpisodes response.

    The result is the union of the ids in ``result.teams`` and the
    ``submission.teamId`` of every agent in every episode of ``result.episodes``.
    """
    result = episode_metadata['result']
    team_ids = {team['id'] for team in result['teams']}
    team_ids.update(
        agent['submission']['teamId']
        for episode in result['episodes']
        for agent in episode['agents']
    )
    return team_ids

In [4]:
def get_episode_ids(episode_metadata):
    """Return the set of episode ids listed under ``result.episodes``."""
    ids = set()
    for entry in episode_metadata['result']['episodes']:
        ids.add(entry['id'])
    return ids

In [5]:
def save_json(filepath, json_data):
    """Serialize ``json_data`` as JSON into ``filepath``, replacing any existing content."""
    with open(filepath, mode='w') as out_file:
        json.dump(json_data, out_file)

In [6]:
def get_episode_priority_map(episode_metadata):
    """Map each episode id (as a string) to the mean ``updatedScore`` of its agents.

    Agents whose ``updatedScore`` is None are ignored; an episode without any
    scored agent gets priority 0.
    """
    def _mean_score(agents):
        scores = [a['updatedScore'] for a in agents if a['updatedScore'] is not None]
        if not scores:
            return 0
        return sum(scores) / len(scores)

    return {
        str(episode['id']): _mean_score(episode['agents'])
        for episode in episode_metadata['result']['episodes']
    }

In [7]:
def get_team_priority_map(episode_metadata):
    """Map each team id (as a string) to that team's ``publicLeaderboardRank``."""
    teams = episode_metadata['result']['teams']
    return {str(t['id']): t['publicLeaderboardRank'] for t in teams}

In [8]:
def synchronize_priority_map(priority_map, filepath, stale_priorities=None):
    """Merge ``priority_map`` with its on-disk copy, drop stale ids, and persist the result.

    Args:
        priority_map: mapping of id -> priority; mutated in place by the merge.
        filepath: JSON file the map is persisted to via ``synchronize_disk``.
        stale_priorities: ids to remove (e.g. already-scraped teams/episodes).
            Ids are compared as strings: JSON object keys always reload as
            strings, while id sets reloaded by ``synchronize_disk`` hold ints,
            so a direct comparison would never match across runs.

    Returns:
        A new dict without the stale ids; this is also what is written to disk.
    """
    if stale_priorities is None:
        stale_keys = set()
    else:
        stale_keys = {str(key) for key in stale_priorities}
    merged = synchronize_disk(filepath, priority_map)
    fresh = {k: v for k, v in merged.items() if str(k) not in stale_keys}
    return synchronize_disk(filepath, fresh, overwrite=True)

In [9]:
def get_min_weighted_priority_queue(priority_map, watermark, request_budget=None):
    """Return up to ``request_budget`` (key, priority) pairs with priority <= watermark, lowest first."""
    ascending = False
    return get_priority_queue(priority_map, watermark, ascending, request_budget)

def get_max_weighted_priority_queue(priority_map, watermark, request_budget=None):
    """Return up to ``request_budget`` (key, priority) pairs with priority >= watermark, highest first."""
    descending = True
    return get_priority_queue(priority_map, watermark, descending, request_budget)

In [10]:
def get_priority_queue(priorty_map, watermark, max_weighted=True, request_budget=None):
    """Return ``(key, priority)`` pairs from ``priorty_map`` in priority order.

    Args:
        priorty_map: mapping of key -> comparable priority. (The misspelled
            name is kept so keyword callers keep working.)
        watermark: priority threshold. When ``max_weighted`` is True a key must
            have ``priority >= watermark``; otherwise ``priority <= watermark``.
        max_weighted: sort by descending (True) or ascending (False) priority.
        request_budget: maximum number of pairs to return; None means no limit.
            Float budgets are truncated to int; negative budgets yield nothing.

    Returns:
        A list of ``(key, priority)`` tuples; equal priorities keep the map's
        iteration order (``sorted`` is stable, also with ``reverse=True``).
    """
    if max_weighted:
        def passes(priority):
            return priority >= watermark
    else:
        def passes(priority):
            return priority <= watermark

    ordered = sorted(priorty_map, key=priorty_map.get, reverse=max_weighted)
    queue = [(k, priorty_map[k]) for k in ordered if passes(priorty_map[k])]
    if request_budget is not None:
        # Budget arithmetic upstream can produce floats (e.g. 5.0), which are not
        # valid slice indices, and a negative stop would drop items from the end
        # rather than returning nothing.
        queue = queue[:max(0, int(request_budget))]
    return queue

In [11]:
# Queue thresholds: an episode's priority (mean agent score) must be
# >= EPISODE_WATERMARK, a team's priority (leaderboard rank) <= TEAM_WATERMARK.
EPISODE_WATERMARK = 1100
TEAM_WATERMARK = 25

# Requests allowed for one run; must be smaller than 1000 (earlier value: 60*3).
REQUEST_LIMIT = 10
# Fraction of the remaining budget each round may spend on team metadata discovery.
REQUEST_DISCOVERY_BUDGET = 0.5
# Seed team used when no team priorities have been persisted yet.
ARBITRARY_TEAM_ID = '5118174'

# Output locations, resolved relative to the current working directory.
DOWNLOAD_FILEPATH = Path.cwd() / '../data/raw/'
DOWNLOAD_FILEPATH.mkdir(parents=True, exist_ok=True)
METADATA_DIR = Path.cwd() / '../scraper_metadata'
METADATA_DIR.mkdir(parents=True, exist_ok=True)

EPISODE_PMAP_FILEPATH = str(METADATA_DIR / 'episode_priority_map.txt')
TEAM_PMAP_FILEPATH = str(METADATA_DIR / 'team_priority_map.txt')
EPISODE_DOWNLOAD_FILEPATH = str(METADATA_DIR / 'episode_downloads.txt')

In [12]:
# Let everything from DEBUG up through the root logger (the urllib3
# connection messages in the scraper output arrive at DEBUG level).
logger = logging.getLogger()
logger.setLevel(level=logging.DEBUG)

In [13]:
# import inspect
# class ApiRequestLimit(object):
    
#     def __init__(self, api, request_limit):
#         class ProxyObj():
#             def __init__(self, parent):
#                 self._parent = parent
    
#             def _consume_request(self, fn, *args, **kwargs):
#                 if 0 >= self._parent.request_budget:
#                     raise Exception('Request budget exceeded')
#                 self._parent.request_budget = self._parent.request_budget - 1
#                 return fn(*args, **kwargs)
    
#         self._api = api
#         self.request_limit = request_limit
#         self.request_budget = request_limit
#         for name, value in inspect.getmembers(api):
#             is_subclass_instance = any([isinstance(value, subclass) for _, subclass in inspect.getmembers(api, inspect.isclass)])
#             if not hasattr(self, name) and is_subclass_instance:
#                 print('{}, {} is sub inst'.format(name, value))   
#                 obj = ProxyObj(self)
#                 for fn_name, fn in inspect.getmembers(value, callable):
#                     if not fn_name.startswith('_'):
#                         print('{}.{}'.format(fn_name, fn))
#                         setattr(obj, fn_name, lambda *args, **kwargs: obj._consume_request(fn, *args, **kwargs))
#                 setattr(self, name, obj)
                

In [14]:
# w = ApiRequestLimit(api, 10)

In [15]:
# w.request_budget

In [16]:
# w.episodes.team(ARBITRARY_TEAM_ID)

In [17]:
def _throttle(request_counter, request_start, requests_per_minute=60):
    """Count one request; once the per-minute cap is hit, sleep out the rest of the minute.

    Returns the updated ``(request_counter, request_start)`` pair.
    """
    request_counter += 1
    if request_counter >= requests_per_minute:
        idle_time = 60 - (time.time() - request_start)
        if idle_time > 0:
            logging.info('Idling for {} seconds'.format(idle_time))
            time.sleep(idle_time)
        request_start = time.time()
        request_counter = 0
    return request_counter, request_start


# Main scraping loop: alternate between downloading prioritized episode replays and
# discovering new teams/episodes from team metadata, spending at most REQUEST_LIMIT requests.
request_budget = REQUEST_LIMIT
teams_priorities = synchronize_disk(TEAM_PMAP_FILEPATH, dict())
episodes_priorities = synchronize_disk(EPISODE_PMAP_FILEPATH, dict())
scraped_episodes = synchronize_disk(EPISODE_DOWNLOAD_FILEPATH, set())
scraped_teams = set()

teams_queue = get_min_weighted_priority_queue(teams_priorities, TEAM_WATERMARK)
# Resume downloads from the persisted *episode* priorities.
episodes_queue = get_max_weighted_priority_queue(episodes_priorities, EPISODE_WATERMARK, request_budget)

api = KaggleClient()
request_counter = 0
request_start = time.time()

if len(teams_queue) > 0:
    logging.info('Discovering episodes by prioritized team queue')
else:
    logging.info('Discovering episodes by arbitrary team_id {}'.format(ARBITRARY_TEAM_ID))
    teams_queue = [(ARBITRARY_TEAM_ID, -1)]

logging.info('Scraping started.')
is_scraping = True
while is_scraping:
    # scrape episodes
    while len(episodes_queue) > 0 and request_budget > 0:
        episode_id, episode_priority = episodes_queue.pop(0)
        logging.info('Requesting replay for Episode ID {} with priority {}'.format(episode_id, episode_priority))

        # TODO: os.environ['DOWNLOAD_FILEPATH']
        filepath = str(DOWNLOAD_FILEPATH.joinpath('replay_EPISODEID_{}_{}.json'.format(episode_id, time.time())))
        response = api.replay.episode(episode_id)
        save_json(filepath, response)

        logging.info('Downloaded Episode ID {}'.format(episode_id))
        # Stored as int because synchronize_disk reloads this set as ints;
        # mixing str and int would write the same id twice to disk.
        scraped_episodes.add(int(episode_id))
        episodes_priorities.pop(episode_id, None)

        request_budget -= 1
        request_counter, request_start = _throttle(request_counter, request_start)

    # find episodes; int() keeps the budget integral so it stays a valid slice index
    discovery_limit = int(request_budget * REQUEST_DISCOVERY_BUDGET)
    discovery_budget = discovery_limit
    while discovery_budget > 0 and len(teams_queue) > 0:
        team_id, team_priority = teams_queue.pop(0)
        logging.info('Requesting metadata for Team ID {} with priority {}'.format(team_id, team_priority))

        # TODO: os.environ['DOWNLOAD_FILEPATH']
        filepath = str(DOWNLOAD_FILEPATH.joinpath('metadata_TEAMID_{}_{}.json'.format(team_id, time.time())))
        response = api.episodes.team(team_id)
        save_json(filepath, response)
        logging.info('Downloaded metadata by Team ID {}'.format(team_id))
        scraped_teams.add(team_id)

        teams_priorities.update(get_team_priority_map(response))
        teams_priorities = synchronize_priority_map(teams_priorities, TEAM_PMAP_FILEPATH, scraped_teams)
        teams_queue = get_min_weighted_priority_queue(teams_priorities, TEAM_WATERMARK)

        episodes_priorities.update(get_episode_priority_map(response))
        # Priority-map keys are strings; compare against string ids so episodes
        # downloaded in earlier runs (reloaded as ints) are dropped as well.
        stale_episodes = {str(e) for e in scraped_episodes}
        episodes_priorities = synchronize_priority_map(episodes_priorities, EPISODE_PMAP_FILEPATH, stale_episodes)
        episodes_queue = get_max_weighted_priority_queue(episodes_priorities, EPISODE_WATERMARK, request_budget)
        discovery_budget -= 1

        request_counter, request_start = _throttle(request_counter, request_start)

    # Charge only the discovery requests actually made.
    request_budget = request_budget - discovery_limit + discovery_budget

    # update scrape state
    is_scraping = len(episodes_queue) > 0 and request_budget > 0

synchronize_disk(TEAM_PMAP_FILEPATH, teams_priorities, overwrite=True)
synchronize_disk(EPISODE_PMAP_FILEPATH, episodes_priorities, overwrite=True)
synchronize_disk(EPISODE_DOWNLOAD_FILEPATH, scraped_episodes, overwrite=True)
logging.info('Scraping finished')

  if __name__ == '__main__':
INFO:root:Discovering episodes by arbitrary team_id 5118174
INFO:root:Scraping started.
INFO:root:Requesting metadata for Team ID 5118174 with priority -1
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): www.kaggle.com:443
DEBUG:urllib3.connectionpool:https://www.kaggle.com:443 "POST /requests/EpisodeService/ListEpisodes?datatype=json HTTP/1.1" 200 None
INFO:root:Downloaded metadata by Team ID 5118174
INFO:root:Requesting metadata for Team ID 5118779 with priority 1
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): www.kaggle.com:443
DEBUG:urllib3.connectionpool:https://www.kaggle.com:443 "POST /requests/EpisodeService/ListEpisodes?datatype=json HTTP/1.1" 200 None
INFO:root:Downloaded metadata by Team ID 5118779
INFO:root:Requesting metadata for Team ID 5133228 with priority 2
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): www.kaggle.com:443
DEBUG:urllib3.connectionpool:https://www.kaggle.com:443 "POST /requests

In [18]:
# TODO: fix team rank