In [101]:
from dotenv import load_dotenv
load_dotenv() 
import os
alchemy_key = os.environ["ALCHEMY"]
import keyring

import requests
import time
import sys
import traceback
import json
import pandas as pd
import sqlalchemy
import uuid
import urllib

conn_string = 'DRIVER={ODBC Driver 17 for SQL Server};SERVER=dev-web3.database.windows.net;DATABASE=sap;UID=web3_admin;PWD=' + keyring.get_password("sql-dev-web3", "web3_admin") + ";"
url = f'mssql+pyodbc:///?odbc_connect={urllib.parse.quote_plus(conn_string)}'
engine = sqlalchemy.create_engine(url, fast_executemany=True)

def create_df():
    df = pd.DataFrame(columns = ['blockNum', 'uniqueId', 'hash', 'from', 'to', 'value', 'erc721TokenId', 'erc1155Metadata', 'tokenId', 'asset', 'category', 'rawContract_value', 'rawContract_address', 'rawContract_decimal'])
    return df

## method that calls an API and retries it xx times in case of issues
def api_call(method:str, url:str, headers=None, payload=None, sleeper=2, retries=20):
    retry_counter = 0
    interupt = False

    while True:
        try:
            response = requests.request(method=method, url=url, headers=headers, data=payload)
            if response.status_code == 200:
                break
            else:
                retry_counter += 1
                if retry_counter <= retries:
                    print("retry API call #" + str(retry_counter) + " with: " + url)
                    time.sleep(retry_counter * sleeper)
                else:
                    print("retrying failed more than " + str(retries) + " times - start over")
                    return False
        except KeyboardInterrupt:
            interupt = True
            break        
        except:
            print('request issue, will retry with: ' + url)
            time.sleep(retry_counter * sleeper)
    
    if interupt == True:
        print("Execution ended successfully in api_call")
        sys.exit()
    else:
        return json.loads(response.text)

## iterate over api pagination
def api_load():
    contractAddresses = ['0x463de2a5c6e8bb0c87f4aa80a02689e6680f72c7', '0x6362364a37f34d39a1f4993fb595dab4116daf0d', '0x239b912cf695da50ad19e0c05337067e2eb55fe3', 
    '0x68e65cc375f10baf74ac41773658dd00b5de1eaa', '0x65d96f0d45606016e30c97ee039775de9722a7d2', '0x5a75fa233a06cfcf708549785598b428340b9261', '0xf855acf79501609d982e4793f588dff69fc4c7e6', 
    '0x05a28540de2869281fe8a39882fbadc96ec4766c', '0x88da4c6bffdaedf6b64bdb6060973f86a77830ec', '0x6f0e738ea607f3a6698a813d45849dc36f7f3328', '0x354e2edcfbcd2f3b5fdce8cad3730408c052804e', 
    '0x38c518400a3f9e2110dc52e6c92e37fb7378aaaf', '0xc42cb8538e0dea6459c5965876131120ba96122e', '0x0044c5a5a6f626b673224a3c0d71e851ad3d5153', '0xb00110cc12cdc8f666f33f4e52e4957ff594282f', 
    '0x65e217285f8c3bf615784c72e5874aa8ac35ef9d', '0xabfd6760151c7f9361cc5b03bbeebe2d7c0251da', '0x0f310489a6eb1eeefc815ff3f0574c88a4aaaa06', '0xc84403a38b90f1db216db398beb9c8acb3ad7d3b', 
    '0x80ea96d75a308144708570a8e84f50df5477ee8a', '0x62896f42cf1371b268db56e50d67c34f3eb1ad7a', '0x0bccab36f518f55e00f3efe2e828ae63cd2ac1b9', '0xf0b3aed0232b3c51693323e45878c9173b6c43fe', 
    '0x261bef4b19ace1398c6603ed7299296d0e32cc00', '0x54a34c14db1093eb6e602b83db9ecc8cb929ba5c', '0x7b6c20ab3aed15f8c695978282dc0a30093bec97', '0x04943c19896c776c78770429ec02c5384ee78292', 
    '0xa96c8e571b23a6cfc8ca6955c5d3ff03a13fa699', '0xe47838cb5874da9b8a40107abbf4edf75a7e7ba0', '0xeaa9938076748d7edd4df0721b3e3fe4077349d3', '0xa8cc5bde3134f73e25e5fa386d0e9c68ae3c77e6', 
    '0xb8802c009dd265b38e320214a7720ebd7a488827', '0x3b28baae3987502b436f6f37d1bcd7b87b517b27']

    url = f"https://polygon-mainnet.g.alchemy.com/v2/{alchemy_key}"
    payload_txt = {
        "id": 1,
        "jsonrpc": "2.0",
        "method": "alchemy_getAssetTransfers",
        "params": [
            {
                "fromBlock": "0x0",
                "toBlock": "latest",
                "contractAddresses": contractAddresses,
                "category": [
                        "erc20"
                ],
                "withMetadata": False,
                "excludeZeroValue": True,
                "maxCount": "0x3e8"
            }
        ]
    }
    payload = json.dumps(payload_txt)
    headers = {'Content-Type': 'text/plain'}

    df = create_df()
    pageKey = ''

    while True:
        try:
            if pageKey != '':
                payload_txt['params'][0]['pageKey'] = pageKey
                payload = json.dumps(payload_txt)

            response_json = api_call(method="POST", url=url, headers=headers, payload=payload)
            if type(response_json) == bool:
                print("Some API load issue with url: " + url)
                return False

            dfTemp = pd.json_normalize(response_json['result'], record_path='transfers', sep='_')
            df = pd.concat([df,dfTemp])

            if 'pageKey' in response_json['result']:
                pageKey = response_json['result']['pageKey']
                time.sleep(0.2)
            else:
                break
            
        except KeyboardInterrupt:
            print("Execution ended successfully")
            sys.exit()
        except requests.exceptions.RequestException as e:
            print("just a Request error, will retry in 2s...")
            time.sleep(2)
            continue
        except Exception as e:
            print("unexpected error")
            print(e)
            print(traceback.format_exc())
            return False
    return df
 
