In [12]:
import json
import os
import sys

import psycopg2
import pymongo

# Record the interpreter and library versions this notebook was executed with,
# one per line, so a reader can reproduce the environment.
print(
    f'Python Version: {sys.version}',
    f'Pymongo Version: {pymongo.__version__}',
    f'Psycopg2 Version: {psycopg2.__version__}',
    f'Json Version: {json.__version__}',
    sep='\n',
)

Python Version: 3.11.2 (tags/v3.11.2:878ead1, Feb  7 2023, 16:38:35) [MSC v.1934 64 bit (AMD64)]
Pymongo Version: 4.6.0
Psycopg2 Version: 2.9.9 (dt dec pq3 ext lo64)
Json Version: 2.0.9


In [13]:
# Connection string for the MongoDB deployment that holds the per-blockchain
# transaction databases. It contains a username and password, so it is read
# from the environment instead of being committed with the notebook.
# Raises KeyError if BLOCKS_AND_TRANSACTIONS_MONGO_URL is not set.
url = os.environ["BLOCKS_AND_TRANSACTIONS_MONGO_URL"]
blocks_and_transaction_mongo_db_client = pymongo.MongoClient(url)

In [14]:
# Connection string for the MongoDB deployment that holds the `knowledge_graph`
# database. It contains a username and password, so it is read from the
# environment instead of being committed with the notebook.
# Raises KeyError if KNOWLEDGE_GRAPH_MONGO_URL is not set.
url = os.environ["KNOWLEDGE_GRAPH_MONGO_URL"]
knowledge_graph_mongo_db_client = pymongo.MongoClient(url)

In [15]:
# PostgreSQL connection used to read token-transfer events. User, password and
# host come from the environment so no secret is stored in the notebook; the
# database name and port fall back to the values the notebook used before.
# Raises KeyError if a required TRANSFER_EVENTS_PG_* variable is not set.
transferring_events_postgresql_connection = psycopg2.connect(
    dbname=os.environ.get("TRANSFER_EVENTS_PG_DBNAME", "postgres"),
    user=os.environ["TRANSFER_EVENTS_PG_USER"],
    password=os.environ["TRANSFER_EVENTS_PG_PASSWORD"],
    host=os.environ["TRANSFER_EVENTS_PG_HOST"],
    port=os.environ.get("TRANSFER_EVENTS_PG_PORT", "5432"),
)

General settings being used throughout the notebook

In [16]:
# Central configuration read by the crawling cells below.
SETTINGS = {
    # MongoDB database names that are searched for transactions.
    "BLOCKCHAINS": ["ethereum_blockchain_etl", "blockchain_etl"],
    # Maps each database name above to the PostgreSQL schema whose
    # `token_transfer` table is queried for that blockchain.
    "BLOCKCHAIN_TO_CHAIN_MAP": {
        "ethereum_blockchain_etl": "chain_0x1",
        "blockchain_etl": "chain_0x38",
    },
    # Only wallets whose `balanceInUSD` is strictly greater than this are crawled.
    "MIN_BALANCE_IN_USD": 250_000,
    # Maximum number of wallets fetched from the knowledge graph.
    "WALLET_LIMIT": 10,
}

Get native token price change log and store it in a json file

In [17]:
# Look up a single smart-contract document by its `_id`.
# NOTE(review): the id presumably denotes the native token of chain 0x1
# (zero address) — confirm against the knowledge-graph schema.
query = { '_id' : '0x1_0x0000000000000000000000000000000000000000' }

knowledge_graph_db = knowledge_graph_mongo_db_client['knowledge_graph']

# find_one returns None when nothing matches, which lets us fail with a clear
# message instead of the bare IndexError that find(query)[0] would raise.
native_ethereum_contract = knowledge_graph_db.smart_contracts.find_one(query)
if native_ethereum_contract is None:
    raise LookupError(f'No smart contract document found for query {query}')

