# Blockchain Usage ETL

In [1]:
import requests
import json
import datetime
import pandas as pd
import os
import json
from shroomdk import ShroomDK
from google.cloud import bigquery, bigquery_storage, storage
import math
import time
import yaml

# Read the API credentials for the external data providers from keys.json.
with open('keys.json', 'r') as keys_file:
    keys = json.load(keys_file)

flipside_key = keys["flipside_key"]
chainbase_key = keys["chainbase_key"]

# Point the Google credentials environment variable at the same keys file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "keys.json"

In [11]:
def get_dates(days_back):
    """Build day / month / year date windows for a date `days_back` days ago.

    Args:
        days_back: Number of days to subtract from the current local time.

    Returns:
        A dict mapping "day", "month" and "year" to a two-element list
        ``[start, end]`` of ``YYYY-MM-DD`` strings:
          * "day":   the target date for both start and end,
          * "month": first day of the target month through the target date,
          * "year":  January 1st through December 31st of the target year.
    """
    # Read the clock once so every derived string refers to the same instant;
    # separate now() calls could disagree if the run crosses midnight.
    target = datetime.datetime.now() - datetime.timedelta(days=days_back)

    dt = target.strftime("%Y-%m-%d")
    mth_start = target.strftime("%Y-%m-01")
    # The month window ends on the target date itself, not on the last
    # calendar day of the month.
    mth_end = dt
    yr_start = target.strftime("%Y-01-01")
    yr_end = target.strftime("%Y-12-31")

    return {
        "day": [dt, dt],
        "month": [mth_start, mth_end],
        "year": [yr_start, yr_end],
    }

def query_flipside(q):
    """Run SQL `q` through ShroomDK and return the rows as one DataFrame.

    Pages 1 through 10 are requested at 100000 rows each; paging stops as
    soon as a page reports a record count of zero. Returns an empty
    DataFrame when no page produced records.
    """
    # Created by https://twitter.com/0xdatawolf
    client = ShroomDK(flipside_key)

    pages = []
    page_number = 1
    while page_number <= 10:  # max is a million rows @ 100k per page
        page = client.query(q, page_size=100000, page_number=page_number)
        if page.run_stats.record_count == 0:
            break
        pages.append(page.records)
        page_number += 1

    if not pages:
        return pd.DataFrame()

    # Fold the pages together one at a time, starting from the first page.
    combined = pd.json_normalize(pages[0])
    for records in pages[1:]:
        combined = pd.concat([combined, pd.json_normalize(records)])
    return combined

def query_chainbase(q):
    """Run SQL `q` against the Chainbase data-warehouse API.

    The first POST submits the query. Its response carries a task id, the
    total row count and a result set. The page count is derived as
    ceil(rows / 1000). With exactly one page the result from the first
    response is used directly; otherwise every page (starting at 1) is
    requested by task id, with a two-second pause around each request.

    Args:
        q: SQL text to execute.

    Returns:
        A pandas DataFrame with the normalized result rows (empty when the
        API reports zero rows).

    Raises:
        requests.HTTPError: If any request returns an HTTP error status.
    """
    url = "https://api.chainbase.online/v1/dw/query"
    headers = {"x-api-key": chainbase_key}

    response = requests.post(url, json={"query": q}, headers=headers)
    # Surface HTTP failures directly instead of as a KeyError / JSON error
    # while digging into the body below.
    response.raise_for_status()
    data = response.json()["data"]  # parse the body once and reuse it
    task_id = data["task_id"]
    page_ct = math.ceil(data["rows"] / 1000)

    if page_ct == 1:
        return pd.json_normalize(data["result"])

    all_results_df = pd.DataFrame([])
    time.sleep(2)
    for page in range(1, page_ct + 1):
        response = requests.post(
            url, json={"task_id": task_id, "page": page}, headers=headers
        )
        response.raise_for_status()
        results_df = pd.json_normalize(response.json()["data"]["result"])
        all_results_df = pd.concat([all_results_df, results_df])
        # Pause between calls (presumably to respect API rate limits).
        time.sleep(2)
    return all_results_df

