# 03 Fetch onchain data about projects

Grabs lots of data from different sources (Dune, OSO on BigQuery, and Farcaster). Requires credentials and several GB of storage. Not for the faint of heart.

In [1]:
from dotenv import load_dotenv
from google.cloud import bigquery
from dune_client.types import QueryParameter
from dune_client.client import DuneClient
from dune_client.query import QueryBase
import os
import pandas as pd
import re

In [2]:
# Load variables from a .env file into the process environment.
load_dotenv()

# BigQuery project that hosts the tables referenced by the queries below.
PROJECT = 'opensource-observer'
# Point the Google client libraries at a service-account key file.
# Set before the client is constructed so the client can pick it up.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "../../oso_gcp_credentials.json"
client = bigquery.Client()

# The Dune API key is read from the environment; os.getenv returns None when it is unset.
DUNE_API_KEY = os.getenv('DUNE_API_KEY')
dune = DuneClient(DUNE_API_KEY)

In [3]:
# Refresh switches. When a flag is True the matching cells re-run the remote query
# and overwrite the cached parquet file; when False they read the cached parquet.
REFRESH_DUNE = False
REFRESH_OSO = False
REFRESH_FARCASTER = False

In [4]:
# Processed event tables keyed by data-source name; populated by the cells below.
EVENT_TABLES = {}
# Column subset (and order) that every processed event table is reduced to.
EVENT_COLS = ['uuid', 'chain', 'address', 'contract_type', 'user_address',
              'date', 'count_transactions', 'data_source']

In [5]:
# Labeled project contracts; the first CSV column is used as the index.
DF_PROJECTS = pd.read_csv('data/apps/project_apps_labeled.csv', index_col=0)
# Number of labeled contracts per contract type, broken down by chain.
DF_PROJECTS.groupby('contract_type')['chain'].value_counts()

contract_type       chain     
dapp_contract       Base          84
                    Optimism      40
                    Zora           9
                    Mode           8
factory_contract    Base          16
                    Optimism       5
                    Mode           1
invalid_contract    Base           3
                    Optimism       1
pending_cyber       Cyber          2
pending_kroma       Kroma          5
pending_lisk        Lisk           2
pending_mint        Mint          10
pending_orderly     Orderly        2
pending_polynomial  Polynomial     2
pending_redstone    Redstone       1
pending_swanchain   SwanChain      5
token_contract      Optimism      73
                    Base          32
trace_contract      Base          56
                    Optimism      22
                    Mode           3
                    Zora           1
unknown             Base          26
                    Optimism      24
                    Mode           2
       

In [6]:
def stringify_addresses(list_of_types, filter_col='contract_type', context='oso'):
    """Return the unique addresses of matching projects as a SQL-ready string.

    Args:
        list_of_types: Values of ``filter_col`` whose rows should be kept.
        filter_col: Column of ``DF_PROJECTS`` to filter on.
        context: Output format. ``'dune'`` joins the bare addresses with a
            comma, a newline and two tabs; ``'oso'`` wraps each address in
            single quotes and joins them with commas.

    Returns:
        The formatted address list, in order of first appearance.

    Raises:
        ValueError: If ``context`` is neither ``'dune'`` nor ``'oso'``.
    """
    # Fail loudly instead of hitting an unbound local at the return statement.
    if context not in ('dune', 'oso'):
        raise ValueError(
            f"Unknown context {context!r}; expected 'dune' or 'oso'."
        )

    filtered_projects = DF_PROJECTS[DF_PROJECTS[filter_col].isin(list_of_types)]
    # Missing addresses cannot be queried and would make str.join raise TypeError.
    address_list = list(filtered_projects['address'].dropna().unique())

    if context == 'dune':
        return ',\n\t\t'.join(address_list)
    return "'" + "','".join(address_list) + "'"

## Part 1. Dune

