In [None]:
import pandas as pd 
import json
import csv
import nba_api
import aiohttp
import asyncio
import time
import requests
from aiohttp import ClientSession, TCPConnector
import random
import math
from contextlib import asynccontextmanager
import numpy as np
import itertools
import backoff_utils
from backoff_utils import backoff
from backoff_utils import apply_backoff
from backoff_utils import strategies
import backoff

In [None]:
# HTTP headers sent with every request.
# The values imitate a desktop Firefox session browsing nba.com (used here as
# the example site).
headers = {
    # Browser identity and content negotiation.
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:100.0) Gecko/20100101 Firefox/100.0",
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "en-US,en;q=0.5",
    "Accept-Encoding": "gzip, deflate, br",
    # Custom x-nba-stats-* headers.
    "x-nba-stats-origin": "stats",
    "x-nba-stats-token": "true",
    # Origin / connection metadata.
    "Origin": "https://www.nba.com",
    "DNT": "1",
    "Connection": "keep-alive",
    "Referer": "https://www.nba.com/",
    # Fetch metadata.
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-site",
    # Ask for an uncached response.
    "Pragma": "no-cache",
    "Cache-Control": "no-cache",
}

In [None]:
# Rate limiter class for limiting the number of requests per second.
# The maximum number of tokens held at once is the rate limit.
class RateLimiter:
    """Limit how fast, and how many at once, coroutines pass through ``throttle``.

    Two independent limits are applied:

    * A bounded queue holding at most ``rate_limit`` tokens. Every entry into
      ``throttle`` puts one token and waits while the queue is full. A
      background task removes tokens at roughly ``rate_limit`` per second.
    * A semaphore with ``concurrency_limit`` permits, held for the whole body
      of the ``throttle`` block.

    The constructor starts the background task with ``asyncio.create_task``,
    so an instance must be created while an event loop is running. Use the
    instance as an async context manager (or call ``close``) to stop the task.
    """

    def __init__(self, rate_limit: int, concurrency_limit: int) -> None:
        """Create the token queue, the semaphore and the draining task.

        Args:
            rate_limit: Requests per second; also the token queue capacity.
            concurrency_limit: Maximum number of ``throttle`` blocks that may
                be active at the same time.

        Raises:
            ValueError: If ``rate_limit`` is not positive. A zero or negative
                value would make the queue unbounded and break the draining
                task (division by zero or a zero-delay loop).
        """
        if rate_limit <= 0:
            raise ValueError(f"rate_limit must be positive, got {rate_limit!r}")
        # rate limit = number of requests per second (rps)
        self.rate_limit = rate_limit
        self.tokens_queue = asyncio.Queue(rate_limit)
        # Built before the task is started so that an invalid
        # concurrency_limit cannot leave an orphaned background task behind.
        self.semaphore = asyncio.Semaphore(concurrency_limit)
        # Consume tokens from the queue at a constant rate.
        self.tokens_consumer_task = asyncio.create_task(self.consume_tokens())

    async def add_token(self) -> None:
        """Put one token on the queue, waiting while the queue is full."""
        await self.tokens_queue.put(1)

    async def consume_tokens(self) -> None:
        """Remove tokens from the queue forever; runs as the background task.

        Each pass sleeps ``1 / rate_limit`` seconds. When tokens are present,
        the number removed is the elapsed time since the previous removal
        divided by that interval, capped at the current queue size.
        Exceptions, including ``asyncio.CancelledError``, propagate to whoever
        awaits the task.
        """
        consumption_rate = 1 / self.rate_limit
        # Starts at 0 and is only refreshed after a removal, so the elapsed
        # time seen on the first removal (and on the first one after an idle
        # period) can be large enough to empty the whole queue at once.
        last_consumption_time = 0
        while True:
            if self.tokens_queue.empty():
                await asyncio.sleep(consumption_rate)
                continue

            current_consumption_time = time.monotonic()
            total_tokens = self.tokens_queue.qsize()
            tokens_to_consume = self.get_tokens_amount_to_consume(
                consumption_rate,
                current_consumption_time,
                last_consumption_time,
                total_tokens,
            )
            for _ in range(tokens_to_consume):
                self.tokens_queue.get_nowait()

            last_consumption_time = time.monotonic()

            await asyncio.sleep(consumption_rate)

    @staticmethod
    def get_tokens_amount_to_consume(consumption_rate, current_consumption_time, last_consumption_time, total_tokens):
        """Return how many tokens to remove on this pass.

        Args:
            consumption_rate: Seconds per token (``1 / rate_limit``).
            current_consumption_time: Timestamp of this pass.
            last_consumption_time: Timestamp of the previous removal.
            total_tokens: Number of tokens currently queued.

        Returns:
            The whole number of ``consumption_rate`` intervals that fit in the
            elapsed time, but never more than ``total_tokens``.
        """
        # time between iterations
        time_from_last_consumption = current_consumption_time - last_consumption_time
        calculated_tokens_to_consume = math.floor(time_from_last_consumption / consumption_rate)
        return min(total_tokens, calculated_tokens_to_consume)

    @asynccontextmanager
    async def throttle(self):
        """Async context manager that admits one rate-limited operation.

        Takes a semaphore permit, then queues a token (waiting while the queue
        is full), then runs the ``async with`` body. The permit is released
        when the body finishes.
        """
        await self.semaphore.acquire()
        try:
            # Inside the try block on purpose: if waiting for a free token
            # slot is cancelled or fails, the permit acquired above must
            # still be handed back.
            await self.add_token()
            yield
        finally:
            self.semaphore.release()

    async def __aenter__(self):
        """Return the limiter itself for use in ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Stop the background task; exceptions from the body are not suppressed."""
        if exc_type:
            # log error here and safely close the class
            pass

        await self.close()

    async def close(self) -> None:
        """Cancel the background draining task and wait for it to finish.

        The ``CancelledError`` produced by the cancellation is swallowed; any
        other exception the task ended with is re-raised to the caller.
        """
        task = self.tokens_consumer_task
        if task and not task.cancelled():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                # We ignore this exception, but it is good to log and signal
                # that the task was cancelled.
                pass