def query_bigquery(q):
    """Run SQL `q` on BigQuery and return the result as a DataFrame.

    A BigQuery Storage read client is passed to `to_dataframe` for the
    download of the result rows.
    """
    storage_reader = bigquery_storage.BigQueryReadClient()
    query_job = bigquery.Client().query(q)
    rows = query_job.result()
    return rows.to_dataframe(bqstorage_client=storage_reader)

def query_data(q, start, end, start_mth_dt, level):
    """Fill in a query template and run it against its configured source.

    Args:
        q: Mapping with a "src" key ('flipside', 'chainbase' or 'bigquery')
            and a "query" key holding the SQL template.
        start: Replacement for the {{start_dt}} placeholder.
        end: Replacement for the {{end_dt}} placeholder.
        start_mth_dt: Replacement for the {{start_mth_dt}} placeholder.
        level: Replacement for the {{level}} placeholder.

    Returns:
        The DataFrame produced by the source-specific query function.

    Raises:
        Exception: If "src" is not one of the supported sources.
    """
    source = q["src"]

    # Substitute the placeholders one after another, in a fixed order.
    sql = q["query"]
    for placeholder, value in (
        ("{{start_dt}}", start),
        ("{{end_dt}}", end),
        ("{{start_mth_dt}}", start_mth_dt),
        ("{{level}}", level),
    ):
        sql = sql.replace(placeholder, value)

    if source == 'flipside':
        return query_flipside(sql)
    if source == 'chainbase':
        return query_chainbase(sql)
    if source == 'bigquery':
        return query_bigquery(sql)
    raise Exception(f"{source} data source not supported")

def upload_file_gcp(source_file, bucket, destination_path):
    """Upload local file `source_file` to `destination_path` in GCS bucket `bucket`.

    Prints a confirmation line once the upload call has returned.
    """
    target_blob = storage.Client().bucket(bucket).blob(destination_path)
    target_blob.upload_from_filename(source_file)
    print(f"File {source_file} uploaded to GCP storage at {destination_path}.")

def create_upload_delete_file(df, file_name_str, gcp_path, bucket="primo_data"):
    """Write `df` to a temporary parquet file, upload it to GCS, then delete it.

    Args:
        df: DataFrame to upload. It must contain a 'dt' column; that column
            is converted to strings in place, so the caller's frame is
            modified.
        file_name_str: Prefix of the file name; the file is written as
            "<file_name_str>_df.parquet" in the working directory.
        gcp_path: Folder path inside the bucket that receives the file.
        bucket: Name of the destination bucket (defaults to "primo_data").

    Returns:
        True once the file has been uploaded and the local copy removed.

    Raises:
        KeyError: If `df` has no 'dt' column.
    """
    file_name = f"{file_name_str}_df.parquet"
    df['dt'] = df['dt'].astype("str")
    destination_path = f"{gcp_path}/{file_name}"
    try:
        df.to_parquet(file_name, index=False)
        upload_file_gcp(file_name, bucket, destination_path)
    finally:
        # Always clean up the local file, even when writing or uploading
        # fails, so failed runs do not leave stray parquet files behind.
        if os.path.exists(file_name):
            os.remove(file_name)
    return True

def query_upload_data(dts, gcp_path, q, start_mth_dt='None', level='None'):
    """Run query `q` for a date window and upload the result to GCS.

    Args:
        dts: Sequence whose first two items are the window start and end.
        gcp_path: Destination folder passed to the upload helper.
        q: Query definition passed to `query_data`.
        start_mth_dt: Value for the month-start placeholder of the template.
        level: Value for the level placeholder of the template.

    Returns:
        The DataFrame returned by the query. The uploaded file is named
        after the window start.
    """
    window_start, window_end = dts[0], dts[1]
    result = query_data(q, window_start, window_end, start_mth_dt, level)
    create_upload_delete_file(result, window_start, gcp_path)
    return result