In [7]:
def process_dune_events(dune_dataframe, data_source_name):
    """Expand a raw Dune result into one event row per (project, user, date).

    Each input row carries a ``transaction_details`` string containing
    ``[YYYY-MM-DD <number>]`` pairs. The pairs are parsed, the rows are joined
    to ``DF_PROJECTS`` on ``"<lowercased chain> <address>"`` (the format of the
    Dune ``contract_address`` column), and every pair becomes its own row.

    Args:
        dune_dataframe: DataFrame with at least ``contract_address``,
            ``user_address`` and ``transaction_details`` columns.
        data_source_name: Value written to the ``data_source`` column.

    Returns:
        DataFrame restricted to ``EVENT_COLS``. It is empty (but keeps those
        columns) when nothing matches or no pairs could be parsed.
    """
    # Compiled once per call rather than looked up for every row.
    pair_pattern = re.compile(r'\[(\d{4}-\d{2}-\d{2}) (\d+\.?\d*)\]')

    def parse_string(row):
        return [(date, float(value)) for date, value in pair_pattern.findall(row)]

    dune_dataframe_copy = dune_dataframe.copy()
    dune_dataframe_copy['transactions'] = dune_dataframe_copy['transaction_details'].apply(parse_string)

    projects_copy = DF_PROJECTS.reset_index()
    projects_copy['lbl'] = projects_copy['chain'].str.lower() + ' ' + projects_copy['address']

    merged_df = pd.merge(
        projects_copy,
        dune_dataframe_copy,
        left_on='lbl',
        right_on='contract_address',
        how='inner',
    )

    # Rows without any parsed pair would explode into NaN and break the
    # date/count split below, so drop them up front.
    has_events = pd.Series(
        [len(pairs) > 0 for pairs in merged_df['transactions']],
        index=merged_df.index,
        dtype=bool,
    )
    exploded_df = merged_df[has_events].explode('transactions')

    # Split the (date, count) tuples column-wise; building plain lists also
    # works when the frame is empty, unlike a DataFrame-to-columns assignment.
    pairs = exploded_df['transactions'].tolist()
    exploded_df['date'] = [pair[0] for pair in pairs]
    exploded_df['count_transactions'] = [pair[1] for pair in pairs]
    exploded_df['data_source'] = data_source_name

    final_df = exploded_df[EVENT_COLS]
    print(f"Processed {len(final_df)} Dune transactions.")
    return final_df

In [8]:
# Addresses labeled as trace or factory contracts, formatted for inlining into Dune SQL.
address_list_str = stringify_addresses(['trace_contract', 'factory_contract'], context='dune')

# For each (contract, recipient) pair: daily counts of distinct transactions in which
# the labeled contract is the ERC-20 transfer sender ("from"), packed into one array
# of (date, count) tuples per pair.
# NOTE(review): the chain filter lists base, optimism and zora only, so labeled
# contracts on other chains (e.g. Mode) are not returned - confirm this is intended.
query_sql = f"""
    with pre_agg_events as (
        select
            to_char(evt_block_time, 'yyyy-mm-dd') as block_date,
            concat(blockchain, ' ', cast("from" as varchar)) as contract_address,
            "to" as user_address,
            count(distinct evt_tx_hash) as count_transactions
        from evms.erc20_transfers
        where
            blockchain in ('base', 'optimism', 'zora')
            and evt_block_time between date('2024-02-01') and date('2024-09-01')
            and "from" in (
                    {address_list_str}
              )
        group by 1,2,3
    )
    select
      contract_address,
      user_address,
      array_agg(
        (block_date, count_transactions)
        order by
          block_date
      ) as transaction_details
    from
      pre_agg_events
    group by
      contract_address,
      user_address
"""


source_name = 'erc20_transfers'
if REFRESH_DUNE:
    # Registers a new non-private query on Dune on every refresh, runs it,
    # and caches the result locally as parquet.
    query_id = dune.create_query(name=source_name, query_sql=query_sql, is_private=False)
    query = QueryBase(name=source_name, query_id=query_id.base.query_id)
    erc20_df = dune.run_query_dataframe(query)
    erc20_df.to_parquet(f'data/raw_metric_data/dune_raw_{source_name}.parquet')
else:
    # Reuse the cached result from a previous refresh.
    erc20_df = pd.read_parquet(f'data/raw_metric_data/dune_raw_{source_name}.parquet')

print("Rows imported:", len(erc20_df))