# Keep only the price history; the rest of the document is not persisted.
price_change_logs_native_ethereum = native_ethereum_contract['priceChangeLogs']

with open('./data/native_token_price_change_logs.json', 'w') as file:
    json.dump(price_change_logs_native_ethereum, file)

Get all the incoming transactions for a specified wallet_address. 

Only transactions from the blockchains listed in SETTINGS['BLOCKCHAINS'] are queried.

In [18]:
def getIncomingTransactionsForWalletAddress(wallet_address: str) -> dict[str, list]:
    """Fetch the transactions sent to ``wallet_address`` on every configured blockchain.

    Args:
        wallet_address: Address matched against the ``to_address`` field of
            each transaction document.

    Returns:
        A dict with one key per entry in ``SETTINGS['BLOCKCHAINS']``; each value
        is the list of matching documents from that database's ``transactions``
        collection (an empty list when nothing matches).
    """
    query = {'to_address': wallet_address}
    # Materialise each cursor with list() so the result outlives the query.
    return {
        blockchain: list(
            blocks_and_transaction_mongo_db_client[blockchain].transactions.find(query)
        )
        for blockchain in SETTINGS['BLOCKCHAINS']
    }

Get all the outgoing transactions for a specified wallet_address. 

Only transactions from the blockchains listed in SETTINGS['BLOCKCHAINS'] are queried.

In [19]:
def getOutgoingTransactionsForWalletAddress(wallet_address: str) -> dict[str, list]:
    """Fetch the transactions sent from ``wallet_address`` on every configured blockchain.

    Args:
        wallet_address: Address matched against the ``from_address`` field of
            each transaction document.

    Returns:
        A dict with one key per entry in ``SETTINGS['BLOCKCHAINS']``; each value
        is the list of matching documents from that database's ``transactions``
        collection (an empty list when nothing matches).
    """
    query = {'from_address': wallet_address}
    # Materialise each cursor with list() so the result outlives the query.
    return {
        blockchain: list(
            blocks_and_transaction_mongo_db_client[blockchain].transactions.find(query)
        )
        for blockchain in SETTINGS['BLOCKCHAINS']
    }

Main wallet crawling

Amount of wallets being crawled is defined in SETTINGS.WALLET_LIMIT
Min threshold for balance in USD is defined in SETTINGS.MIN_BALANCE_IN_USD

All crawled wallets are being stored in a json file with their corresponding outgoing and incoming transactions 

In [20]:
# Transaction hashes collected per blockchain; the buckets are derived from
# SETTINGS so they cannot drift from the list of crawled blockchains.
ALL_TRANSACTION_HASHES = {blockchain: [] for blockchain in SETTINGS['BLOCKCHAINS']}
# Addresses of the crawled wallets, in the order they were returned.
WALLET_ADDRESSES_QUERIES = []
# Wallet document (enriched with its transactions) keyed by wallet address.
ALL_WALLETS = {}

# Select wallets above the balance threshold that carry both transaction and
# balance history. `$exists` takes a boolean, not the string "true".
query = {
    'balanceInUSD': {'$gt': SETTINGS['MIN_BALANCE_IN_USD']},
    'dailyAllTransactions': {'$exists': True},
    'balanceChangeLogs': {'$exists': True},
}

knowledge_graph_db = knowledge_graph_mongo_db_client['knowledge_graph']

print('Crawling wallets with transaction information')

for wallet in knowledge_graph_db.wallets.find(query).limit(SETTINGS['WALLET_LIMIT']):
    wallet_address = wallet['address']
    WALLET_ADDRESSES_QUERIES.append(wallet_address)

    wallet['incoming_transactions'] = getIncomingTransactionsForWalletAddress(wallet_address)
    wallet['outgoing_transactions'] = getOutgoingTransactionsForWalletAddress(wallet_address)

    # Record every hash (incoming first, then outgoing) so the transferring
    # events for these transactions can be looked up afterwards.
    for direction in ('incoming_transactions', 'outgoing_transactions'):
        for blockchain, transactions in wallet[direction].items():
            ALL_TRANSACTION_HASHES[blockchain].extend(ts['hash'] for ts in transactions)

    ALL_WALLETS[wallet_address] = wallet