In [3]:
# Get Asset Labels (Polygon) from Chainbase
# One file per day is uploaded for each of the two most recent past days.
q = {
    "src":"chainbase",
    "query": """
    select distinct
        date(block_timestamp) dt
        , contract_address
        , case when is_erc20 = 1 then symbol else name end name
        , case when is_erc20 = 1 then 'token' when is_erc721 = 1 then 'nft' when is_erc1155 = 1 then 'erc1155' else 'unknown' end typ
    from polygon.token_metas
    where date(block_timestamp) >= date('{{start_dt}}') and date(block_timestamp) <= date('{{end_dt}}')
    """
}
data_type = 'asset_labels/chain=polygon'

for i in range(1,3,1):
    dts = get_dates(days_back=i)['day']
    time.sleep(2)
    try:
        query_upload_data(dts, data_type, q)
    except Exception as exc:
        # Best effort: report the failing day together with the error and
        # move on. Catching Exception (not a bare except) keeps
        # KeyboardInterrupt able to stop the cell.
        print(dts, "failed", repr(exc))

File 2023-01-01_df.parquet uploaded to GCP storage at asset_labels/chain=polygon/2023-01-01_df.parquet.
File 2022-12-31_df.parquet uploaded to GCP storage at asset_labels/chain=polygon/2022-12-31_df.parquet.


In [4]:
# Get Contract Labels (Polygon) from Flipside
# Each row is stamped with today's date in a 'dt' column, which the upload
# helper expects to find.
today_dt = get_dates(days_back=0)['day'][0]
q = f"""
    select distinct address, address_name, project_name, label_type, label_subtype, '{today_dt}' dt
    from polygon.core.dim_labels
    where label_subtype not in ('token_contract','deposit_wallet')
    and label_type != 'chadmin'
    and address_name != '1inch network: general contract'
"""

# Destination: contract_labels/chain=polygon/flipside_df.parquet
file_name = 'flipside'
data_type = 'contract_labels/chain=polygon'

df = query_flipside(q)
create_upload_delete_file(df, file_name, data_type)

File flipside_df.parquet uploaded to GCP storage at contract_labels/chain=polygon/flipside_df.parquet.


True

In [5]:
# Get Asset Labels (Ethereum) from Chainbase
# One file per day is uploaded for each of the two most recent past days.
q = {
    "src":"chainbase",
    "query": """
    select distinct
        date(block_timestamp) dt
        , contract_address
        , case when is_erc20 = 1 then symbol else name end name
        , case when is_erc20 = 1 then 'token' when is_erc721 = 1 then 'nft' when is_erc1155 = 1 then 'erc1155' else 'unknown' end typ
    from ethereum.token_metas
    where date(block_timestamp) >= date('{{start_dt}}') and date(block_timestamp) <= date('{{end_dt}}')
    --where toStartOfMonth(date(block_timestamp)) = date('{{start_dt}}')
    """
}
data_type = 'asset_labels/chain=ethereum'

for i in range(1,3,1):
    dts = get_dates(days_back=i)['day']
    time.sleep(2)
    try:
        query_upload_data(dts, data_type, q)
    except Exception as exc:
        # Best effort: report the failing day together with the error and
        # move on. Catching Exception (not a bare except) keeps
        # KeyboardInterrupt able to stop the cell.
        print(dts, "failed", repr(exc))

File 2023-01-01_df.parquet uploaded to GCP storage at asset_labels/chain=ethereum/2023-01-01_df.parquet.
File 2022-12-31_df.parquet uploaded to GCP storage at asset_labels/chain=ethereum/2022-12-31_df.parquet.