# Expand the per-pair arrays into one row per date and register the table.
EVENT_TABLES.update({
    source_name: process_dune_events(erc20_df, source_name)
})

2024-10-24 21:34:39,199 INFO dune_client.api.base executing 4199213 on medium cluster
2024-10-24 21:34:39,468 INFO dune_client.api.base waiting for query execution 01JB0MZD6GDKF5JFX82XDFPQSC to complete: ExecutionState.PENDING (queue position: 3)
2024-10-24 21:34:40,603 INFO dune_client.api.base waiting for query execution 01JB0MZD6GDKF5JFX82XDFPQSC to complete: ExecutionState.PENDING (queue position: 3)
2024-10-24 21:34:41,738 INFO dune_client.api.base waiting for query execution 01JB0MZD6GDKF5JFX82XDFPQSC to complete: ExecutionState.PENDING (queue position: 3)
2024-10-24 21:34:42,888 INFO dune_client.api.base waiting for query execution 01JB0MZD6GDKF5JFX82XDFPQSC to complete: ExecutionState.PENDING (queue position: 3)
2024-10-24 21:34:44,034 INFO dune_client.api.base waiting for query execution 01JB0MZD6GDKF5JFX82XDFPQSC to complete: ExecutionState.PENDING (queue position: 3)
2024-10-24 21:34:45,164 INFO dune_client.api.base waiting for query execution 01JB0MZD6GDKF5JFX82XDFPQSC to c

Rows imported: 473885
Processed 1203653 Dune transactions.


In [9]:
# Addresses labeled as token contracts, formatted for inlining into Dune SQL.
address_list_str = stringify_addresses(['token_contract'], context='dune')

# Transfers of the labeled tokens. Every transfer row is emitted twice - once for
# tx_from and once for tx_to - with a weight of 0.5 each, so the summed
# count_transactions per (contract, date, user) can be fractional.
# NOTE(review): the chain filter lists base, optimism and zora only - confirm
# that other chains are meant to be excluded.
query_sql = f"""
    with
      transfers as (
        select
          concat(blockchain, ' ', cast(contract_address as varchar)) as contract_address,
          to_char(block_date, 'yyyy-mm-dd') as block_date,
          tx_from,
          tx_to
        from
          tokens.transfers as transfers
        where
          contract_address in (
                {address_list_str}
          )
          and blockchain in ('base', 'optimism', 'zora')
          and block_date between date('2024-02-01') and date('2024-09-01')
      ),
      union_events as (
        select
          contract_address,
          block_date,
          tx_from as user_address,
          0.5 as amount
        from
          transfers
        union all
        select
          contract_address,
          block_date,
          tx_to as user_address,
          0.5 as amount
        from
          transfers
      ),
      pre_agg_events as (
        select
          contract_address,
          block_date,
          user_address,
          sum(amount) as count_transactions
        from
          union_events
        group by
          contract_address,
          block_date,
          user_address
      )
    select
      contract_address,
      user_address,
      array_agg(
        (block_date, count_transactions)
        order by
          block_date
      ) as transaction_details
    from
      pre_agg_events
    group by
      contract_address,
      user_address
"""

source_name = 'token_transfers'
if REFRESH_DUNE:
    # Registers a new non-private query on Dune on every refresh, runs it,
    # and caches the result locally as parquet.
    query_id = dune.create_query(name=source_name, query_sql=query_sql, is_private=False)
    query = QueryBase(name=source_name, query_id=query_id.base.query_id)
    tokens_df = dune.run_query_dataframe(query)
    tokens_df.to_parquet(f'data/raw_metric_data/dune_raw_{source_name}.parquet')
else:
    # Reuse the cached result from a previous refresh.
    tokens_df = pd.read_parquet(f'data/raw_metric_data/dune_raw_{source_name}.parquet')

print("Rows imported:", len(tokens_df))

# Expand the per-pair arrays into one row per date and register the table.
EVENT_TABLES.update({
    source_name: process_dune_events(tokens_df, source_name)
})