def transform(df:pd.DataFrame):
    df.drop(['erc721TokenId', 'erc1155Metadata', 'tokenId','rawContract_value','rawContract_decimal'], axis=1, inplace=True)

    int_columns = ["value"]    
    df[int_columns] = df[int_columns].apply(pd.to_numeric)

    df.rename(columns= {'rawContract_address':'contractAddress', 'from':'fromAddress', 'to':'toAddress'}, inplace=True)
    return df

def upsert_df( df: pd.DataFrame, table_name: str, primary_key_col: str):
    if df.shape[0] > 0:    
        # get datetime columns 
        date_cols = list(df.select_dtypes(include=['datetime64[ns, UTC]']).columns)
        dtype = {col : sqlalchemy.sql.sqltypes.DateTime for col in date_cols}

        table_name_to_transfer = f"temp_{uuid.uuid4().hex[:6]}"

        # replace the placeholder table with the dataframe
        df.to_sql(table_name_to_transfer, engine, if_exists='replace', index=False, dtype=dtype)

        # building the command terms
        cols_list = df.columns.tolist()
        cols_list_query = f'({(", ".join(cols_list))})'
        sr_cols_list = [f'Source.{i}' for i in cols_list]
        sr_cols_list_query = f'({(", ".join(sr_cols_list))})'
        up_cols_list = [f'{i}=Source.{i}' for i in cols_list]
        up_cols_list_query = f'{", ".join(up_cols_list)}'
            
        # fill values that should be interpreted as "NULL" with None
        def fill_null(vals: list) -> list:
            def bad(val):
                if isinstance(val, type(pd.NA)):
                    return True
                # the list of values you want to interpret as 'NULL' should be 
                # tweaked to your needs
                return val in ['NULL', 'nan', '', '', '-', '?']
            return tuple(i if not bad(i) else None for i in vals)

        # create the list of parameter indicators (?, ?, ?, etc...)
        # and the parameters, which are the values to be inserted
        params = [fill_null(row.tolist()) for _, row in df.iterrows()]
        param_slots = '('+', '.join(['?']*len(df.columns))+')'
            
        cmd = f'''
            MERGE {table_name} as Target
            USING {table_name_to_transfer} AS Source
            ON Target.{primary_key_col}=Source.{primary_key_col}
            
            WHEN NOT MATCHED BY Target THEN
            INSERT {cols_list_query} VALUES {sr_cols_list_query}

            WHEN MATCHED THEN
            UPDATE SET {up_cols_list_query};
            '''

        # execute the command to merge tables
        with engine.begin() as conn:
            conn.execute(cmd)

        engine.execute(f'DROP TABLE "{table_name_to_transfer}"')
    return True