In [6]:
# Get Contract Labels (Ethereum) from Flipside
# project_name is selected as an empty string here, and each row is stamped
# with today's date in a 'dt' column, which the upload helper expects to find.
today_dt = get_dates(days_back=0)['day'][0]
q = f"""
    select distinct address, address_name, '' project_name, label_type, label_subtype, '{today_dt}' dt
    from ethereum.core.dim_labels
    where label_subtype not in ('token_contract','deposit_wallet','hot_wallet','cold_wallet')
    and label_type not in ('chadmin','cex')
    and address_name not in ('1inch: general contract','mev bot: general contract','unibright: general contract','nest protocol: general contract','gnosis safe: general contract','loopring: general contract')
    and label_subtype != 'nf_token_contract'
"""

# Destination: contract_labels/chain=ethereum/flipside_df.parquet
file_name = 'flipside'
data_type = 'contract_labels/chain=ethereum'

df = query_flipside(q)
create_upload_delete_file(df, file_name, data_type)

File flipside_df.parquet uploaded to GCP storage at contract_labels/chain=ethereum/flipside_df.parquet.


True

In [9]:
# Get Blockchain Usage Data - Daily
# queries.yaml maps a dataset name to a list of query definitions; each
# definition is used below via its 'chain' key and passed on to query_data.
with open("queries.yaml") as f:
    queries_dict = yaml.safe_load(f)

# Loop thru dates (currently only yesterday: days_back = 1)
for dt in range(1,2,1):
    # Derive both values from a single get_dates() call so the day window and
    # the month start are guaranteed to come from the same clock reading.
    dates = get_dates(days_back=dt)
    dts = dates["day"]
    start_mth_dt = dates['month'][0]
    print(dts, start_mth_dt)

    # Loop thru queries
    for i, queries in queries_dict.items():
        for q in queries:
            data_type = f"{i}/level=daily/chain={q['chain']}"
            query_upload_data(dts, data_type, q, start_mth_dt, "day")

['2023-01-01', '2023-01-01'] 2023-01-01
File 2023-01-01_df.parquet uploaded to GCP storage at agg/level=daily/chain=solana/2023-01-01_df.parquet.
File 2023-01-01_df.parquet uploaded to GCP storage at agg/level=daily/chain=polygon/2023-01-01_df.parquet.
File 2023-01-01_df.parquet uploaded to GCP storage at agg/level=daily/chain=bitcoin/2023-01-01_df.parquet.
File 2023-01-01_df.parquet uploaded to GCP storage at agg/level=daily/chain=ethereum/2023-01-01_df.parquet.
File 2023-01-01_df.parquet uploaded to GCP storage at agg_apps/level=daily/chain=solana/2023-01-01_df.parquet.
File 2023-01-01_df.parquet uploaded to GCP storage at agg_apps/level=daily/chain=polygon/2023-01-01_df.parquet.
File 2023-01-01_df.parquet uploaded to GCP storage at agg_apps/level=daily/chain=bitcoin/2023-01-01_df.parquet.
File 2023-01-01_df.parquet uploaded to GCP storage at agg_apps/level=daily/chain=ethereum/2023-01-01_df.parquet.
File 2023-01-01_df.parquet uploaded to GCP storage at agg_assets/level=daily/chain=s

In [12]:
# Get Blockchain Usage Data - Monthly
# queries.yaml maps a dataset name to a list of query definitions; each
# definition is used below via its 'chain' key and passed on to query_data.
with open("queries.yaml") as f:
    queries_dict = yaml.safe_load(f)

# Loop thru dates (currently only yesterday: days_back = 1)
for dt in range(1,2,1):
    # Derive both values from a single get_dates() call so the month window
    # and the month start are guaranteed to come from the same clock reading.
    dates = get_dates(days_back=dt)
    dts = dates["month"]
    start_mth_dt = dates['month'][0]
    print(dts, start_mth_dt)

    # Loop thru queries
    for i, queries in queries_dict.items():
        for q in queries:
            data_type = f"{i}/level=monthly/chain={q['chain']}"
            query_upload_data(dts, data_type, q, start_mth_dt, "month")

['2023-01-01', '2023-01-01'] 2023-01-01


<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=5d05cfab-1e73-4d91-af1a-d7a1738132a0' target="_blank">
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>