In [1]:
%pip install nest_asyncio

import asyncio

import nest_asyncio


nest_asyncio.apply()

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.2.1 -> 23.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
# from airflow.decorators import task
import requests
import json
import os
import asyncio
from aiohttp import ClientSession

import pandas as pd
import numpy as np
from datetime import datetime



In [14]:

regional_base_uri = "https://sea.api.riotgames.com/"
local_base_uri = "https://ph2.api.riotgames.com/"

riot_tokens = [
    "RGAPI-ec7dde02-672d-47ea-b6aa-5b014e554c6b", #PBE
    "RGAPI-68dca2ec-bf5e-4c87-8bce-5dea4d53da72", #NA
    "RGAPI-ad062b7b-fd93-4a98-b656-729a176dd11d", #EUW
    "RGAPI-6d255cbf-5352-4090-94b1-68f1d9746f6f", #EUNE
    "RGAPI-b509d9b8-4900-4052-9162-051ed993cbea", #MAIN

]
default_header = {
   
}
currentTokenIndex = 0

def get_next_token_index() -> int:
    global currentTokenIndex
    if currentTokenIndex == len(riot_tokens) - 1:
        currentTokenIndex = 0
    else:
        currentTokenIndex+=1
    return currentTokenIndex

async def api_request(url, riot_token) -> dict:
    header = {
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Charset": "application/x-www-form-urlencoded; charset=UTF-8",
        "Origin": "https://developer.riotgames.com",
        "X-Riot-Token": riot_token,
    }
    async with ClientSession() as session:
        async with session.get(url, headers=header) as response:
            try:
                # print("done na with " + url + " with token " + header["X-Riot-Token"])
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', '1'))
                    print("Rate limited, retrying after " + str(retry_after) + " seconds")
                    await asyncio.sleep(retry_after)
                    return await api_request(url, header)
                assert response.status == 200
                return await response.json()
            except AssertionError:
                print("An error occurred: " + str(response.status) + " with token " + header["X-Riot-Token"])
                print(url + " " + str(await response.json()))
            

In [4]:

# @task(task_id="multiple_matches_ingest")
async def matches_request(matches: list[str], region_group : str) -> list[dict]:
    requests_list = []
    for match in matches:
        print("Fetching match: " + match)
        # match_uri = regional_base_uri + "lol/match/v5/matches/" + match
        match_uri = f"https://{region_group}.api.riotgames.com/lol/match/v5/matches/{match}"
        requests_list.append(api_request(match_uri, default_header))
    return await asyncio.gather(*requests_list)
    # print(responses)

async def league_entries_request(queue: str, tier :str, division:str, page:int , region : str) -> list[dict]:
    request = api_request(f"https://{region}.api.riotgames.com/lol/league-exp/v4/entries/{queue}/{tier}/{division}?page={page}", default_header)
    return await asyncio.gather(*[request])

async def top_league_entries_request(queue : str, tier:str, region : str) -> list[dict]:
    request = api_request(f"https://{region}.api.riotgames.com/lol/league/v4/{tier}/by-queue/{queue}", default_header)
    return await asyncio.gather(*[request])

async def players_request(players : list[tuple]): #By SummonnerID -> "id" in JSON file
    requests_list = []

    for region, summonerID in players:
        requests_list.append(api_request(f"https://{region}.api.riotgames.com/lol/summoner/v4/summoners/{summonerID}", default_header))
    
    semaphore = asyncio.Semaphore(20)
    async def semaphored_request(request):
        async with semaphore:
            return await request
    
    return await asyncio.gather(*(semaphored_request(request) for request in requests_list))

In [5]:
# @task(task_id="multiple_matches_ingest")
def matches_ingest(matches: list[str], region = "sea") -> list[dict]:
    return asyncio.run(matches_request(matches))

# @task(task_id="league_entries_ingest")
def league_entries_ingest(queue: str, tier :str, division:str, page:int = 1, region = "ph2") -> list[dict]:
    return asyncio.run(league_entries_request(queue, tier, division, page, region))[0]

# @task(task_id="top_league_entries_ingest")
def top_league_entries_ingest(queue: str, tier:str,region = "ph2") -> dict:
    return asyncio.run(top_league_entries_request(queue, tier, region))[0]