2024-10-24 21:35:44,468 INFO dune_client.api.base executing 4199214 on medium cluster
2024-10-24 21:35:44,880 INFO dune_client.api.base waiting for query execution 01JB0N1CYAQXHP142EZH5QNY4S to complete: ExecutionState.PENDING (queue position: 3)
2024-10-24 21:35:46,028 INFO dune_client.api.base waiting for query execution 01JB0N1CYAQXHP142EZH5QNY4S to complete: ExecutionState.PENDING (queue position: 3)
2024-10-24 21:35:47,173 INFO dune_client.api.base waiting for query execution 01JB0N1CYAQXHP142EZH5QNY4S to complete: ExecutionState.PENDING (queue position: 3)
2024-10-24 21:35:48,308 INFO dune_client.api.base waiting for query execution 01JB0N1CYAQXHP142EZH5QNY4S to complete: ExecutionState.PENDING (queue position: 3)
2024-10-24 21:35:49,460 INFO dune_client.api.base waiting for query execution 01JB0N1CYAQXHP142EZH5QNY4S to complete: ExecutionState.EXECUTING
2024-10-24 21:35:50,611 INFO dune_client.api.base waiting for query execution 01JB0N1CYAQXHP142EZH5QNY4S to complete: Execution

Rows imported: 906501
Processed 2892876 Dune transactions.


## Part 2. Get OSO dapp transactions

In [10]:
def process_oso_events(oso_dataframe, data_source_name):
    """Attach project labels to an OSO event table and trim it to EVENT_COLS.

    The events are inner-joined to ``DF_PROJECTS`` on address and chain
    (``contract_address``/``chain`` on the event side), so events without a
    labeled contract are dropped and an event matching several project rows
    is repeated once per match.

    Args:
        oso_dataframe: Event rows with ``contract_address``, ``chain`` and the
            remaining ``EVENT_COLS`` fields not supplied by ``DF_PROJECTS``.
        data_source_name: Value written to the ``data_source`` column.

    Returns:
        DataFrame containing exactly the ``EVENT_COLS`` columns.
    """
    labeled_projects = DF_PROJECTS.reset_index()

    joined = labeled_projects.merge(
        oso_dataframe,
        how='inner',
        left_on=['address', 'chain'],
        right_on=['contract_address', 'chain'],
    )
    joined['data_source'] = data_source_name

    events = joined[EVENT_COLS]
    print(f"Processed {len(events)} OSO transactions.")
    return events

In [11]:
# Quoted, comma-separated addresses labeled as dapp or trace contracts.
address_list_str = stringify_addresses(['dapp_contract', 'trace_contract'], context='oso')

# Daily transaction totals sent to the labeled contracts, per sender and chain.
# Rows with a null sender are excluded.
query = f"""
  select
      format_date('%Y-%m-%d', CAST(date AS DATE)) AS date,
      to_address as contract_address,
      from_address as user_address,
      chain,
      sum(transactions) as count_transactions
    from `{PROJECT}.static_data_sources.sunny_transactions`
    where
        to_address in ({address_list_str})
        and from_address is not null
    group by 1,2,3,4
    
"""

source_name = 'dapp_txns'
if REFRESH_OSO:
    # Run the query on BigQuery and cache the result locally as parquet.
    result = client.query(query)
    print("Query completed.")
    txns_df = result.to_dataframe()
    print("Dataframe loaded.")
    txns_df.to_parquet(f'data/raw_metric_data/oso_raw_{source_name}.parquet')
else:
    # Reuse the cached result from a previous refresh.
    txns_df = pd.read_parquet(f'data/raw_metric_data/oso_raw_{source_name}.parquet')
    
print("Rows imported:", len(txns_df))    

# Join the events to the project labels and register the table.
EVENT_TABLES.update({
    source_name: process_oso_events(txns_df, source_name)
})

Query completed.
Dataframe loaded.
Rows imported: 5161140
Processed 5732918 OSO transactions.


## Part 3. Get OSO trace events

In [12]:
# Quoted, comma-separated addresses labeled as trace or factory contracts.
address_list_str = stringify_addresses(['trace_contract', 'factory_contract'], context='oso')