def upsert(df: pd.DataFrame, table_name: str, primary_key_col: str):
    if df.shape[0] > 0:
        if df.shape[0] > 10000:
            print("Batch upload necessary")
            total_length = df.shape[0]
            batch_start = 0
            while batch_start < total_length:
                batch_end = batch_start + 10000
                upsert_df(df=df[batch_start:batch_end], table_name=table_name, primary_key_col=primary_key_col)
                print("Batch " + str(batch_end))
                batch_start = batch_end
        else:
            upsert_df(df=df, table_name=table_name, primary_key_col=primary_key_col)
    return df.shape[0]

def run():
    df = False
    while type(df) == bool:
        df = api_load()

    if df.shape[0] == 0:
        return None
    else:
        df = transform(df)
        loaded = upsert(df, table_name='toucan_nct' , primary_key_col='uniqueId')
        return loaded
        # return df

In [103]:
run()

ProgrammingError: (pyodbc.ProgrammingError) ('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Incorrect syntax near the keyword 'from'. (156) (SQLExecDirectW)")
[SQL: 
            MERGE toucan_nct as Target
            USING temp_4a0d7e AS Source
            ON Target.uniqueId=Source.uniqueId
            
            WHEN NOT MATCHED BY Target THEN
            INSERT (blockNum, uniqueId, hash, from, to, value, asset, category, contractAddress) VALUES (Source.blockNum, Source.uniqueId, Source.hash, Source.from, Source.to, Source.value, Source.asset, Source.category, Source.contractAddress)

            WHEN MATCHED THEN
            UPDATE SET blockNum=Source.blockNum, uniqueId=Source.uniqueId, hash=Source.hash, from=Source.from, to=Source.to, value=Source.value, asset=Source.asset, category=Source.category, contractAddress=Source.contractAddress;
            ]
(Background on this error at: https://sqlalche.me/e/14/f405)

In [100]:
df

Unnamed: 0,blockNum,uniqueId,hash,from,to,value,asset,category,contractAddress
0,0x1341787,0x391c0000bd48d605f83c7e6165438ec69a903efc509d...,0x391c0000bd48d605f83c7e6165438ec69a903efc509d...,0x0000000000000000000000000000000000000000,0x5e037e2f5c92dc9e180426171b62da65d3ad5325,100.0,TCO2-VCS-674-2014,erc20,0x62896f42cf1371b268db56e50d67c34f3eb1ad7a
1,0x13462aa,0x4cd4ed0c5186146343df108f691f6ad30cc4650f44e9...,0x4cd4ed0c5186146343df108f691f6ad30cc4650f44e9...,0x0000000000000000000000000000000000000000,0xaf529400ff068afbdc907c699b9184a4381481b0,3000.0,TCO2-VCS-1162-2015,erc20,0x261bef4b19ace1398c6603ed7299296d0e32cc00
2,0x134638a,0x7fe7d37c3d4e94cd30bbe0e0287d73978b27500f311a...,0x7fe7d37c3d4e94cd30bbe0e0287d73978b27500f311a...,0x0000000000000000000000000000000000000000,0xaf529400ff068afbdc907c699b9184a4381481b0,7000.0,TCO2-VCS-1162-2015,erc20,0x261bef4b19ace1398c6603ed7299296d0e32cc00
3,0x134639a,0xde6e943a1930fd79afefeb8f916160a02b931fe0804b...,0xde6e943a1930fd79afefeb8f916160a02b931fe0804b...,0x0000000000000000000000000000000000000000,0xaf529400ff068afbdc907c699b9184a4381481b0,9174.0,TCO2-VCS-1577-2013,erc20,0x38c518400a3f9e2110dc52e6c92e37fb7378aaaf
4,0x13463a5,0xa5001abc6732d725ca65908f66c665a4f4a78f14a441...,0xa5001abc6732d725ca65908f66c665a4f4a78f14a441...,0x0000000000000000000000000000000000000000,0xaf529400ff068afbdc907c699b9184a4381481b0,10000.0,TCO2-VCS-1577-2015,erc20,0x04943c19896c776c78770429ec02c5384ee78292
...,...,...,...,...,...,...,...,...,...
602,0x1ed1967,0xbf271102c90bf4c23dd51488ac8a91839ce2d5225850...,0xbf271102c90bf4c23dd51488ac8a91839ce2d5225850...,0xd838290e877e0188a4a44700463419ed96c16107,0xcefb61af5325c0c100cbd77eb4c9f51d17b189ca,10.4,TCO2-VCS-1052-2012,erc20,0x463de2a5c6e8bb0c87f4aa80a02689e6680f72c7
603,0x1ed1967,0xbf271102c90bf4c23dd51488ac8a91839ce2d5225850...,0xbf271102c90bf4c23dd51488ac8a91839ce2d5225850...,0xcefb61af5325c0c100cbd77eb4c9f51d17b189ca,0x0000000000000000000000000000000000000000,10.4,TCO2-VCS-1052-2012,erc20,0x463de2a5c6e8bb0c87f4aa80a02689e6680f72c7
604,0x1ee6efe,0x78551ebf115296903bf2cb8d4adf9f6b8e7ec1f017dd...,0x78551ebf115296903bf2cb8d4adf9f6b8e7ec1f017dd...,0xd838290e877e0188a4a44700463419ed96c16107,0xcefb61af5325c0c100cbd77eb4c9f51d17b189ca,1.0,TCO2-VCS-1052-2012,erc20,0x463de2a5c6e8bb0c87f4aa80a02689e6680f72c7
605,0x1ee6efe,0x78551ebf115296903bf2cb8d4adf9f6b8e7ec1f017dd...,0x78551ebf115296903bf2cb8d4adf9f6b8e7ec1f017dd...,0xcefb61af5325c0c100cbd77eb4c9f51d17b189ca,0x0000000000000000000000000000000000000000,1.0,TCO2-VCS-1052-2012,erc20,0x463de2a5c6e8bb0c87f4aa80a02689e6680f72c7


In [38]:
if 'pageKey' in response_json['result']:
    print('yes')
else:
    print('no')

no


In [27]:
response_json

{'jsonrpc': '2.0',
 'id': 1,
 'result': {'transfers': [{'blockNum': '0x163317a',
    'uniqueId': '0x8239c98641de618ae509880df7c543f7e1de8cb8147fa525551955f7ae789ace:log:373',
    'hash': '0x8239c98641de618ae509880df7c543f7e1de8cb8147fa525551955f7ae789ace',
    'from': '0x0000000000000000000000000000000000000000',
    'to': '0xc4652c97ae04a8cf6541c76cb96d9a7684ea1a8c',
    'value': 89225.00000000001,
    'erc721TokenId': None,
    'erc1155Metadata': None,
    'tokenId': None,
    'asset': 'TCO2-VCS-191-2008',
    'category': 'erc20',
    'rawContract': {'value': '0x12e4e59dfdb084840000',
     'address': '0xb139c4cc9d20a3618e9a2268d73eff18c496b991',
     'decimal': '0x12'}},
   {'blockNum': '0x163323c',
    'uniqueId': '0x6a043edc750424ed1d1fcfa54dabdf91cbf8aaffd78c4d9d3f4f5f744f881eee:log:455',
    'hash': '0x6a043edc750424ed1d1fcfa54dabdf91cbf8aaffd78c4d9d3f4f5f744f881eee',
    'from': '0xc4652c97ae04a8cf6541c76cb96d9a7684ea1a8c',
    'to': '0x2f800db0fdb5223b3c3f354886d907a671414a7f',