In [1]:
import os
import uuid
import json
import asyncio
import aiohttp
import requests
import pandas as pd

from sqlalchemy import create_engine
from time import perf_counter, sleep
from datetime import datetime, timedelta
from urllib.parse import quote_plus
from azure.cosmos import exceptions, CosmosClient, PartitionKey

import config_bestbuy

In [2]:
async def to_matrix(x, n):
    l = []
    for i in range(x):
        l.append(i+1)
    return [l[i:i+n] for i in range(0, len(l), n)]


async def ceiling_division(n, d):
    return -(n // -d)


async def insert_db(r, engine, db_cols, container, cursor_mark):

    cols = ['nextCursorMark', 'total', 'totalPages', 'queryTime', 'totalTime', 'canonicalUrl', 
            'sku', 'name', 'type', 'startDate', 'new', 'activeUpdateDate', 'active', 'regularPrice', 
            'salePrice', 'clearance', 'onSale', 'categoryPath', 'customerReviewCount', 'customerReviewAverage', 
            'priceUpdateDate', 'itemUpdateDate', 'class', 'classId', 'subclass', 'subclassId', 'department', 'departmentId', 
            'theatricalReleaseDate', 'studio', 'manufacturer', 'modelNumber', 'condition', 'artistName', 'images', 'image', 'color']
    bool_cols = ["new", "active", "clearance", "onSale"]
    int_cols = ["total", "totalPages"]
    float_cols = ["queryTime", "totalTime", "regularPrice", "salePrice", "customerReviewCount", "customerReviewAverage", 'theatricalReleaseDate']
    date_cols = ["startDate", "activeUpdateDate", "priceUpdateDate", "itemUpdateDate"]
    
    with engine.connect() as cnx:
        io = r.json()
        # for product in io['products']:
        #     product['cursorMark'] = cursor_mark
        #     product['id'] = str(uuid.uuid4())
        #     container.create_item(body=product)
        
        df_meta = pd.DataFrame(io)
        df_meta = df_meta.iloc[:, :-1]
        df_products = pd.DataFrame(io['products'])
        df = df_meta.merge(df_products, how='inner', left_index=True, right_index=True)
        df = df.loc[:, cols]
        df.insert(0, 'request_timestamp', datetime.utcnow())

        for col in df.columns.tolist():
            if col in bool_cols:
                df.loc[:, col] = df.loc[:, col].astype('bool')
            elif col in int_cols:
                df.loc[:, col] = df.loc[:, col].astype('int64')
            elif col in float_cols:
                df.loc[:, col] = df.loc[:, col].astype('float64')
            elif col in date_cols:
                df.loc[:, col] = pd.to_datetime(df.loc[:, col], errors='coerce', infer_datetime_format=True)
            else:
                df.loc[:, col] = df.loc[:, col].astype('str')
                
        df.to_sql(name='products', con=cnx, if_exists='append', index=False)


async def main(api_index=0, folder_index=0, page_size=100):

    t0 = perf_counter()
    # folderpath, datename, engine, db_cols, last_update_date, container = await initialize(folder_index=api_index)
    
    folders = ['products', 'categories', 'stores', 'products_update']
    datename = datetime.utcnow().strftime('%Y%m%d')
    cosmos_endpoint = config_bestbuy.bestbuy_cosmosdb_end_point
    cosmos_primary_key = config_bestbuy.bestbuy_cosmosdb_primary_key
    client = CosmosClient(cosmos_endpoint, cosmos_primary_key)
    db_name = 'BestBuyDB'
    database = client.create_database_if_not_exists(id=db_name)
    container_name = 'Products'
    container = database.create_container_if_not_exists(
        id=container_name,
        partition_key=PartitionKey(path='/department'),
        offer_throughput=400
    )
    """"""
    # path = config_bestbuy.path
    path = config_bestbuy.path_test
    foldername = f'best_buy_{datename}\\{folders[folder_index]}'
    folderpath = os.path.join(path, foldername)

    if not os.path.exists(folderpath):
        os.makedirs(folderpath)

    """"""
    # db = os.path.join(config_bestbuy.path, 'bestbuy.db')
    # conn_string = f'sqlite:///{db}'
    # engine = create_engine(conn_string)

    # # params = quote_plus(config_bestbuy.bestbuy_sql_odbc_string)
    # # engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")

    # with engine.connect() as cnx:
    #     try:
    #         sel_stmt = "SELECT * FROM products LIMIT 0"
    #         df_db = pd.read_sql(sql=sel_stmt, con=cnx)
    #         db_cols = df_db.columns.tolist()

    #         last_update_stmt = 'SELECT MAX(itemUpdateDate) FROM products'
    #         df_itemUpdateDate = pd.read_sql(sql=last_update_stmt, con=cnx)
    #         last_update_date = df_itemUpdateDate.iloc[0, 0]
            
    #     except Exception as e:
    #         db_cols = []
    #         last_update_date = None
    

    async def api_bestbuy(page=1):
        chunk_size = 1024 # 64 * 2**10
        key = config_bestbuy.bestbuy_api_key
        apis = ['products', 'categories', 'stores'] #, f'products(itemUpdateDate>{last_update_date}&active=*)']
        url = f"https://api.bestbuy.com/v1/{apis[api_index]}"
        payload = {
            'apiKey': key, 
            'pageSize': page_size, 
            'format': 'json', 
            'show': 'all',
            'page': page
            }

        delay = page/5
        # delay = 0
        # await asyncio.sleep(delay)
        t1 = perf_counter()

        # sem = asyncio.Semaphore(5)
        connector = aiohttp.TCPConnector(limit=5)
        # await asyncio.sleep(1)
        async with aiohttp.ClientSession(connector=connector) as session:
            # async with sem:
            # async with sem, session.get(url, params=payload) as r:
            async with session.get(url, params=payload) as r:
                r.status
                data = await r.json(content_type=None) # content_type='text/html'
                pg = data.get('currentPage', 0)
                filename = f'best_buy_{datename}_{pg:05}.json'
                filepath = os.path.join(folderpath, filename)

                # with open(filepath, 'wb') as fd:
                #     # data = json.dumps(await r.json(), indent=4)
                #     async for chunk in r.content.iter_chunked(chunk_size):
                #         fd.write(chunk)

                with open(filepath, 'w') as f:
                    json.dump(data, f, indent=4)
            # insert_db(r=r, engine=engine, db_cols=db_cols, container=container, cursor_mark=nextcursorMark)
        
        t2 = perf_counter()
        print(f'{pg=}', f'{delay=}', f'{t2-t1=}', f'{t2-t0=}', sep=' | ')
        return data

    
    data = await api_bestbuy()
    # print(pages)
    pages = data.get('totalPages', 0)
    tasks = (api_bestbuy(page=page+1) for page in range(pages))
    await asyncio.gather(*tasks)
    # batches = await to_matrix(pages, 5)
    # tasks = [
    #     api_bestbuy(page=1), api_bestbuy(page=2), api_bestbuy(page=3), api_bestbuy(page=4), api_bestbuy(page=5),
    #     api_bestbuy(page=6), api_bestbuy(page=7), api_bestbuy(page=8), api_bestbuy(page=9), api_bestbuy(page=10)
    # ]
    # r = await api_bestbuy(page=pg, api_index=api_index, page_size=page_size, last_update_date=last_update_date)
        
    
    print(f'fin: {perf_counter()-t0=}')
    # return await asyncio.gather(*tasks)
    


In [3]:
if __name__ == '__main__':
    # [0: 'products', 1: 'categories', 2: 'stores', 3: f'products(itemUpdateDate>{last_update_date}&active=*)']
    # asyncio.run(main(api_index=0, folder_index=0, page_size=100))
    await main(api_index=0, folder_index=0, page_size=100)
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(main(api_index=0, folder_index=0, page_size=100))
    # loop.close()
    

pg=1 | delay=0.2 | t2-t1=1.5262949999999993 | t2-t0=2.7082315999999995


ValueError: too many file descriptors in select()

Tip: To query for updates or deltas since you last walked through the result set you can use the itemUpdateDate attribute. To ensure that your query results include changes to a product’s active/inactive status, add active=* to your query parameters. 
For example: .../v1/products(itemUpdateDate>2017-02-06T16:00:00&active=*)?format=json&pageSize=100&cursorMark=*&apiKey=YOUR_API_KEY
For example: .../v1/products(itemUpdateDate>today&active=*)?format=json&pageSize=100&cursorMark=*&apiKey=YOUR_API_KEY
"https://api.bestbuy.com/v1/products(releaseDate>today)?format=json&show=sku,name,salePrice&apiKey=YourAPIKey"

In [None]:
async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task
    return await asyncio.gather(*(sem_task(task) for task in tasks))

In [None]:
https://stackoverflow.com/questions/47675410/python-asyncio-aiohttp-valueerror-too-many-file-descriptors-in-select-on-win
https://tutorialedge.net/python/concurrency/python-asyncio-semaphores-tutorial/
https://stackoverflow.com/questions/48483348/how-to-limit-concurrency-with-python-asyncio
https://stackoverflow.com/questions/56151929/aiohttp-with-asyncio-and-semaphores-returning-a-list-filled-with-nones
https://stackoverflow.com/questions/40836800/python-asyncio-semaphore-in-async-await-function
https://stackoverflow.com/questions/35196974/aiohttp-set-maximum-number-of-requests-per-second/43857526#43857526

In [16]:
Page = 1
api_index = 0
page_size = 100

r = api_bestbuy_test(pg=Page, api_index=api_index, page_size=page_size)
io = r.json()
io['id'] = str(uuid.uuid4())
io


{'from': 1,
 'to': 100,
 'currentPage': 1,
 'total': 224901,
 'totalPages': 2250,
 'queryTime': '0.033',
 'totalTime': '0.156',
 'partial': False,
 'canonicalUrl': '/v1/products?show=all&pageSize=100&format=json&apiKey=AwGV1zCqy6FDoQHNUoNcfjqA',
 'products': [{'sku': 1000006,
   'score': None,
   'productId': None,
   'name': 'Spy Kids: All the Time in the World [Includes Digital Copy] [Blu-ray] [2011]',
   'source': None,
   'type': 'Movie',
   'startDate': '2011-04-26',
   'new': False,
   'active': True,
   'lowPriceGuarantee': True,
   'activeUpdateDate': '2020-02-29T03:42:41',
   'regularPrice': 12.99,
   'salePrice': 12.99,
   'clearance': False,
   'onSale': False,
   'planPrice': None,
   'priceWithPlan': [],
   'contracts': [],
   'priceRestriction': None,
   'priceUpdateDate': '2020-02-29T03:42:41',
   'digital': False,
   'preowned': False,
   'carriers': [],
   'planFeatures': [],
   'devices': [],
   'carrierPlans': [],
   'technologyCode': None,
   'carrierModelNumber': N

In [9]:
cols = ['nextCursorMark', 'total', 'totalPages', 'queryTime', 'totalTime', 'canonicalUrl', 'sku', 'name', 'type', 'startDate', 'new', 'activeUpdateDate', 'active', 'regularPrice', 'salePrice', 'clearance', 'onSale', 'categoryPath', 'customerReviewCount', 'customerReviewAverage', 'priceUpdateDate', 'itemUpdateDate', 'class', 'classId', 'subclass', 'subclassId', 'department', 'departmentId', 'images', 'image', 'color']
io = r.json()
df_meta = pd.DataFrame(io)
df_meta = df_meta.iloc[:, :-1]
df_products = pd.DataFrame(io['products'])
df = df_meta.merge(df_products, how='inner', left_index=True, right_index=True)
df = df.loc[:, cols]
df.insert(0, 'request_timestamp', datetime.utcnow())

In [None]:
totalpages = set()
for _ in range(io['totalPages']):
    totalpages.add(_+1)

totalpages


In [2]:
config_bestbuy.bestbuy_sql_odbc_string

'Driver={ODBC Driver 17 for SQL Server};Server=tcp:bestbuy-server.database.windows.net,1433;Database=bestbuydb;Uid=user1;Pwd=b3$Tbuy1;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'

In [14]:

db = os.path.join(config_bestbuy.path, 'bestbuy.db')
conn_string = f'sqlite:///{db}'
engine = create_engine(conn_string)

# params = quote_plus(config_bestbuy.bestbuy_sql_odbc_string)
# engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")
    
with engine.connect() as cnx:
    df_test = pd.read_sql(sql="select color from products_archive", con=cnx)

for col in df_test.columns.tolist():
    print(col, df_test.loc[:, col].astype('str').apply(len).max(), sep= ' - ')


color - 50


In [13]:
def to_matrix(x, n):
    l = []
    for i in range(x):
        l.append(i+1)
    return [{_: l[i:i+n]} for _, i in enumerate(range(0, len(l), n))]


batches = to_matrix(2244, 5)

for i, batch in enumerate(batches):
    print(batch.get(i), "-")

[1, 2, 3, 4, 5] -
[6, 7, 8, 9, 10] -
[11, 12, 13, 14, 15] -
[16, 17, 18, 19, 20] -
[21, 22, 23, 24, 25] -
[26, 27, 28, 29, 30] -
[31, 32, 33, 34, 35] -
[36, 37, 38, 39, 40] -
[41, 42, 43, 44, 45] -
[46, 47, 48, 49, 50] -
[51, 52, 53, 54, 55] -
[56, 57, 58, 59, 60] -
[61, 62, 63, 64, 65] -
[66, 67, 68, 69, 70] -
[71, 72, 73, 74, 75] -
[76, 77, 78, 79, 80] -
[81, 82, 83, 84, 85] -
[86, 87, 88, 89, 90] -
[91, 92, 93, 94, 95] -
[96, 97, 98, 99, 100] -
[101, 102, 103, 104, 105] -
[106, 107, 108, 109, 110] -
[111, 112, 113, 114, 115] -
[116, 117, 118, 119, 120] -
[121, 122, 123, 124, 125] -
[126, 127, 128, 129, 130] -
[131, 132, 133, 134, 135] -
[136, 137, 138, 139, 140] -
[141, 142, 143, 144, 145] -
[146, 147, 148, 149, 150] -
[151, 152, 153, 154, 155] -
[156, 157, 158, 159, 160] -
[161, 162, 163, 164, 165] -
[166, 167, 168, 169, 170] -
[171, 172, 173, 174, 175] -
[176, 177, 178, 179, 180] -
[181, 182, 183, 184, 185] -
[186, 187, 188, 189, 190] -
[191, 192, 193, 194, 195] -
[196, 197, 198, 

In [231]:
today = datetime.utcnow().date()
n_days = datetime.utcnow().date() - timedelta(days=2)
n_hours = (today-n_days)/timedelta(seconds=1)
n_hours

172800.0