# Traces touching the labeled contracts in either direction: when the contract is
# the trace target the caller is the user, and when the contract is the trace
# origin the callee is the user. Transactions are then counted per
# (date, chain, contract, user) with approx_count_distinct, i.e. the counts are
# approximate rather than exact.
query = f"""
    with traces as (
      select
        format_date('%Y-%m-%d', CAST(block_timestamp AS DATE)) AS date,
        transaction_hash,
        chain,
        to_address as contract_address,
        from_address as user_address
      from `{PROJECT}.static_data_sources.sunny_traces`
        where to_address in ({address_list_str})

    union all

      select
        format_date('%Y-%m-%d', CAST(block_timestamp AS DATE)) AS date,
        transaction_hash,
        chain,
        from_address as contract_address,
        to_address as user_address
      from `{PROJECT}.static_data_sources.sunny_traces`
        where from_address in ({address_list_str})
    )

    select 
      date,
      chain,
      contract_address,
      user_address,
      approx_count_distinct(transaction_hash) as count_transactions
    from traces
    group by 1,2,3,4
"""

source_name = 'trace_events'
if REFRESH_OSO:
    # Run the query on BigQuery and cache the result locally as parquet.
    result = client.query(query)
    print("Query completed.")
    traces_df = result.to_dataframe()
    print("Dataframe loaded.")
    traces_df.to_parquet(f'data/raw_metric_data/oso_raw_{source_name}.parquet')
else:
    # Reuse the cached result from a previous refresh.
    traces_df = pd.read_parquet(f'data/raw_metric_data/oso_raw_{source_name}.parquet')
    
print("Rows imported:", len(traces_df))    

# Join the events to the project labels and register the table.
EVENT_TABLES.update({
    source_name: process_oso_events(traces_df, source_name)
})

Query completed.
Dataframe loaded.
Rows imported: 11811659
Processed 11658453 OSO transactions.


## Part 4. Get metrics for factory contracts

In [13]:
# Quoted, comma-separated addresses labeled as factory contracts.
address_list_str = stringify_addresses(['factory_contract'], context='oso')

# `factories` lists the contracts recorded in int_factories for the labeled factory
# addresses (chain upper-cased so the join below is case-insensitive). Transactions
# sent to those contracts are attributed to the factory address, while the
# contract that actually received them is kept as deployed_contract_address.
query = f"""
    with factories as (
        select distinct
          contract_address,
          upper(network) as chain,
          factory_address
        from `{PROJECT}.oso.int_factories`
        where (factory_address in ({address_list_str}))
    )
    select
        format_date('%Y-%m-%d', CAST(t.date AS DATE)) AS date,
        f.factory_address as contract_address,
        t.to_address as deployed_contract_address,
        t.from_address as user_address,
        t.chain,
        sum(t.transactions) as count_transactions
    from `{PROJECT}.static_data_sources.sunny_transactions` as t
    join factories as f
        on t.to_address = f.contract_address
        and upper(t.chain) = f.chain
    group by 1,2,3,4,5
"""

source_name = 'factory_txns'
if REFRESH_OSO:
    # Run the query on BigQuery and cache the result locally as parquet.
    result = client.query(query)
    print("Query completed.")
    factory_txns_df = result.to_dataframe()
    print("Dataframe loaded.")
    factory_txns_df.to_parquet(f'data/raw_metric_data/oso_raw_{source_name}.parquet')
else:
    # Reuse the cached result from a previous refresh.
    factory_txns_df = pd.read_parquet(f'data/raw_metric_data/oso_raw_{source_name}.parquet')
    
print("Rows imported:", len(factory_txns_df))

# Join the events to the project labels and register the table. The
# deployed_contract_address column is not part of EVENT_COLS, so it is dropped here.
EVENT_TABLES.update({
    source_name: process_oso_events(factory_txns_df, source_name)
})

Query completed.
Dataframe loaded.
Rows imported: 190953
Processed 192243 OSO transactions.