In [None]:
# Retry policy for failed requests: exponential backoff, giving up after
# 60 seconds of aiohttp.ClientError failures and after 300 seconds of
# asyncio.TimeoutError failures.
@backoff.on_exception(backoff.expo,
                      aiohttp.ClientError, max_time=60)
@backoff.on_exception(backoff.expo,
                      asyncio.TimeoutError,
                      max_time=300)
# sends the actual request
async def send_request(lock, game_id, client_session: aiohttp.ClientSession, rate_limiter: RateLimiter):
    """Fetch the four-factors box score for one game and return the parsed JSON.

    Args:
        lock: Accepted for the caller's benefit; not used inside this function.
        game_id: Value interpolated into the ``GameID`` query parameter.
        client_session: Session used to issue the GET request.
        rate_limiter: Limiter whose ``throttle`` block guards the send.

    Returns:
        The response body decoded with ``json.loads``.

    Raises:
        json.JSONDecodeError: If the body is not valid JSON.
        aiohttp.ClientError, asyncio.TimeoutError: Once the backoff
            decorators above give up retrying.
    """
    # use the desired url here
    url = f'https://stats.nba.com/stats/boxscorefourfactorsv2?EndPeriod=1&EndRange=0&GameID={game_id}&RangeType=0&StartPeriod=1&StartRange=0'
    # Every aiohttp timeout is disabled for this request.
    # NOTE(review): with no timeouts, aiohttp itself will presumably never
    # raise the TimeoutError the decorator retries on - confirm intent.
    timeout = aiohttp.ClientTimeout(total=None, connect=None, sock_connect=None, sock_read=None)

    queued_at = time.monotonic()
    async with rate_limiter.throttle():
        print(f'sending url: {url}')
        # Time spent waiting for the rate limiter to admit this request.
        print(time.monotonic() - queued_at)
        sent_at = time.monotonic()
        response = await client_session.get(url, headers=headers, timeout=timeout)
        print('releasing throttler')

    # Why are the following lines not inside the rate limiter context?
    # All we want to control is the rate of outgoing requests. Reading the
    # response stream into memory should not block the next requests from
    # being sent (unless memory is limited or the payloads are large; in that
    # case move the read into the context and make sure memory is freed for
    # the next requests). So the semaphore is released here and the stream is
    # read while the remaining requests keep going out.
    try:
        print(f'reading stream of response from {url}')
        # Round trip: time from sending until the response headers arrived.
        print(time.monotonic() - sent_at)
        status = response.status
        print(status)
        # Retry a 504 once, immediately. This branch is only reached when the
        # session does not raise on error statuses; otherwise the error is
        # raised by the session and handled by the backoff decorators.
        if status == 504:
            # Give the stale response's connection back before replacing it,
            # and send the retry through the limiter like any other request.
            response.release()
            async with rate_limiter.throttle():
                response = await client_session.get(url, headers=headers, timeout=timeout)
        data = await response.read()
        return json.loads(data)
    finally:
        # Release even when reading or decoding fails.
        response.release()

# Creates the shared RateLimiter and ClientSession and runs one send_request
# task per game id.
async def send_multiple_requests(lock, game_ids=()):
    """Fetch every game in ``game_ids`` concurrently under one rate limiter.

    Args:
        lock: Passed through unchanged to every ``send_request`` call.
        game_ids: Iterable of game ids to request. Defaults to empty, in
            which case no request is sent and an empty list is returned.

    Returns:
        A list with one parsed JSON payload per game id, in input order.

    Raises:
        Whatever the first failing ``send_request`` raises. The remaining
        tasks are cancelled before the exception is re-raised.
    """
    async with RateLimiter(rate_limit=9, concurrency_limit=10) as rate_limiter:
        async with aiohttp.ClientSession(raise_for_status=True) as session:
            tasks = [
                asyncio.create_task(
                    send_request(lock, game_id, client_session=session, rate_limiter=rate_limiter)
                )
                for game_id in game_ids
            ]
            try:
                return await asyncio.gather(*tasks)
            except BaseException:
                # Stop the sibling requests before the session is closed
                # underneath them, and collect their outcomes so that no task
                # is left with an unretrieved exception.
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)
                raise


# The lock is created here and handed down to every request.
# NOTE(review): it is meant to let the async loop be stopped on an exception,
# but nothing in this part of the file acquires it - confirm it is still needed.
async def main(game_ids=()):
    """Entry point: create the shared lock and fetch all ``game_ids``.

    Args:
        game_ids: Iterable of game ids forwarded to
            ``send_multiple_requests``. Defaults to empty.

    Returns:
        The list returned by ``send_multiple_requests``.
    """
    lock = asyncio.Lock()
    return await send_multiple_requests(lock, game_ids)