# @task(task_id="player_ingest")
def players_ingest(players : list[tuple]) -> list[dict]:
    return asyncio.run(players_request(players))



### Clean DataFrame

In [6]:
# @task(task_id="clean_dataframe_dtypes")
def clean_dataframe_dtypes(df: pd.DataFrame, categorical_columns = [], int_columns = [], datetime_columns = []) -> pd.DataFrame:
    for column in categorical_columns:
        df[column] = df[column].astype("category")
    for column in int_columns:
        df[column] = df[column].astype("int")
    for column in datetime_columns:
        df[column] = pd.to_datetime(df[column])
    return df

# @task(task_id="reorder_coluns")
def reorder_columns(df: pd.DataFrame, ordered_columns: list[str]) -> pd.DataFrame:
    return df.reindex(columns=ordered_columns)

# @task(task_id="clean_dataframe")
def clean_dataframe(df: pd.DataFrame, categorical_columns = [], int_columns = [], datetime_columns = [], ordered_columns = []) -> pd.DataFrame:
    df = clean_dataframe_dtypes(df, categorical_columns, int_columns, datetime_columns)
    if len(ordered_columns) > 0:
        df = reorder_columns(df, ordered_columns)
    return df

### Saving output to csv

In [7]:
# @task(task_id="save_to_csv")
def save_to_csv(df: pd.DataFrame, path: str, index: bool = False) -> None:
    """
        Note: Saving to CSV files are only temporary and will be removed after the entire piepline is completed.
    """
    if(os.path.exists(path)):
        df.to_csv(path, index=index, mode='a', header=False)
    else:
        df.to_csv(path, index=index, mode='w', header=True)

### League Entries

In [8]:
# @task(task_id="top_league_entries")
def top_league_entries(queue: str, tier :str, region):
    league_entries_raw : dict = top_league_entries_ingest(queue, tier, region)

    if not league_entries_raw:
        return
    
    current_time = datetime.now()

    player_league_infos = pd.DataFrame(league_entries_raw["entries"])
    player_league_infos["leagueId"] = league_entries_raw.get("leagueId")
    player_league_infos["last_updated"] = datetime.now()
    player_league_infos["region"] = region
    player_league_infos = reorder_columns(player_league_infos,
                                        ordered_columns= ["leagueId","region","summonerId","summonerName","leaguePoints","rank","wins","losses","veteran","inactive","freshBlood","hotStreak","last_updated"])
    save_to_csv(player_league_infos, "../../resources/datasets/player_league_infos.csv")

    del league_entries_raw["entries"]
    league_infos = pd.DataFrame([league_entries_raw])
    league_infos["last_updated"] = datetime.now()
    league_infos["queue"] = queue
    league_infos["region"] = region
    league_infos["division"] = 'I'
    league_infos = reorder_columns(league_infos,
                                        ordered_columns= ["leagueId","region","queue","tier","division","name","last_updated"])
    save_to_csv(league_infos, "../../resources/datasets/league_infos.csv")

# @task(task_id="league_entries")
def league_entries(queue: str, tier :str, division:str, region : str, pages : int):
    for page in range(1, pages+1):
        #League Infos and Player League Infos is separated
        league_entries_raw : dict = pd.DataFrame.from_dict(league_entries_ingest(queue, tier, division, page, region))

        if league_entries_raw.empty:
            return

        player_league_infos = league_entries_raw.loc[:,['leagueId','summonerId','summonerName','leaguePoints','rank','wins','losses','veteran','inactive','freshBlood','hotStreak']]
        player_league_infos["last_updated"] = datetime.now()
        player_league_infos["region"] = region
        player_league_infos = reorder_columns(player_league_infos,["leagueId","region","summonerId","summonerName","leaguePoints","rank","wins","losses","veteran","inactive","freshBlood","hotStreak","last_updated"])
        save_to_csv(player_league_infos, "../../resources/datasets/player_league_infos.csv")

        league_infos = league_entries_raw.loc[:,['leagueId','tier']]
        league_infos["region"] = region
        league_infos["queue"] = queue
        league_infos["division"] = league_entries_raw["rank"]
        league_infos["last_updated"] = datetime.now()
        league_infos = reorder_columns(league_infos,
                                        ordered_columns= ["leagueId","region","queue","tier","division","name","last_updated"])
        league_infos.drop_duplicates(subset=["leagueId"], inplace=True)
        save_to_csv(league_infos, "../../resources/datasets/league_infos.csv")

