In [1]:
import pyspark.sql.types as T
import pyspark.sql.functions as F

from pyspark import SparkContext, SparkConf, SQLContext

from dateutil.relativedelta import relativedelta
from pyspark.sql.functions import pandas_udf
from pyspark.sql.window import Window

import pandas as pd
import numpy as np

from datetime import date, datetime, timedelta, timezone
import os
import json
import requests
import time
import yaml
from itertools import chain

import concurrent.futures
from threading import Lock

In [2]:
# Identity of this Spark application and where it runs.
appName = "PySpark TFT puuids"
master = "local[*]"

# Extra Spark options, applied in this order on top of the app name and master.
spark_options = [
    ("spark.executor.memory", "40g"),
    ("spark.driver.memory", "40g"),
    ("spark.executor.memoryOverhead", "8g"),
    ("spark.local.dir", "/home/mai/spark-temp"),
    ("spark.sql.session.timeZone", "UTC"),
    ("spark.dynamicAllocation.enabled", "true"),
    ("spark.dynamicAllocation.minExecutors", "2"),
    ("spark.dynamicAllocation.maxExecutors", "50"),
    ("spark.speculation", "true"),
]

conf = SparkConf().setAppName(appName).setMaster(master).setAll(spark_options)

# Reuse an already-running context when there is one, then expose the
# SQLContext and its SparkSession under the names the later cells use.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

25/02/17 23:10:31 WARN Utils: Your hostname, LAPTOP-4O0SI9BK resolves to a loopback address: 127.0.1.1; using 172.25.8.93 instead (on interface eth0)
25/02/17 23:10:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/17 23:10:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/17 23:10:32 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


## Set up

In [3]:
# Read the API key from the local YAML file so it never has to appear in the notebook.
with open('./api_key.yaml', 'r') as key_file:
    key_config = yaml.safe_load(key_file)
API_KEY = key_config['api_key']

In [4]:
# API_KEY = '<REDACTED>'  # never paste a real key here; it is loaded from api_key.yaml above (the key previously committed on this line should be revoked)

## Get match data

In [5]:
# Tier and division that appear in the data file names used by this notebook.
TIER = 'GOLD'
DIVISION = 'I'

# REGION is used in the data file names; REGION_TO_EXECUTE is the host prefix
# of the match endpoint that BASE_URL points at.
REGION = 'kr'
REGION_TO_EXECUTE = 'ASIA'
BASE_URL = 'https://{}.api.riotgames.com/tft/match/v1/matches'.format(REGION_TO_EXECUTE)

In [6]:
# # get the match id
# df = spark.read.parquet(f'./data/tft_matches_id/match_id_{REGION}_{TIER}_{DIVISION}.parquet')
# # exclude = spark.read.parquet(f'./data/tft_matches_id/match_id_{REGION}_CHALLENGER_{DIVISION}.parquet')

In [7]:
# df.show(5)

In [8]:
# # filter out the match ids already existing in challenger ranked players match history
# df = df.join(exclude, on=['match_id'], how='leftanti')

In [9]:
# df.count()

In [10]:
# match_ids = [row['match_id'] for row in df.select('match_id').collect()]

In [11]:
# len(match_ids)

In [12]:
# match_ids_1 = match_ids[:7000]
# match_ids_2 = match_ids[7000:14000]
# match_ids_3 = match_ids[14000:]

In [13]:
# # save into 3 parts
# pd.DataFrame(match_ids_1, columns=['match_id']).to_parquet(f'./data/tft_matches_id/match_id_{REGION}_{TIER}_{DIVISION}_p1.parquet', index=False)
# pd.DataFrame(match_ids_2, columns=['match_id']).to_parquet(f'./data/tft_matches_id/match_id_{REGION}_{TIER}_{DIVISION}_p2.parquet', index=False)
# pd.DataFrame(match_ids_3, columns=['match_id']).to_parquet(f'./data/tft_matches_id/match_id_{REGION}_{TIER}_{DIVISION}_p3.parquet', index=False)

In [14]:
# Load the third match-id partition and keep only the ids, as a plain Python list.
match_ids_3_frame = pd.read_parquet(f'./data/tft_matches_id/match_id_{REGION}_{TIER}_{DIVISION}_p3.parquet')
match_ids_3 = match_ids_3_frame['match_id'].tolist()

In [15]:
len(match_ids_3)

7256

In [16]:
# Limits
MAX_WORKERS = 20
REQS_PER_SEC = 20
REQS_PER_2MIN = 100

In [17]:
# global counters
request_counter = 0
lock = Lock()          # ensure thread safety for counter updates