In [14]:
# One synthetic event per (deployed contract, factory, chain), dated at the earliest
# day on which the deployed contract has recorded transactions. The deployed
# contract takes the role of user_address and each event counts as one transaction.
factory_deploys_df = (
    factory_txns_df
    .groupby(['deployed_contract_address', 'contract_address', 'chain'])['date']
    .min()
    .reset_index()
    .rename(columns={'deployed_contract_address': 'user_address'})
    .assign(count_transactions=1)
)

EVENT_TABLES['factory_deploys'] = process_oso_events(factory_deploys_df, 'factory_deploys')

Processed 9849 OSO transactions.


## Part 5. Consolidate and join on Farcaster data

In [15]:
# Distinct (fid, username, address) rows for Farcaster users: every non-deleted
# verification address that is 42 characters long, plus each profile's custody address.
query = f"""
    with profiles as (
      select
        v.fid,
        v.address,
        p.custody_address,
        json_value(p.data, "$.username") as username,
      from `{PROJECT}.farcaster.verifications` v
      join `{PROJECT}.farcaster.profiles` p
        on v.fid = p.fid
      where v.deleted_at is null
    ),
    unioned as (
      select
        fid,
        username,
        address
      from profiles
      where length(address) = 42
      union all
      select
        fid,
        username,
        custody_address as address
      from profiles
    )
    select distinct
      fid,
      username,
      address
    from unioned
"""

In [16]:
if REFRESH_FARCASTER:
    # Run the Farcaster query on BigQuery and cache the result locally as parquet.
    result = client.query(query)
    farcaster_df = result.to_dataframe()
    farcaster_df.to_parquet('data/raw_metric_data/farcaster.parquet')
else:
    # Reuse the cached result from a previous refresh.
    farcaster_df = pd.read_parquet('data/raw_metric_data/farcaster.parquet')

# Index by address, then drop every row that has any missing value
# (so a row with a fid but no username is removed as well).
farcaster_df.set_index('address', inplace=True)
farcaster_df.dropna(inplace=True)
# address -> fid and address -> username lookups.
# NOTE(review): if an address occurs more than once, to_dict() keeps only the last
# entry; also presumably the address casing matches user_address - verify.
fids = farcaster_df['fid'].to_dict()
fusers = farcaster_df['username'].to_dict()

In [17]:
# Stack all processed event tables into a single frame with a fresh 0..n-1 index.
df = pd.concat(list(EVENT_TABLES.values()), axis=0, ignore_index=True)

# Farcaster identity of the user address; NaN when the address has no known profile.
df['farcaster_id'] = df['user_address'].map(fids)
df['farcaster_username'] = df['user_address'].map(fusers)
# Recipient of the project, looked up through the DF_PROJECTS index.
df['recipient'] = df['uuid'].map(DF_PROJECTS['recipient'].to_dict())

df.tail(1)

Unnamed: 0,uuid,chain,address,contract_type,user_address,date,count_transactions,data_source,farcaster_id,farcaster_username,recipient
21689991,ceac6653-ea1d-43b8-9e02-8329e5ee2a90,Base,0x00000000000052068951aed201da868e29db48ac,factory_contract,0x6ab0a3035943f8932ec0c6fef25655d45715670b,2024-08-29,1.0,factory_deploys,,,0x3363b291a21cC692A5e07C9C63E3DF45F135EFcd


In [18]:
# Row counts per data source, broken down by the contract type of the matched project row.
df.groupby('data_source')['contract_type'].value_counts()

data_source      contract_type   
dapp_txns        dapp_contract        4944924
                 trace_contract        787915
                 factory_contract          79
erc20_transfers  trace_contract       1203653
factory_deploys  factory_contract        9834
                 dapp_contract             15
factory_txns     factory_contract      191149
                 dapp_contract           1094
token_transfers  token_contract       2892876
trace_events     trace_contract      11570701
                 dapp_contract          59167
                 factory_contract       28585
Name: count, dtype: int64

In [19]:
# Total row count per data source.
df['data_source'].value_counts()

data_source
trace_events       11658453
dapp_txns           5732918
token_transfers     2892876
erc20_transfers     1203653
factory_txns         192243
factory_deploys        9849
Name: count, dtype: int64

In [20]:
# Persist the consolidated event table as parquet.
df.to_parquet("data/raw_metric_data/project_events.parquet")