with open('./data/wallets.json', 'w') as file:
    json.dump(ALL_WALLETS, file)

print(f'Queried wallets: {WALLET_ADDRESSES_QUERIES}')
print('Finished crawling wallets with transaction information')

Crawling wallets with transaction information
Queried wallets: ['0xdba746b5533cf9d65c7140d12b09a973521f7561', '0xff36ab2d61487ac65111df3eab862e34647f76e0', '0xf604298e63e97783378dfdabdb55a7d791ede89a', '0xd0bd97286c41f638a68e8b088ba0feb00b90b4a2', '0xeff295c6d55e7c51e155aac5ee9788bf63f3c2c0', '0xb5b8ef26cda3ab4a50a372da93596c546b25b7f8', '0xff1827e2a90d62801b842a9d3b95a21902495a7d', '0xc0e2830724c946a6748ddfe09753613cd38f6767', '0xe67faab7a523c467e214c170abbffba8fdf57afc', '0xecb84a0d981d05c04ed62712aa60f3545a8cbea9']
Finished crawling wallets with transaction information


For all crawled transactions, get the corresponding transferring events and store them in a JSON file

In [21]:
print(f'Crawling transferring events for transactions in: {ALL_TRANSACTION_HASHES}')

# Rows fetched from PostgreSQL per blockchain; the buckets are derived from
# SETTINGS so they always match the crawled blockchains.
ALL_TRANSFERRING_EVENTS = {blockchain: [] for blockchain in SETTINGS['BLOCKCHAINS']}
# First column of every fetched row (duplicates are kept).
# NOTE(review): assumes column 0 of `token_transfer` is the contract address — confirm.
ALL_SMART_CONTRACTS_ADDRESSES = []

for blockchain in SETTINGS['BLOCKCHAINS']:
    transaction_hashes = ALL_TRANSACTION_HASHES[blockchain]
    if not transaction_hashes:
        # An empty tuple would render as "IN ()", which PostgreSQL rejects.
        continue
    chainID = SETTINGS['BLOCKCHAIN_TO_CHAIN_MAP'][blockchain]
    # The schema name comes from the SETTINGS constant, not from user input;
    # the hashes themselves are passed as a bound parameter.
    transferring_events_query = f"SELECT * FROM {chainID}.token_transfer WHERE transaction_hash IN %s"
    # The context manager closes the cursor even if the query fails.
    with transferring_events_postgresql_connection.cursor() as cursor:
        cursor.execute(transferring_events_query, (tuple(transaction_hashes),))
        result = cursor.fetchall()
    ALL_SMART_CONTRACTS_ADDRESSES.extend(item[0] for item in result)
    ALL_TRANSFERRING_EVENTS[blockchain] = result

# Write with 'w' (like the other export cells): appending on every re-run
# would concatenate several JSON documents into one invalid file.
# NOTE(review): json.dump raises TypeError if rows hold non-JSON values
# (e.g. Decimal) — confirm the column types.
with open('./data/transferring_events.json', 'w') as file:
    json.dump(ALL_TRANSFERRING_EVENTS, file)

print(f'Queried transferring events: {ALL_TRANSFERRING_EVENTS}')
print('Finished crawling transferring events')