# @task(task_id="all_league_entries")
def all_league_entries(queues: list[str], tiers : list[str], divisions: list[str], regions : list[str], pages : int):
    """
        Main function for ingestion of data for leagues.  Data is only saved as a CSV file.
        Args:
            queue::str:
                The queue type of the league.  All valid values accepted according to API: RANK_FLEX_SR, RANK_FLEX_TT, RANKED_SOLO_5x5. Note that RANK_FLEX_TT is deprecated.
            tier::str:
                The league tier.  The following values are accepted: challengerleagues, grandmasterleagues, masterleagues, DIAMOND, PLATINUM, GOLD, SILVER, BRONZE, IRON
            division::str:
                The league division.  All valid values accepted according to API: I, II, III, IV.  Challenger to Master divisions only have one division.
            region::str:
                The region of the league.  All valid values accepted according to API: ph2, eun1, euw1, jp1, kr, la1, la2, na1, oc1, ru, sg2, th2, tr1, tw2, vn2
            pages::int:
                The number of pages to be ingested.  Each page contains >20 entries and varies per region. Challenger to Master divisions always show the complete list (only 1 page).  
        Returns:
            None
    """
    for queue in queues:
        for tier in tiers:
            for region in regions:
                if tier in ["challengerleagues", "grandmasterleagues", "masterleagues"]:
                    print(f"Queue: {queue}, Tier: {tier}, Region: {region}")
                    top_league_entries(queue=queue, tier=tier, region=region)
                else:
                    for division in divisions:
                        print(f"Queue: {queue}, Tier: {tier}, Division: {division}, Region: {region}")
                        league_entries(queue=queue, tier=tier, division=division, region=region, pages=pages)


### Player Details

In [None]:
player_ids_raw = pd.read_csv("../../resources/datasets/player_league_infos.csv", usecols = ["summonerId","region"]).sample(frac=1)
player_ids = list(player_ids_raw.itertuples(index=False, name=None))
display(len(player_ids))
test = players_ingest(player_ids)

    # "RGAPI-6d255cbf-5352-4090-94b1-68f1d9746f6f",
    # "RGAPI-b509d9b8-4900-4052-9162-051ed993cbea",
    # "RGAPI-ec7dde02-672d-47ea-b6aa-5b014e554c6b",
    # "RGAPI-68dca2ec-bf5e-4c87-8bce-5dea4d53da72",
    # "RGAPI-ad062b7b-fd93-4a98-b656-729a176dd11d",

### Match History   

## TEST ONLY

In [None]:
# TEST FILES
# print(league_entries_ingest("RANKED_SOLO_5x5", "DIAMOND", "I", 1))

# print(match_ingest("PH2_8088906")["metadata"])

# Test Top League Entries
queues = ["RANKED_SOLO_5x5", "RANKED_FLEX_SR"]
tiers = ["challengerleagues", "grandmasterleagues", "masterleagues" , "DIAMOND"]
divisions = ["I", "II", "III", "IV"]
regions = ["ph2", "eun1", "euw1", "jp1", "kr", "la1", "la2", "na1", "oc1", "ru", "sg2", "th2", "tr1", "tw2", "vn2"]

all_league_entries(queues=queues, tiers=tiers, divisions=divisions, regions=regions, pages=1)


Queue: RANKED_SOLO_5x5, Tier: challengerleagues, Region: ph2
Queue: RANKED_SOLO_5x5, Tier: challengerleagues, Region: eun1
An error occurred: 403 with token RGAPI-ad062b7b-fd93-4a98-b656-729a176dd11d
https://eun1.api.riotgames.com/lol/league/v4/challengerleagues/by-queue/RANKED_SOLO_5x5 {'Accept-Language': 'en-US,en;q=0.9', 'Accept-Charset': 'application/x-www-form-urlencoded; charset=UTF-8', 'Origin': 'https://developer.riotgames.com', 'X-Riot-Token': 'RGAPI-ad062b7b-fd93-4a98-b656-729a176dd11d'}
Queue: RANKED_SOLO_5x5, Tier: challengerleagues, Region: euw1


KeyboardInterrupt: 