In [18]:
# Create a global session
session = requests.Session()
session.headers.update({"X-Riot-Token": API_KEY,
                       'Connection': 'keep-alive'})

In [19]:
def get_match_data(match_id, max_retries=5):
    """Fetch one match payload, retrying on rate limiting and transient network errors.

    Args:
        match_id: Match identifier appended to ``BASE_URL`` to form the request URL.
        max_retries: Maximum number of GET attempts before giving up.

    Returns:
        The decoded JSON body on HTTP 200. ``None`` when the server answers with
        a status other than 200/429, or when every attempt has been used up.
    """
    url = f'{BASE_URL}/{match_id}'

    for attempt in range(max_retries):
        try:
            response = session.get(url, timeout=10)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # A read timeout is not a ConnectionError; catching only the latter let
            # timeouts escape and abort the whole batch when the future was read.
            time.sleep(2 ** attempt)  # Exponential backoff
            continue

        if response.status_code == 200:
            return response.json()

        if response.status_code == 429:  # Rate limit exceeded
            try:
                retry_after = int(response.headers.get("Retry-After", 5))
            except (TypeError, ValueError):
                # Header present but not a plain integer: fall back to the default wait.
                retry_after = 5
            time.sleep(retry_after)
            continue

        print(f"Error {response.status_code}: {response.text}")
        return None

    # Every attempt was rate limited or failed at the network level.
    print(f"Giving up on match {match_id} after {max_retries} attempts")
    return None

In [20]:
def get_match_data_parallel(match_ids_):
    """Download many matches through a thread pool, pacing the submissions.

    Each id is submitted to ``get_match_data``. After every ``REQS_PER_SEC``
    submissions the loop pauses 1 second, and after every ``REQS_PER_2MIN``
    submissions it pauses 120 seconds. No pause is taken after the final id.

    Args:
        match_ids_: Iterable of match ids to download.

    Returns:
        A list of the payloads that were fetched, in submission order. Ids whose
        fetch returned ``None`` or raised an exception are left out.
    """
    match_ids_ = list(match_ids_)  # need the length to recognise the last submission
    total = len(match_ids_)

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        submitted = []
        for i, match_id in enumerate(match_ids_, start=1):
            submitted.append((match_id, executor.submit(get_match_data, match_id)))

            if i == total:
                break  # nothing left to submit, so pausing would only waste time

            # Rate limit handling
            if i % REQS_PER_SEC == 0:
                print(f"Reached {REQS_PER_SEC} requests, sleeping 1 second...")
                time.sleep(1)
            if i % REQS_PER_2MIN == 0:
                print(f"Reached {REQS_PER_2MIN} requests, sleeping 120 seconds...")
                time.sleep(120)

        match_data = []
        for match_id, future in submitted:
            try:
                data = future.result()
            except Exception as exc:
                # One failing request must not discard everything already downloaded.
                print(f"Match {match_id} failed: {exc!r}")
                continue
            if data is not None:  # Filter out failed requests
                match_data.append(data)

    return match_data

In [None]:
%%time
# Download the matches for ids from offset 3600 onward of match_ids_3.
# NOTE(review): the 3600 offset presumably resumes an earlier partial run — confirm.
match_data = get_match_data_parallel(match_ids_3[3600:])

Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 100 requests, sleeping 120 seconds...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 100 requests, sleeping 120 seconds...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 100 requests, sleeping 120 seconds...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 20 requests, sleeping 1 second...
Reached 100 requests, 

In [None]:
len(match_data)

In [None]:
# Flatten the nested match JSON into columns, then give every element of
# 'info.participants' its own row.
matches_flat = pd.json_normalize(match_data)
df = matches_flat.explode('info.participants')

In [None]:
df.columns

In [None]:
df.head()

In [None]:
# Shape of the rows whose queue id equals 1100.
df.loc[df['info.queueId'].eq(1100)].shape

In [None]:
# Convert the pandas frame to a Spark DataFrame (it is written to parquet in a later cell).
match_df = spark.createDataFrame(df)

In [None]:
# Write the Spark copy, replacing whatever already exists at this path.
# NOTE(review): the output suffix is p4 while the ids were read from the p3 file — confirm this is intended.
spark_out_path = f'./data/tft_match_data/matches_{REGION}_{TIER}_{DIVISION}_p4.parquet'
match_df.write.mode('overwrite').parquet(spark_out_path)

In [None]:
# Keep a pandas-written copy of the same frame alongside the Spark output.
pandas_out_path = f'./data/tft_match_data_pandas/matches_{REGION}_{TIER}_{DIVISION}_p4.parquet'
df.to_parquet(pandas_out_path)