Crawling transferring events for transactions in: {'ethereum_blockchain_etl': ['0xf06a19e6e027ddcee2212823ff3f98cc7c8b936f1b9d9a311b32a18319c29626', '0x109e12a560da658e1c30cb37fd75c8caf0997bb05dd43a3fc842c1fc4d79a1c0', '0x9e8cf9a50730374fc3beefe2799ce9f047663b0e7d327131f44031a4c39f4c12', '0x97af43fdc2403fe3c85e5212718defa4a964b29607f7954cdacc87ed5598ea83', '0xc950f969085432fbdda557a98a5cf00b04764e3ae3735358960a30b8a10535a0', '0x31a1961511c6a8f98961d0effedeac25bd387c97b4445f33bff9e39251efb9a7', '0xd78d4ef8541a7da758a4088b2d14a7f0640f84102379c04460221a86ec823206', '0xa7a200dacc591077940a334a2d46f13d24b658d09f7dd852bb52487d308cf56c', '0x4a85bfa546f587d563e83ebf763f7765467729b62e93c95865ef94690efd3fec', '0x2a9b6e40035c4434659c04eda6a2c360dd2d99798f5d021b2d5ba6ede30a6a32', '0x3a40540903eb9f47cf159a6fbfd245a52f38e39f0d87a527423acc10d51fa909', '0x7184539fcd8e065a6ec7a4d8917a943f68963e1bbd5fbf66cf524fc827f3e4ba', '0xde9cdd9e0f25e56038b450ef64f09504ce3b6ec6077d2c2d61b130c4465cafb1', '0x77538e33

For each crawled transferring event, get the corresponding smart contract and store them in a JSON file

In [22]:
print(f'Crawling smart contracts for smart contract addresses in: {ALL_SMART_CONTRACTS_ADDRESSES}')

knowledge_graph_db = knowledge_graph_mongo_db_client['knowledge_graph']

# The address list repeats each contract once per transfer row; de-duplicate
# (keeping first-seen order) so the `$in` filter stays small. The set of
# matched documents is unchanged.
unique_smart_contract_addresses = list(dict.fromkeys(ALL_SMART_CONTRACTS_ADDRESSES))

query = {
    'address': { '$in': unique_smart_contract_addresses }
}

# Wrap the matched documents under a single top-level key for the export.
smart_contracts = {
    'smart_contracts': list(knowledge_graph_db.smart_contracts.find(query))
}

# Write with 'w' (like the other export cells): appending on every re-run
# would concatenate several JSON documents into one invalid file.
with open('./data/smart_contracts.json', 'w') as file:
    json.dump(smart_contracts, file)

print(f'Queried smart contracts: {smart_contracts}')
print('Finished crawling smart contracts')

Crawling smart contracts for smart contract addresses in: ['0x9fa184c43b00da59b06f2296d509fbb465fb362e', '0x9fa184c43b00da59b06f2296d509fbb465fb362e', '0x9fa184c43b00da59b06f2296d509fbb465fb362e', '0x9fa184c43b00da59b06f2296d509fbb465fb362e', '0x9fa184c43b00da59b06f2296d509fbb465fb362e', '0x9fa184c43b00da59b06f2296d509fbb465fb362e', '0x9fa184c43b00da59b06f2296d509fbb465fb362e', '0x9fa184c43b00da59b06f2296d509fbb465fb362e', '0x9fa184c43b00da59b06f2296d509fbb465fb362e', '0x9fa184c43b00da59b06f2296d509fbb465fb362e', '0x3800d898880f3fdbb0013db53133774a2af2f708', '0x3800d898880f3fdbb0013db53133774a2af2f708', '0x3800d898880f3fdbb0013db53133774a2af2f708', '0x3800d898880f3fdbb0013db53133774a2af2f708', '0x3800d898880f3fdbb0013db53133774a2af2f708', '0x3800d898880f3fdbb0013db53133774a2af2f708', '0x3800d898880f3fdbb0013db53133774a2af2f708', '0x3800d898880f3fdbb0013db53133774a2af2f708', '0x3800d898880f3fdbb0013db53133774a2af2f708', '0x3800d898880f3fdbb0013db53133774a2af2f708', '0x3800d898880f3fdbb0