Documentation
1. The server is at 34.133.34.19:80
2. Both Arb and Mainnet data are stored in the logs


In [7]:
from nozzle.client import Client
from nozzle.util import to_hex
import os.path
import time
from nozzle.util import Abi

import time
import pandas as pd

client_url = "grpc://34.133.34.19:80"
client = Client(client_url)

def elapsed(start):
    return round(time.time() - start, 4) 
def process_query(client, query):
    # df = pd.DataFrame()

    print(query)
    start = time.time()

    result_stream = client.get_sql(query)
    print('time to establish stream: ', elapsed(start), 's')

    total_events = 0
    try:
        batch = next(result_stream)
        print('time for first batch ', elapsed(start), 's')
        total_events += batch.num_rows
        df = batch.to_pandas().map(to_hex)

        print('The type of df is ', type(df))
        
        batch_start = time.time()

        for batch in result_stream:
            total_events += batch.num_rows
            print('received batch of ', batch.num_rows, ' rows in ', elapsed(batch_start), 's')
            batch_start = time.time()
            new_df = batch.to_pandas().map(to_hex)
            # Concatenate the df dataframe to the previous dataframe
            df = pd.concat([df, new_df])


        print('total rows: ', total_events)
        print('total time to consume the stream: ', elapsed(start), 's')
        print('rows/s: ', total_events / elapsed(start))
        return df 

    except StopIteration:
        print("No more batches available in the result stream.")

### Addresses and contracts of ABIs on Mainnet
 https://thegraph.com/docs/en/network/contracts/


In [2]:
GNS_address = 'aDcA0dd4729c8BA3aCf3E99F3A9f471EF37b6825'
GNS_path = 'abis/l1gns.json'
GNSAbi = Abi(GNS_path)

L1Staking_address = 'F55041E37E12cD407ad00CE2910B8269B01263b9'
L1Staking_path =  "abis/L1Staking.json"
L1StakingAbi = Abi(L1Staking_path)


IStakingStitched_address = 'F55041E37E12cD407ad00CE2910B8269B01263b9'
IStakingStitched_path = "abis/IStakingStitched.json"
IStakingStitchedAbi = Abi(IStakingStitched_path)

RewardsManager_address ='9ac758ab77733b4150a901ebd659cbf8cb93ed66⁠'
RewardsManager_path = "abis/RewardsManager.json"
RewardsManagerAbi = Abi(RewardsManager_path)

ServiceRegistry_address = 'aD0C9DaCf1e515615b0581c8D7E295E296Ec26E6' 
ServiceRegistry_path =  "abis/ServiceRegistry.json"
ServiceRegistryAbi = Abi(ServiceRegistry_path)

StakingExtension_address = 'A479c00cDa8C07bce458D7a826C7b091672EB92C'
StakingExtension_path = "abis/StakingExtension.json"

GraphTokenLockManager_address = ''
StakingExtension_path = 'abis/GraphTokenLockManager.json'

IGraphToken_address = 'c944E90C64B2c07662A292be6244BDf05Cda44a7'
IGraphToken_path = 'abis/IGraphToken.json'
IGraphTokenAbi = Abi(IGraphToken_path)

IEthereumDIDRegistry_address = 'dCa7EF03e98e0DC2B855bE647C39ABe984fcF21B'
IEthereumDIDRegistry_path = 'abis/IEthereumDIDRegistry.json'
IEthereumDIDRegistryAbi = Abi(IEthereumDIDRegistry_path)


### Entity Indexer

#### Step 1: Identify relevant contract events

In [3]:

ServiceRegistered_sig = ServiceRegistryAbi.events["ServiceRegistered"].signature()
StakeDeposited_sig = IStakingStitchedAbi.events["StakeDeposited"].signature()
StakeDelegated_sig = IStakingStitchedAbi.events["StakeDelegated"].signature()
RebateClaimed_sig = IStakingStitchedAbi.events["RebateClaimed"].signature()
IndexerStakeTransferredToL2_sig = L1StakingAbi.events["IndexerStakeTransferredToL2"].signature()
StakeDelegatedLocked_sig = IStakingStitchedAbi.events["StakeDelegatedLocked"].signature()
RewardsAssigned_sig = RewardsManagerAbi.events["RewardsAssigned"].signature()
RebateCollected_sig = IStakingStitchedAbi.events["RebateCollected"].signature()
StakeLocked_sig = IStakingStitchedAbi.events["StakeLocked"].signature()
AllocationCreated_sig = IStakingStitchedAbi.events["AllocationCreated"].signature()
StakeSlashed_sig = IStakingStitchedAbi.events["StakeSlashed"].signature()
StakeWithdrawn_sig = IStakingStitchedAbi.events["StakeWithdrawn"].signature()



### Step 2: Create logic to each attribute

#### indexer_part1_query
- ✓ id: ETH address of indexer
- ✓ geoHash: (geo_hash) "Geohash of the indexer. Shows where their indexer is located in the world"
- ✓ totalAllocations: (total_allocation_count)  "All allocations of stake for this Indexer (i.e. closed and active)" 
Need to check how are these calculated in the analytics subgraph, I am getting very different numbers here
- queryFeesCollected: (query_fees_collected) "Total query fees collected. Includes the portion given to delegators"
- queryFeeRebates: (query_fee_rebates)  "Query fee rebate amount claimed from the protocol through rebates mechanism. Does not include portion given to delegators

#### indexer_part2_query
- ✓ stakedTokens:(staked_tokens) "CURRENT tokens staked in the protocol. Decreases on withdraw, not on lock"
- delegatedtokens:(delegated_tokens)  "CURRENT tokens delegated to the indexer”
- lockedTokens: (locked_tokens) "CURRENT tokens locked”
- rewardsEarned: (rewards_earned)"Total indexing rewards earned by this indexer from inflation. Including delegation rewards"

### Step 3: Consolidate them into a final table

In [9]:
indexer_query = f"""
WITH indexer_geohash AS (
    SELECT 
        pc.block_number,
        pc.ServiceRegistered['indexer'] AS id,
        pc.ServiceRegistered['geohash'] AS geo_hash
    FROM (
        SELECT 
            l.block_num AS block_number,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{ServiceRegistered_sig}') AS ServiceRegistered
        FROM 
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'{ServiceRegistry_address}', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('{ServiceRegistered_sig}')
    ) pc
    INNER JOIN (
        SELECT 
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{ServiceRegistered_sig}')['indexer'] AS indexer,
            MAX(l.block_num) AS max_block_number
        FROM 
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'{ServiceRegistry_address}', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('{ServiceRegistered_sig}')
        GROUP BY 
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{ServiceRegistered_sig}')['indexer']
    ) latest
    ON pc.ServiceRegistered['indexer'] = latest.indexer AND pc.block_number = latest.max_block_number
),

allocations_1 AS (
    SELECT  
        pc.block_num AS block_number,
        pc.AllocationCreated['metadata'] AS metadata,
        pc.AllocationCreated['allocationID'] AS allocationID,
        pc.AllocationCreated['tokens'] AS tokens,
        pc.AllocationCreated['epoch'] AS epoch,
        pc.AllocationCreated['subgraphDeploymentID'] AS subgraphDeploymentID,
        pc.AllocationCreated['indexer'] AS indexer
    FROM (
        SELECT 
            l.block_num,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{AllocationCreated_sig}') AS AllocationCreated
        FROM 
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('{AllocationCreated_sig}')
    ) pc
),

allocations_2 AS (
    SELECT 
        indexer,
        COUNT(allocationID) AS total_allocation_count
    FROM allocations_1
    GROUP BY indexer
),

rebateClaimed AS (
    SELECT 
        pc1.block_num AS block_number,
        pc1.RebateClaimed['indexer'] AS indexer,
        pc1.RebateClaimed['tokens'] AS queryFeeRebates
    FROM (
        SELECT 
            l.block_num,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{RebateClaimed_sig}') AS RebateClaimed
        FROM  
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('{RebateClaimed_sig}')
    ) pc1
),

rebateCollected AS (
    SELECT 
        pc2.block_num AS block_number,
        pc2.RebateCollected['indexer'] AS indexer,
        pc2.RebateCollected['queryFees'] AS queryFeesCollected,
        pc2.RebateCollected['queryRebates'] AS queryFeeRebates
    FROM (
        SELECT 
            l.block_num,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{RebateCollected_sig}') AS RebateCollected
        FROM  
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('{RebateCollected_sig}')
    ) pc2
),

summedRebateClaimed AS (
    SELECT
        indexer,
        SUM(queryFeeRebates) AS totalQueryFeeRebates
    FROM rebateClaimed
    GROUP BY indexer
),

summedRebateCollected AS (
    SELECT
        indexer,
        SUM(queryFeeRebates) AS totalQueryFeeRebates,
        SUM(queryFeesCollected) AS totalQueryFeesCollected
    FROM rebateCollected
    GROUP BY indexer
),

queryfees AS (
    SELECT 
        COALESCE(rc.indexer, rcl.indexer) AS indexer,
        COALESCE(rc.totalQueryFeeRebates, 0) AS query_fee_rebates,
        COALESCE(rcl.totalQueryFeesCollected, 0) AS query_fees_collected
    FROM summedRebateClaimed rc
    FULL OUTER JOIN summedRebateCollected rcl ON rc.indexer = rcl.indexer
),

stakedTokens_1 AS (
    SELECT 
        indexer,
        SUM(tokens) AS stakedTokens
    FROM (
        SELECT  
            pc.block_num AS block_number,
            pc.StakeDeposited['indexer'] AS indexer,
            pc.StakeDeposited['tokens'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeDeposited_sig}') AS StakeDeposited
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{StakeDeposited_sig}')
        ) pc
    ) a 
    GROUP BY indexer
),

stakedTokens_2 AS (
    SELECT 
        indexer,
        -SUM(transferredStakeTokens) AS stakedTokens
    FROM (
        SELECT  
            pc.block_num AS block_number,
            pc.IndexerStakeTransferredToL2['indexer'] AS indexer,
            pc.IndexerStakeTransferredToL2['transferredStakeTokens'] AS transferredStakeTokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{IndexerStakeTransferredToL2_sig}') AS IndexerStakeTransferredToL2
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{L1Staking_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{IndexerStakeTransferredToL2_sig}')
        ) pc
    ) a 
    GROUP BY indexer
),

stakedTokens_3 AS (
    SELECT 
        indexer,
        -SUM(tokens) AS stakedTokens
    FROM (
        SELECT  
            pc.block_num AS block_number,
            pc.StakeSlashed['indexer'] AS indexer,
            pc.StakeSlashed['tokens'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeSlashed_sig}') AS StakeSlashed
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{StakeSlashed_sig}')
        ) pc
    ) a 
    GROUP BY indexer
),

stakedTokens_final AS (
    SELECT 
        indexer, 
        SUM(stakedTokens) AS staked_tokens 
    FROM (
        SELECT * FROM stakedTokens_1
        UNION ALL
        SELECT * FROM stakedTokens_2
        UNION ALL  
        SELECT * FROM stakedTokens_3
    ) a
    GROUP BY indexer
),

delegated_tokens_1 AS (
    SELECT 
        indexer,
        SUM(tokens) AS tokens
    FROM (
        SELECT 
            pc.StakeDelegated['indexer'] AS indexer,
            pc.StakeDelegated['tokens'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeDelegated_sig}') AS StakeDelegated
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{StakeDelegated_sig}')
        ) pc
    ) a 
    GROUP BY indexer
),

delegated_tokens_3 AS (
    SELECT 
        indexer, 
        -SUM(tokens) AS tokens
    FROM (
        SELECT  
            pc.IndexerStakeTransferredToL2['indexer'] AS indexer,
            pc.IndexerStakeTransferredToL2['transferredStakeTokens'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{IndexerStakeTransferredToL2_sig}') AS IndexerStakeTransferredToL2
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{L1Staking_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{IndexerStakeTransferredToL2_sig}')
        ) pc
    ) 
    GROUP BY indexer
),

delegated_tokens_4 AS (
    SELECT 
        indexer, 
        -SUM(tokens) AS tokens
    FROM (
        SELECT  
            pc.StakeDelegatedLocked['indexer'] AS indexer,
            pc.StakeDelegatedLocked['tokens'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeDelegatedLocked_sig}') AS StakeDelegatedLocked
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{StakeDelegatedLocked_sig}')
        ) pc
    ) 
    GROUP BY indexer
),

delegated_tokens_5 AS (
    SELECT 
        indexer, 
        SUM(tokens) AS tokens
    FROM (
        SELECT 
            pc2.RebateCollected['indexer'] AS indexer,
            pc2.RebateCollected['delegationRewards'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{RebateCollected_sig}') AS RebateCollected
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{RebateCollected_sig}')
        ) pc2
    ) 
    GROUP BY indexer
),

final_delegated_tokens AS (
    SELECT * FROM delegated_tokens_1
    UNION ALL 
    SELECT * FROM delegated_tokens_3
    UNION ALL 
    SELECT * FROM delegated_tokens_4
    UNION ALL 
    SELECT * FROM delegated_tokens_5
),

delegation_tokens AS (
    SELECT 
        indexer, 
        SUM(tokens) AS delegated_tokens
    FROM final_delegated_tokens 
    GROUP BY indexer
),

ranked_tokens AS (
    SELECT  
        pc.block_num AS block_number,
        pc.StakeLocked['indexer'] AS indexer,
        pc.StakeLocked['tokens'] AS tokens,
        RANK() OVER (PARTITION BY pc.StakeLocked['indexer'] ORDER BY pc.block_num DESC) AS rn
    FROM (
        SELECT 
            l.block_num,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeLocked_sig}') AS StakeLocked
        FROM 
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('{StakeLocked_sig}')
    ) pc
),

locked_tokens AS (
    SELECT 
        block_number,
        indexer,
        tokens AS lockedTokens
    FROM ranked_tokens
    WHERE rn = 1
),

locked_tokens_withdraw AS (
    SELECT 
        indexer, 
        -SUM(tokens) AS locked_tokens_withdraw
    FROM (
        SELECT  
            pc.block_num AS block_number,
            pc.StakeWithdrawn['indexer'] AS indexer,
            pc.StakeWithdrawn['tokens'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeWithdrawn_sig}') AS StakeWithdrawn
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{StakeWithdrawn_sig}')
        ) pc
        LEFT JOIN locked_tokens lt ON pc.StakeWithdrawn['indexer'] = lt.indexer  
        WHERE pc.block_num >= lt.block_number
    ) a
    GROUP BY indexer
),

locked_tokens_final AS (
    SELECT 
        a.indexer,
        a.lockedTokens - b.locked_tokens_withdraw AS locked_tokens
    FROM 
        locked_tokens a 
    LEFT JOIN 
        locked_tokens_withdraw b ON a.indexer = b.indexer
)

SELECT 
    a.*,
    b.total_allocation_count,
    d.query_fees_collected AS query_fees_collected,
    d.query_fee_rebates,
    e.staked_tokens,
    f.delegated_tokens,
    g.locked_tokens
FROM indexer_geohash a 
LEFT JOIN allocations_2 b ON a.id = b.indexer
LEFT JOIN queryfees d ON a.id = d.indexer
LEFT JOIN stakedTokens_final e ON a.id = e.indexer
LEFT JOIN delegation_tokens f ON a.id = f.indexer
LEFT JOIN locked_tokens_final g ON a.id = g.indexer
"""

indexer_1 = process_query(client, indexer_query)




WITH indexer_geohash AS (
    SELECT 
        pc.block_number,
        pc.ServiceRegistered['indexer'] AS id,
        pc.ServiceRegistered['geohash'] AS geo_hash
    FROM (
        SELECT 
            l.block_num AS block_number,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'ServiceRegistered(address indexed indexer,string url,string geohash)') AS ServiceRegistered
        FROM 
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'aD0C9DaCf1e515615b0581c8D7E295E296Ec26E6', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('ServiceRegistered(address indexed indexer,string url,string geohash)')
    ) pc
    INNER JOIN (
        SELECT 
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'ServiceRegistered(address indexed indexer,string url,string geohash)')['indexer'] AS indexer,
            MAX(l.block_num) AS max_block_number
        FROM 
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast

In [13]:
indexer_geohash_query = f"""
SELECT 
    pc.block_number,
    pc.ServiceRegistered['indexer'] AS id,
    pc.ServiceRegistered['geohash'] AS geo_hash
FROM (
    SELECT 
        l.block_num AS block_number,
        evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{ServiceRegistered_sig}') AS ServiceRegistered
    FROM 
        eth_firehose.logs l
    WHERE 
        l.address = arrow_cast(x'{ServiceRegistry_address}', 'FixedSizeBinary(20)')
        AND l.topic0 = evm_topic('{ServiceRegistered_sig}')
) pc
INNER JOIN (
    SELECT 
        evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{ServiceRegistered_sig}')['indexer'] AS indexer,
        MAX(l.block_num) AS max_block_number
    FROM 
        eth_firehose.logs l
    WHERE 
        l.address = arrow_cast(x'{ServiceRegistry_address}', 'FixedSizeBinary(20)')
        AND l.topic0 = evm_topic('{ServiceRegistered_sig}')
    GROUP BY 
        evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{ServiceRegistered_sig}')['indexer']
) latest
ON pc.ServiceRegistered['indexer'] = latest.indexer AND pc.block_number = latest.max_block_number
"""
allocations_2_query = f"""
SELECT 
    indexer,
    COUNT(allocationID) AS total_allocation_count
FROM (
    SELECT  
        pc.block_num AS block_number,
        pc.AllocationCreated['metadata'] AS metadata,
        pc.AllocationCreated['allocationID'] AS allocationID,
        pc.AllocationCreated['tokens'] AS tokens,
        pc.AllocationCreated['epoch'] AS epoch,
        pc.AllocationCreated['subgraphDeploymentID'] AS subgraphDeploymentID,
        pc.AllocationCreated['indexer'] AS indexer
    FROM (
        SELECT 
            l.block_num,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{AllocationCreated_sig}') AS AllocationCreated
        FROM 
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('{AllocationCreated_sig}')
    ) pc
) a
GROUP BY indexer
"""
queryfees_query = f"""
SELECT 
    COALESCE(rc.indexer, rcl.indexer) AS indexer,
    COALESCE(rc.totalQueryFeeRebates, 0) AS query_fee_rebates,
    COALESCE(rcl.totalQueryFeesCollected, 0) AS query_fees_collected
FROM (
    SELECT
        indexer,
        SUM(queryFeeRebates) AS totalQueryFeeRebates
    FROM (
        SELECT 
            pc1.block_num AS block_number,
            pc1.RebateClaimed['indexer'] AS indexer,
            pc1.RebateClaimed['tokens'] AS queryFeeRebates
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{RebateClaimed_sig}') AS RebateClaimed
            FROM  
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{RebateClaimed_sig}')
        ) pc1
    ) a
    GROUP BY indexer
) rc
FULL OUTER JOIN (
    SELECT
        indexer,
        SUM(queryFeeRebates) AS totalQueryFeeRebates,
        SUM(queryFeesCollected) AS totalQueryFeesCollected
    FROM (
        SELECT 
            pc2.block_num AS block_number,
            pc2.RebateCollected['indexer'] AS indexer,
            pc2.RebateCollected['queryFees'] AS queryFeesCollected,
            pc2.RebateCollected['queryRebates'] AS queryFeeRebates
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{RebateCollected_sig}') AS RebateCollected
            FROM  
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{RebateCollected_sig}')
        ) pc2
    ) a
    GROUP BY indexer
) rcl ON rc.indexer = rcl.indexer
"""
stakedTokens_final_query = f"""
SELECT 
    indexer, 
    SUM(stakedTokens) AS staked_tokens 
FROM (
    SELECT 
        indexer,
        SUM(tokens) AS stakedTokens
    FROM (
        SELECT  
            pc.block_num AS block_number,
            pc.StakeDeposited['indexer'] AS indexer,
            pc.StakeDeposited['tokens'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeDeposited_sig}') AS StakeDeposited
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{StakeDeposited_sig}')
        ) pc
    ) a 
    GROUP BY indexer
    UNION ALL
    SELECT 
        indexer,
        -SUM(transferredStakeTokens) AS stakedTokens
    FROM (
        SELECT  
            pc.block_num AS block_number,
            pc.IndexerStakeTransferredToL2['indexer'] AS indexer,
            pc.IndexerStakeTransferredToL2['transferredStakeTokens'] AS transferredStakeTokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{IndexerStakeTransferredToL2_sig}') AS IndexerStakeTransferredToL2
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{L1Staking_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{IndexerStakeTransferredToL2_sig}')
        ) pc
    ) a 
    GROUP BY indexer
    UNION ALL
    SELECT 
        indexer,
        -SUM(tokens) AS stakedTokens
    FROM (
        SELECT  
            pc.block_num AS block_number,
            pc.StakeSlashed['indexer'] AS indexer,
            pc.StakeSlashed['tokens'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeSlashed_sig}') AS StakeSlashed
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{StakeSlashed_sig}')
        ) pc
    ) a 
    GROUP BY indexer
) a
GROUP BY indexer
"""
delegation_tokens_query = f"""
SELECT 
    indexer, 
    SUM(tokens) AS delegated_tokens
FROM (
    SELECT 
        indexer,
        SUM(tokens) AS tokens
    FROM (
        SELECT 
            pc.StakeDelegated['indexer'] AS indexer,
            pc.StakeDelegated['tokens'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeDelegated_sig}') AS StakeDelegated
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{StakeDelegated_sig}')
        ) pc
    ) a 
    GROUP BY indexer
    UNION ALL 
    SELECT 
        indexer, 
        -SUM(tokens) AS tokens
    FROM (
        SELECT  
            pc.IndexerStakeTransferredToL2['indexer'] AS indexer,
            pc.IndexerStakeTransferredToL2['transferredStakeTokens'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{IndexerStakeTransferredToL2_sig}') AS IndexerStakeTransferredToL2
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{L1Staking_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{IndexerStakeTransferredToL2_sig}')
        ) pc
    ) 
    GROUP BY indexer
    UNION ALL 
    SELECT 
        indexer, 
        -SUM(tokens) AS tokens
    FROM (
        SELECT  
            pc.StakeDelegatedLocked['indexer'] AS indexer,
            pc.StakeDelegatedLocked['tokens'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeDelegatedLocked_sig}') AS StakeDelegatedLocked
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{StakeDelegatedLocked_sig}')
        ) pc
    ) 
    GROUP BY indexer
    UNION ALL 
    SELECT 
        indexer, 
        SUM(tokens) AS tokens
    FROM (
        SELECT 
            pc2.RebateCollected['indexer'] AS indexer,
            pc2.RebateCollected['delegationRewards'] AS tokens
        FROM (
            SELECT 
                l.block_num,
                evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{RebateCollected_sig}') AS RebateCollected
            FROM 
                eth_firehose.logs l
            WHERE 
                l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
                AND l.topic0 = evm_topic('{RebateCollected_sig}')
        ) pc2
    ) 
    GROUP BY indexer
) a
GROUP BY indexer
"""
locked_tokens_final_query = f"""
WITH ranked_tokens AS (
    SELECT  
        pc.block_num AS block_number,
        pc.StakeLocked['indexer'] AS indexer,
        pc.StakeLocked['tokens'] AS tokens,
        RANK() OVER (PARTITION BY pc.StakeLocked['indexer'] ORDER BY pc.block_num DESC) AS rn
    FROM (
        SELECT 
            l.block_num,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeLocked_sig}') AS StakeLocked
        FROM 
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('{StakeLocked_sig}')
    ) pc
),
locked_tokens_withdraw AS (
    SELECT 
        pc.StakeWithdrawn['indexer'] AS indexer, 
        -SUM(pc.StakeWithdrawn['tokens']) AS locked_tokens_withdraw
    FROM (
        SELECT 
            l.block_num,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeWithdrawn_sig}') AS StakeWithdrawn
        FROM 
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('{StakeWithdrawn_sig}')
    ) pc
    LEFT JOIN ranked_tokens rt ON pc.StakeWithdrawn['indexer'] = rt.indexer  
    WHERE pc.block_num >= rt.block_number
    GROUP BY pc.StakeWithdrawn['indexer']
)
SELECT 
    rt.indexer,
    rt.tokens - COALESCE(lw.locked_tokens_withdraw, 0) AS locked_tokens
FROM ranked_tokens rt
LEFT JOIN locked_tokens_withdraw lw ON rt.indexer = lw.indexer
WHERE rt.rn = 1
"""

In [14]:
# Assuming you have a function `process_query` to execute the query and return the result as a DataFrame


indexer_geohash_df = process_query(client, indexer_geohash_query)
allocations_2_df = process_query(client, allocations_2_query)
queryfees_df = process_query(client, queryfees_query)
stakedTokens_final_df = process_query(client, stakedTokens_final_query)
delegation_tokens_df = process_query(client, delegation_tokens_query)
locked_tokens_final_df = process_query(client, locked_tokens_final_query)


WITH ranked_tokens AS (
    SELECT  
        pc.block_num AS block_number,
        pc.StakeLocked['indexer'] AS indexer,
        pc.StakeLocked['tokens'] AS tokens,
        RANK() OVER (PARTITION BY pc.StakeLocked['indexer'] ORDER BY pc.block_num DESC) AS rn
    FROM (
        SELECT 
            l.block_num,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'StakeLocked(address indexed indexer,uint256 tokens,uint256 until)') AS StakeLocked
        FROM 
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('StakeLocked(address indexed indexer,uint256 tokens,uint256 until)')
    ) pc
),
locked_tokens_withdraw AS (
    SELECT 
        pc.StakeWithdrawn['indexer'] AS indexer, 
        -SUM(pc.StakeWithdrawn['tokens']) AS locked_tokens_withdraw
    FROM (
        SELECT 
            l.block_num,
            evm_decode(l.topic1, l.topic2, l.topic3

In [17]:
# Merge the dataframes, specifying suffixes to handle column name conflicts
final_df = indexer_geohash_df.merge(allocations_2_df, how='left', left_on='id', right_on='indexer', suffixes=('', '_alloc'))
final_df = final_df.merge(queryfees_df, how='left', left_on='id', right_on='indexer', suffixes=('', '_query'))
final_df = final_df.merge(stakedTokens_final_df, how='left', left_on='id', right_on='indexer', suffixes=('', '_staked'))
final_df = final_df.merge(delegation_tokens_df, how='left', left_on='id', right_on='indexer', suffixes=('', '_delegation'))
final_df = final_df.merge(locked_tokens_final_df, how='left', left_on='id', right_on='indexer', suffixes=('', '_locked'))
final_df.columns

KeyError: "['id_alloc'] not found in axis"

# Step 4: Upload to BigQuery

In [32]:
# Convert the defaultdict to a regular dictionary
indexer_dict = {key: dict(value) for key, value in final_df.items()}

# Convert the dictionary to a pandas DataFrame
indexer_df = pd.DataFrame.from_dict(indexer_dict, orient='index')
# Reset index to turn the index into a column
indexer_df.reset_index(inplace=True)

In [36]:
final_df.columns

Index(['id', 'geo_hash', 'total_allocation_count', 'query_fee_rebates',
       'query_fees_collected', 'staked_tokens', 'delegated_tokens',
       'locked_tokens'],
      dtype='object')

In [35]:
indexer_df.head()

Unnamed: 0,index,0,1,2,3,4,5,6,7,8,...,583,584,585,586,587,588,589,590,591,592
0,id,0x24a17f3ce0b06d8da3d058dec91e36bb867cbe65,0xe6de2325ef1aac1f058fae59d3c38a472f569846,0x7f848049a1ae0570a03979489adc1e035c7e557c,0xfa4abb8ef2b1b5a7544fc0d2574c8d2b7a9bb3f4,0x0135e17621dbf2db1f6cec76f60841bc0a2d6fe7,0x6bff785e296743a33387b790488a2c3cc290cfc3,0x748aefb5c1c30c4a4ea963a6c49e90c00bdca15b,0xff061dccea6548025306165adf8470fa5e978229,0x9ea50b41b46aa1b5ba334f14193d970a443ee840,...,0x76340a7c034195c61a04ca70c26fa40c53562925,0x7f3f7b7d70b510a885296f28568777cdade8c5e6,0x6b2f128917d0d0bb1d077072c685a2d76a541b68,0x22923cefe47eabe4cdeac25410d6f4c0e6627dd0,0xc8a4294b23353361dfaa1c0c5531a30ad49871bf,0x8f03e1497f97233ffe9d81ada6df3e56aede4947,0xfd4b9c44ee7afb7f98df3dda40aead6acd9f41f4,0x26dc996fc95cff64f49ba5780d630fb1845e7406,0xc3f6dd40c4b2025b48f7cb675111df68976cb4af,0x9a5bd484200d96c0bdba21cd090f180e516d3b4c
1,geo_hash,dqbvgx1b6,ejdsycw00,u0yjjd6j1,ud9wrtt5g,ud9wr7k1p,u0yjjd6j1,ud9z9kpsu,u0yjjd6j1,u28zk697v,...,u8vwg0bex,sp1cht25v,u2bz1keyj,u6xnurudr,ud9wr3t2s,ud9wr778j,ejdsycw00,ud9wr7k1p,u0yjjd6j1,u0yjjd6j1
2,total_allocation_count,9.0,1082.0,8.0,470.0,18.0,8.0,93.0,11.0,30.0,...,27.0,22.0,9.0,10.0,3.0,23.0,7.0,14.0,14.0,14.0
3,query_fee_rebates,107673137997693413148,,,138541058372622567427,,,,,,...,,,,37820439100268783487,,,28874808540693621193,,,
4,query_fees_collected,0,,,2130501995062789724852,,,,,,...,,,,0,,,0,,,


In [37]:

# Columns to convert to string
columns_to_convert = [
    'staked_tokens', 'total_allocation_count', 'query_fees_collected', 
    'delegated_tokens', 'locked_tokens', 'query_fee_rebates'
]

# Convert the specified columns to string
for column in columns_to_convert:
    if column in final_df.columns:
        final_df[column] = final_df[column].astype(str)

# Save the DataFrame to a Parquet file
final_df.to_parquet('final_df.parquet', index=False)

In [38]:
# Read the Parquet file into a DataFrame
read_df = pd.read_parquet('final_df.parquet')

# Display the DataFrame to verify the data
print(read_df)

                                             id   geo_hash  \
0    0x24a17f3ce0b06d8da3d058dec91e36bb867cbe65  dqbvgx1b6   
1    0xe6de2325ef1aac1f058fae59d3c38a472f569846  ejdsycw00   
2    0x7f848049a1ae0570a03979489adc1e035c7e557c  u0yjjd6j1   
3    0xfa4abb8ef2b1b5a7544fc0d2574c8d2b7a9bb3f4  ud9wrtt5g   
4    0x0135e17621dbf2db1f6cec76f60841bc0a2d6fe7  ud9wr7k1p   
..                                          ...        ...   
588  0x8f03e1497f97233ffe9d81ada6df3e56aede4947  ud9wr778j   
589  0xfd4b9c44ee7afb7f98df3dda40aead6acd9f41f4  ejdsycw00   
590  0x26dc996fc95cff64f49ba5780d630fb1845e7406  ud9wr7k1p   
591  0xc3f6dd40c4b2025b48f7cb675111df68976cb4af  u0yjjd6j1   
592  0x9a5bd484200d96c0bdba21cd090f180e516d3b4c  u0yjjd6j1   

    total_allocation_count      query_fee_rebates    query_fees_collected  \
0                      9.0  107673137997693413148                       0   
1                   1082.0                    nan                     nan   
2                      8

In [42]:
from google.cloud import storage, bigquery


def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Example usage
bucket_name = 'nozzle-data-science'
source_file_name = '/Users/vivianpeng/nozzle/project-nozzle/python/examples/the_graph/final_df.parquet'
destination_blob_name = 'path/in/bucket/indexer_v1.parquet'

upload_to_gcs(bucket_name, source_file_name, destination_blob_name)

def load_parquet_to_bigquery(dataset_id, table_id, gcs_uri):
    """Loads a Parquet file from GCS into BigQuery."""
    client = bigquery.Client()

    table_ref = client.dataset(dataset_id).table(table_id)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
    )

    load_job = client.load_table_from_uri(
        gcs_uri,
        table_ref,
        job_config=job_config
    )

    print(f"Starting job {load_job.job_id}")

    load_job.result()  # Waits for the job to complete.

    print(f"Job finished. Loaded {load_job.output_rows} rows into {dataset_id}:{table_id}")





File /Users/vivianpeng/nozzle/project-nozzle/python/examples/the_graph/final_df.parquet uploaded to path/in/bucket/indexer_v1.parquet.


In [43]:
# Example usage
dataset_id = 'dbt_vpeng'
table_id = 'indexer'
gcs_uri = f'gs://{bucket_name}/{destination_blob_name}'

load_parquet_to_bigquery(dataset_id, table_id, gcs_uri)



Starting job 84c1c64a-7a49-4c71-87d9-245930605660
Job finished. Loaded 593 rows into dbt_vpeng:indexer


# Contracts and Events Reference

## Contract: IStakingStitched
#### Event: AllocationCollected
### Event: AllocationCreated
### Event: StakeDelegated
### Event: StakeDeposited
### Event: StakeLocked 
### Event: RebateClaimed 
### Event: StakeDelegatedLocked
### Event: RebateCollected 
### Event: StakeWithdrawn
### Event: StakeSlashed
### Event: AllocationClosed

##### Event: AllocationCollected

In [None]:
AllocationCollected_sig = IStakingStitchedAbi.events["AllocationCollected"].signature()

AllocationCollected_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.AllocationCollected['indexer'] AS indexer,
            pc.AllocationCollected['subgraphDeploymentID'] AS subgraphDeploymentID,
            pc.AllocationCollected['epoch'] AS epoch,
            pc.AllocationCollected['tokens'] AS tokens,
            pc.AllocationCollected['allocationID'] AS allocationID,
            pc.AllocationCollected['from'] AS from,
            pc.AllocationCollected['curationFees'] AS curationFees,
            pc.AllocationCollected['rebateFees'] AS rebateFees
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{AllocationCollected_sig}') AS AllocationCollected
              FROM logs l
             WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{AllocationCollected_sig}')
) pc
"""

AllocationCollected = process_query(client, AllocationCollected_query)


    SELECT  pc.block_num AS block_number,
            pc.AllocationCollected['indexer'] AS indexer,
            pc.AllocationCollected['subgraphDeploymentID'] AS subgraphDeploymentID,
            pc.AllocationCollected['epoch'] AS epoch,
            pc.AllocationCollected['tokens'] AS tokens,
            pc.AllocationCollected['allocationID'] AS allocationID,
            pc.AllocationCollected['from'] AS from,
            pc.AllocationCollected['curationFees'] AS curationFees,
            pc.AllocationCollected['rebateFees'] AS rebateFees
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'AllocationCollected(address indexed indexer,bytes32 indexed subgraphDeploymentID,uint256 epoch,uint256 tokens,address indexed allocationID,address from,uint256 curationFees,uint256 rebateFees)') AS AllocationCollected
              FROM logs l
             WHERE l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')

In [None]:

AllocationCreated_sig = IStakingStitchedAbi.events["AllocationCreated"].signature()

AllocationCreated_query = f"""
    select  pc.block_num as block_number,
            pc.AllocationCreated['metadata'] as metadata,
            pc.AllocationCreated['allocationID'] as allocationID,
            pc.AllocationCreated['tokens'] as tokens,
            pc.AllocationCreated['epoch'] as epoch,
            pc.AllocationCreated['subgraphDeploymentID'] as subgraphDeploymentID,
            pc.AllocationCreated['indexer'] as indexer
      from (select l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{AllocationCreated_sig}') as AllocationCreated
                from logs l
             where l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               and l.topic0 = evm_topic('{AllocationCreated_sig}')
) pc
"""

AllocationCreated = process_query(client, AllocationCreated_query)


    select  pc.block_num as block_number,
            pc.AllocationCreated['metadata'] as metadata,
            pc.AllocationCreated['allocationID'] as allocationID,
            pc.AllocationCreated['tokens'] as tokens,
            pc.AllocationCreated['epoch'] as epoch,
            pc.AllocationCreated['subgraphDeploymentID'] as subgraphDeploymentID,
            pc.AllocationCreated['indexer'] as indexer
      from (select l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'AllocationCreated(address indexed indexer,bytes32 indexed subgraphDeploymentID,uint256 epoch,uint256 tokens,address indexed allocationID,bytes32 metadata)') as AllocationCreated
                from logs l
             where l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
               and l.topic0 = evm_topic('AllocationCreated(address indexed indexer,bytes32 indexed subgraphDeploymentID,uint256 epoch,uint256 tokens,address indexed allocati

In [None]:

StakeDelegated_sig = IStakingStitchedAbi.events["StakeDelegated"].signature()

StakeDelegated_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.StakeDelegated['indexer'] AS indexer,
            pc.StakeDelegated['delegator'] AS delegator,
            pc.StakeDelegated['tokens'] AS tokens,
            pc.StakeDelegated['shares'] AS shares
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeDelegated_sig}') AS StakeDelegated
              FROM logs l
             WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{StakeDelegated_sig}')
) pc
"""

StakeDelegated = process_query(client, StakeDelegated_query)


    SELECT  pc.block_num AS block_number,
            pc.StakeDelegated['indexer'] AS indexer,
            pc.StakeDelegated['delegator'] AS delegator,
            pc.StakeDelegated['tokens'] AS tokens,
            pc.StakeDelegated['shares'] AS shares
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'StakeDelegated(address indexed indexer,address indexed delegator,uint256 tokens,uint256 shares)') AS StakeDelegated
              FROM logs l
             WHERE l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('StakeDelegated(address indexed indexer,address indexed delegator,uint256 tokens,uint256 shares)')
) pc

time to establish stream:  0.1251 s
time for first batch  56.6689 s
The type of df is  <class 'pandas.core.frame.DataFrame'>
received batch of  154  rows in  1.9799 s
received batch of  127  rows in  0.6997 s
received batch of  208  rows in  0.5668

### Event: StakeDeposited

In [None]:

StakeDeposited_sig = IStakingStitchedAbi.events["StakeDeposited"].signature()

StakeDeposited_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.StakeDeposited['indexer'] AS indexer,
            pc.StakeDeposited['tokens'] AS tokens
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeDeposited_sig}') AS StakeDeposited
              FROM logs l
             WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{StakeDeposited_sig}')
) pc
"""

StakeDeposited = process_query(client, StakeDeposited_query)

StakeLocked_sig = IStakingStitchedAbi.events["StakeLocked"].signature()

StakeLocked_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.StakeLocked['indexer'] AS indexer,
            pc.StakeLocked['tokens'] AS tokens,
            pc.StakeLocked['until'] AS until
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeLocked_sig}') AS StakeLocked
              FROM logs l
             WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{StakeLocked_sig}')
) pc
"""

StakeLocked = process_query(client, StakeLocked_query)


    SELECT  pc.block_num AS block_number,
            pc.StakeDeposited['indexer'] AS indexer,
            pc.StakeDeposited['tokens'] AS tokens
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'StakeDeposited(address indexed indexer,uint256 tokens)') AS StakeDeposited
              FROM logs l
             WHERE l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('StakeDeposited(address indexed indexer,uint256 tokens)')
) pc

time to establish stream:  0.1176 s
time for first batch  15.0298 s
The type of df is  <class 'pandas.core.frame.DataFrame'>
received batch of  197  rows in  10.3333 s
received batch of  41  rows in  7.4236 s
received batch of  388  rows in  15.2885 s
received batch of  922  rows in  7.1798 s
received batch of  545  rows in  5.8569 s
received batch of  2434  rows in  0.4268 s
received batch of  1541  rows in  0.0446 s
received batch o

In [None]:

RebateClaimed_sig = IStakingStitchedAbi.events["RebateClaimed"].signature()

RebateClaimed_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.RebateClaimed['indexer'] AS indexer,
            pc.RebateClaimed['subgraphDeploymentID'] AS subgraphDeploymentID,
            pc.RebateClaimed['allocationID'] AS allocationID,
            pc.RebateClaimed['epoch'] AS epoch,
            pc.RebateClaimed['forEpoch'] AS forEpoch,
            pc.RebateClaimed['tokens'] AS tokens,
            pc.RebateClaimed['unclaimedAllocationsCount'] AS unclaimedAllocationsCount,
            pc.RebateClaimed['delegationFees'] AS delegationFees
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{RebateClaimed_sig}') AS RebateClaimed
              FROM logs l
             WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{RebateClaimed_sig}')
) pc
"""

RebateClaimed = process_query(client, RebateClaimed_query)


    SELECT  pc.block_num AS block_number,
            pc.RebateClaimed['indexer'] AS indexer,
            pc.RebateClaimed['subgraphDeploymentID'] AS subgraphDeploymentID,
            pc.RebateClaimed['allocationID'] AS allocationID,
            pc.RebateClaimed['epoch'] AS epoch,
            pc.RebateClaimed['forEpoch'] AS forEpoch,
            pc.RebateClaimed['tokens'] AS tokens,
            pc.RebateClaimed['unclaimedAllocationsCount'] AS unclaimedAllocationsCount,
            pc.RebateClaimed['delegationFees'] AS delegationFees
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'RebateClaimed(address indexed indexer,bytes32 indexed subgraphDeploymentID,address indexed allocationID,uint256 epoch,uint256 forEpoch,uint256 tokens,uint256 unclaimedAllocationsCount,uint256 delegationFees)') AS RebateClaimed
              FROM logs l
             WHERE l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20

In [None]:

StakeDelegatedLocked_sig = IStakingStitchedAbi.events["StakeDelegatedLocked"].signature()

StakeDelegatedLocked_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.StakeDelegatedLocked['indexer'] AS indexer,
            pc.StakeDelegatedLocked['delegator'] AS delegator,
            pc.StakeDelegatedLocked['tokens'] AS tokens,
            pc.StakeDelegatedLocked['shares'] AS shares,
            pc.StakeDelegatedLocked['until'] AS until
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeDelegatedLocked_sig}') AS StakeDelegatedLocked
              FROM logs l
             WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{StakeDelegatedLocked_sig}')
) pc
"""

StakeDelegatedLocked = process_query(client, StakeDelegatedLocked_query)


    SELECT  pc.block_num AS block_number,
            pc.StakeDelegatedLocked['indexer'] AS indexer,
            pc.StakeDelegatedLocked['delegator'] AS delegator,
            pc.StakeDelegatedLocked['tokens'] AS tokens,
            pc.StakeDelegatedLocked['shares'] AS shares,
            pc.StakeDelegatedLocked['until'] AS until
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'StakeDelegatedLocked(address indexed indexer,address indexed delegator,uint256 tokens,uint256 shares,uint256 until)') AS StakeDelegatedLocked
              FROM logs l
             WHERE l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('StakeDelegatedLocked(address indexed indexer,address indexed delegator,uint256 tokens,uint256 shares,uint256 until)')
) pc

time to establish stream:  0.1436 s
time for first batch  72.2564 s
The type of df is  <class 'pandas.core.frame.DataFrame'

In [None]:

RebateCollected_sig = IStakingStitchedAbi.events["RebateCollected"].signature()

RebateCollected_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.RebateCollected['assetHolder'] AS assetHolder,
            pc.RebateCollected['indexer'] AS indexer,
            pc.RebateCollected['subgraphDeploymentID'] AS subgraphDeploymentID,
            pc.RebateCollected['allocationID'] AS allocationID,
            pc.RebateCollected['epoch'] AS epoch,
            pc.RebateCollected['tokens'] AS tokens,
            pc.RebateCollected['protocolTax'] AS protocolTax,
            pc.RebateCollected['curationFees'] AS curationFees,
            pc.RebateCollected['queryFees'] AS queryFees,
            pc.RebateCollected['queryRebates'] AS queryRebates,
            pc.RebateCollected['delegationRewards'] AS delegationRewards
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{RebateCollected_sig}') AS RebateCollected
              FROM logs l
             WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{RebateCollected_sig}')
) pc
"""

RebateCollected = process_query(client, RebateCollected_query)

StakeWithdrawn_sig = IStakingStitchedAbi.events["StakeWithdrawn"].signature()

StakeWithdrawn_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.StakeWithdrawn['indexer'] AS indexer,
            pc.StakeWithdrawn['tokens'] AS tokens
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeWithdrawn_sig}') AS StakeWithdrawn
              FROM logs l
             WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{StakeWithdrawn_sig}')
) pc
"""

StakeWithdrawn = process_query(client, StakeWithdrawn_query)

StakeSlashed_sig = IStakingStitchedAbi.events["StakeSlashed"].signature()

StakeSlashed_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.StakeSlashed['indexer'] AS indexer,
            pc.StakeSlashed['tokens'] AS tokens,
            pc.StakeSlashed['reward'] AS reward,
            pc.StakeSlashed['beneficiary'] AS beneficiary
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeSlashed_sig}') AS StakeSlashed
              FROM logs l
             WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{StakeSlashed_sig}')
) pc
"""

StakeSlashed = process_query(client, StakeSlashed_query)


    SELECT  pc.block_num AS block_number,
            pc.RebateCollected['assetHolder'] AS assetHolder,
            pc.RebateCollected['indexer'] AS indexer,
            pc.RebateCollected['subgraphDeploymentID'] AS subgraphDeploymentID,
            pc.RebateCollected['allocationID'] AS allocationID,
            pc.RebateCollected['epoch'] AS epoch,
            pc.RebateCollected['tokens'] AS tokens,
            pc.RebateCollected['protocolTax'] AS protocolTax,
            pc.RebateCollected['curationFees'] AS curationFees,
            pc.RebateCollected['queryFees'] AS queryFees,
            pc.RebateCollected['queryRebates'] AS queryRebates,
            pc.RebateCollected['delegationRewards'] AS delegationRewards
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'RebateCollected(address assetHolder,address indexed indexer,bytes32 indexed subgraphDeploymentID,address indexed allocationID,uint256 epoch,uint256 tokens,uint256 protocolTa

KeyboardInterrupt: 

###  derived StakedTokens

In [None]:
query= f'''SELECT 
        pc1.block_num AS block_number,
        pc1.RebateClaimed['indexer'] AS indexer,
        arrow_cast(pc1.RebateClaimed['tokens'], "Float64") AS queryFeeRebates
    FROM (
        SELECT 
            l.block_num,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{RebateClaimed_sig}') AS RebateClaimed
        FROM  
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('{RebateClaimed_sig}')
    ) pc1 '''
process_query(client, query)

SELECT 
        pc1.block_num AS block_number,
        pc1.RebateClaimed['indexer'] AS indexer,
        arrow_cast(pc1.RebateClaimed['tokens'], "Float64") AS queryFeeRebates
    FROM (
        SELECT 
            l.block_num,
            evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'RebateClaimed(address indexed indexer,bytes32 indexed subgraphDeploymentID,address indexed allocationID,uint256 epoch,uint256 forEpoch,uint256 tokens,uint256 unclaimedAllocationsCount,uint256 delegationFees)') AS RebateClaimed
        FROM  
            eth_firehose.logs l
        WHERE 
            l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
            AND l.topic0 = evm_topic('RebateClaimed(address indexed indexer,bytes32 indexed subgraphDeploymentID,address indexed allocationID,uint256 epoch,uint256 forEpoch,uint256 tokens,uint256 unclaimedAllocationsCount,uint256 delegationFees)')
    ) pc1 


FlightInternalError: Flight returned internal error, with message: planning error: Schema error: No field named "Float64". Valid fields are pc1.block_num, pc1.rebateclaimed.

In [None]:
StakedTokens_query = f'''
WITH StakeDeposited_query AS (
    SELECT  pc.block_num AS block_number,
            'StakeDeposited' AS event_type,
            pc.StakeDeposited['indexer'] AS indexer,
            pc.StakeDeposited['tokens']  AS tokens
    FROM (SELECT l.block_num,
                 evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeDeposited_sig}') AS StakeDeposited
            FROM logs l
           WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
             AND l.topic0 = evm_topic('{StakeDeposited_sig}')
    ) pc
),
StakeSlashed_query AS (
    SELECT  pc.block_num AS block_number,
            'StakeSlashed' AS event_type,
            pc.StakeSlashed['indexer'] AS indexer,
            pc.StakeSlashed['tokens'] AS tokens
    FROM (SELECT l.block_num,
                 evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{StakeSlashed_sig}') AS StakeSlashed
            FROM logs l
           WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
             AND l.topic0 = evm_topic('{StakeSlashed_sig}')
    ) pc
),
CombinedEvents AS (
    SELECT * FROM StakeDeposited_query
    UNION ALL
    SELECT * FROM StakeSlashed_query
)


SELECT
    indexer,
    SUM(CASE 
            WHEN event_type = 'StakeDeposited' THEN tokens
            WHEN event_type = 'StakeSlashed' THEN -tokens
            ELSE 0
        END) AS stakedTokens,
    SUM(CASE 
            WHEN event_type = 'StakeDeposited' THEN 0
            WHEN event_type = 'StakeSlashed' THEN tokens
            ELSE 0
        END) AS lockedTokens
FROM CombinedEvents
GROUP BY indexer
'''

# Now execute the query
StakedTokens = process_query(client, StakedTokens_query)




WITH StakeDeposited_query AS (
    SELECT  pc.block_num AS block_number,
            'StakeDeposited' AS event_type,
            pc.StakeDeposited['indexer'] AS indexer,
            pc.StakeDeposited['tokens']  AS tokens
    FROM (SELECT l.block_num,
                 evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'StakeDeposited(address indexed indexer,uint256 tokens)') AS StakeDeposited
            FROM logs l
           WHERE l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
             AND l.topic0 = evm_topic('StakeDeposited(address indexed indexer,uint256 tokens)')
    ) pc
),
StakeSlashed_query AS (
    SELECT  pc.block_num AS block_number,
            'StakeSlashed' AS event_type,
            pc.StakeSlashed['indexer'] AS indexer,
            pc.StakeSlashed['tokens'] AS tokens
    FROM (SELECT l.block_num,
                 evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'StakeSlashed(address indexed indexer,uint256 tokens,uint256 re

In [None]:
StakedTokens

Unnamed: 0,indexer,stakedtokens,lockedtokens
0,0x6e54595363b20a0296b1c3b430fcb4c822d00c20,115274176989092020627769,0
1,0xd6a51a6cd8abc896f560e2bc9e1bdfc5ef4fa45b,539747660876477544767000,0
2,0x7ab4cf25330ed7277ac7ab59380b68eea68abb0e,11295269677263445629143015,0
3,0x72acbecf3013d7b27421185d3bf369520bf552c8,2041037177222402852017456,0
4,0xf4e5f4e0f8ec4d6235bd2fb0443058584ebffbff,963069478618438221082134,0
...,...,...,...
16,0x337b9a57187d7c80a7ba8f773512403a0edcc14b,406337436769810277050000,0
17,0x9a8be56015d32995e82745e14cb9d5dfb0cfac9d,102650694417507908830000,0
18,0x29ea3b83bae44e774dcf4181dbe1d5687cf2775e,100623796043967401595225,0
19,0x62eecb89c407a731e06e2b2c25563e2a8f96dfaa,368943045827797647851964,0


In [None]:
AllocationClosed_sig = IStakingStitchedAbi.events["AllocationClosed"].signature()

AllocationClosed_query = f"""
     SELECT  pc.block_num AS block_number,
            pc.AllocationClosed['indexer'] AS indexer,
            pc.AllocationClosed['subgraphDeploymentID'] AS subgraphDeploymentID,
            pc.AllocationClosed['epoch'] AS epoch,
            pc.AllocationClosed['tokens'] AS tokens,
            pc.AllocationClosed['allocationID'] AS allocationID,
            pc.AllocationClosed['sender'] AS sender,
            pc.AllocationClosed['poi'] AS poi,
            pc.AllocationClosed['isPublic'] AS isPublic
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{AllocationClosed_sig}') AS AllocationClosed
              FROM logs l
             WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{AllocationClosed_sig}')
) pc
"""

AllocationClosed = process_query(client, AllocationClosed_query)



    SELECT  pc.block_num AS block_number,
            pc.AllocationClosed['indexer'] AS indexer,
            pc.AllocationClosed['subgraphDeploymentID'] AS subgraphDeploymentID,
            pc.AllocationClosed['epoch'] AS epoch,
            pc.AllocationClosed['tokens'] AS tokens,
            pc.AllocationClosed['allocationID'] AS allocationID,
            pc.AllocationClosed['sender'] AS sender,
            pc.AllocationClosed['poi'] AS poi,
            pc.AllocationClosed['isPublic'] AS isPublic
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'AllocationClosed(address indexed indexer,bytes32 indexed subgraphDeploymentID,uint256 epoch,uint256 tokens,address indexed allocationID,uint256 effectiveAllocation,address sender,bytes32 poi,bool isPublic)') AS AllocationClosed
              FROM logs l
             WHERE l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
               AND l.topic0 = e

In [None]:
AllocationClosed_sig = IStakingStitchedAbi.events["AllocationClosed"].signature()

AllocationClosed_query = f"""
     SELECT  *
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{AllocationClosed_sig}') AS AllocationClosed
              FROM logs l
             WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{AllocationClosed_sig}')
) pc
"""

AllocationClosed = process_query(client, AllocationClosed_query)


     SELECT  *
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'AllocationClosed(address indexed indexer,bytes32 indexed subgraphDeploymentID,uint256 epoch,uint256 tokens,address indexed allocationID,uint256 effectiveAllocation,address sender,bytes32 poi,bool isPublic)') AS AllocationClosed
              FROM logs l
             WHERE l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('AllocationClosed(address indexed indexer,bytes32 indexed subgraphDeploymentID,uint256 epoch,uint256 tokens,address indexed allocationID,uint256 effectiveAllocation,address sender,bytes32 poi,bool isPublic)')
) pc

time to establish stream:  0.1747 s
time for first batch  58.06 s
The type of df is  <class 'pandas.core.frame.DataFrame'>
received batch of  1214  rows in  11.6158 s
received batch of  1924  rows in  2.0014 s
received batch of  4264  rows in  2.4904 s
received ba

In [None]:
data_to_export = AllocationClosed['allocationclosed'][1]

data_str = str(data_to_export)

# Open a file in write mode
with open('exported_data.txt', 'w') as file:
    file.write(data_str)

#### 　Create a query to get indexer allocation


In [None]:
Allocation_query = f"""
WITH allocation_closed AS (
    SELECT pc.block_num AS closed_block_number,
           pc.AllocationClosed['indexer'] AS indexer,
           pc.AllocationClosed['epoch'] AS closed_epoch,
           pc.AllocationClosed['allocationID'] AS allocationID
    FROM (
        SELECT l.block_num,
               evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{AllocationClosed_sig}') AS AllocationClosed
        FROM logs l
        WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
          AND l.topic0 = evm_topic('{AllocationClosed_sig}')
    ) pc
),

allocation_created AS (
    SELECT pc.block_num AS created_block_number,
           pc.AllocationCreated['allocationID'] AS allocationID,
           pc.AllocationCreated['epoch'] AS created_epoch,
           pc.AllocationCreated['indexer'] AS indexer
    FROM (
        SELECT l.block_num,
               evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{AllocationCreated_sig}') AS AllocationCreated
        FROM logs l
        WHERE l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
          AND l.topic0 = evm_topic('{AllocationCreated_sig}')
    ) pc
)

SELECT a.indexer,
       a.created_epoch,
       b.closed_epoch,
       a.created_block_number,
       b.closed_block_number
FROM allocation_created a
LEFT JOIN allocation_closed b 
ON a.allocationID = b.allocationID
"""
Allocation_query = process_query(client, Allocation_query)



WITH allocation_closed AS (
    SELECT pc.block_num AS closed_block_number,
           pc.AllocationClosed['indexer'] AS indexer,
           pc.AllocationClosed['epoch'] AS closed_epoch,
           pc.AllocationClosed['allocationID'] AS allocationID
    FROM (
        SELECT l.block_num,
               evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'AllocationClosed(address indexed indexer,bytes32 indexed subgraphDeploymentID,uint256 epoch,uint256 tokens,address indexed allocationID,uint256 effectiveAllocation,address sender,bytes32 poi,bool isPublic)') AS AllocationClosed
        FROM logs l
        WHERE l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
          AND l.topic0 = evm_topic('AllocationClosed(address indexed indexer,bytes32 indexed subgraphDeploymentID,uint256 epoch,uint256 tokens,address indexed allocationID,uint256 effectiveAllocation,address sender,bytes32 poi,bool isPublic)')
    ) pc
),

allocation_created AS (
    SELECT 

KeyboardInterrupt: 

In [None]:
Allocation_query[Allocation_query['indexer'] == '0x1a6a74648ff8146f9b7df3a7e322506612e13e6a'].sort_values(by='created_epoch')


Unnamed: 0,indexer,created_epoch,closed_epoch,created_block_number,closed_block_number
109,0x1a6a74648ff8146f9b7df3a7e322506612e13e6a,6,15,11493284,11548448.0
129,0x1a6a74648ff8146f9b7df3a7e322506612e13e6a,15,42,11548461,11725925.0
141,0x1a6a74648ff8146f9b7df3a7e322506612e13e6a,42,56,11725926,11818961.0
118,0x1a6a74648ff8146f9b7df3a7e322506612e13e6a,43,57,11733653,11825648.0
120,0x1a6a74648ff8146f9b7df3a7e322506612e13e6a,56,58,11818962,11837612.0
...,...,...,...,...,...
2345,0x1a6a74648ff8146f9b7df3a7e322506612e13e6a,1044,,18537620,
2364,0x1a6a74648ff8146f9b7df3a7e322506612e13e6a,1044,,18537620,
2332,0x1a6a74648ff8146f9b7df3a7e322506612e13e6a,1044,,18537620,
2318,0x1a6a74648ff8146f9b7df3a7e322506612e13e6a,1044,,18537620,


## Contract L1Staking


#### event: IndexerStakeTransferredToL2


In [None]:

IndexerStakeTransferredToL2_sig = L1StakingAbi.events["IndexerStakeTransferredToL2"].signature()

IndexerStakeTransferredToL2_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.IndexerStakeTransferredToL2['indexer'] AS indexer,
            pc.IndexerStakeTransferredToL2['l2Indexer'] AS l2Indexer,
            pc.IndexerStakeTransferredToL2['transferredStakeTokens'] AS transferredStakeTokens
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{IndexerStakeTransferredToL2_sig}') AS IndexerStakeTransferredToL2
              FROM logs l
             WHERE l.address = arrow_cast(x'{L1Staking_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{IndexerStakeTransferredToL2_sig}')
) pc
"""

IndexerStakeTransferredToL2 = process_query(client, IndexerStakeTransferredToL2_query)
IndexerStakeTransferredToL2



    SELECT  pc.block_num AS block_number,
            pc.IndexerStakeTransferredToL2['indexer'] AS indexer,
            pc.IndexerStakeTransferredToL2['l2Indexer'] AS l2Indexer,
            pc.IndexerStakeTransferredToL2['transferredStakeTokens'] AS transferredStakeTokens
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'IndexerStakeTransferredToL2(address indexed indexer,address indexed l2Indexer,uint256 transferredStakeTokens)') AS IndexerStakeTransferredToL2
              FROM logs l
             WHERE l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('IndexerStakeTransferredToL2(address indexed indexer,address indexed l2Indexer,uint256 transferredStakeTokens)')
) pc

time to establish stream:  0.1282 s
time for first batch  10.6477 s
The type of df is  <class 'pandas.core.frame.DataFrame'>
received batch of  14  rows in  2.6596 s
received batch of  18

Unnamed: 0,block_number,indexer,l2indexer,transferredstaketokens
0,19733407,0x5a8904be09625965d9aec4bffd30d853438a053e,0x2f09092aacd80196fc984908c5a9a7ab3ee4f1ce,4000000000000000000000000
1,19847628,0x2d2771e17fe8daff29b6d903559ec1c8192945b0,0x2d2771e17fe8daff29b6d903559ec1c8192945b0,100501767136255316115456
0,19353835,0x5e98a3ae86332d5f493f88f6e0c363fda891d30f,0x5e98a3ae86332d5f493f88f6e0c363fda891d30f,100000000000000000000000
1,19353961,0x5e98a3ae86332d5f493f88f6e0c363fda891d30f,0x5e98a3ae86332d5f493f88f6e0c363fda891d30f,11532657889083659750897741
2,19381100,0x87eba079059b75504c734820d6cf828476754b83,0x4e5c87772c29381bcabc58c3f182b6633b5a274a,1620000000000000000000000
...,...,...,...,...
96,18233353,0xbb784d9b398271b7a64f975bebde869409691915,0x6f9bb7e454f5b3eb2310343f0e99269dc2bb8a1d,500000000000000000000000
97,18234237,0x62a0bd1d110ff4e5b793119e95fc07c9d1fc8c4a,0xf92f430dd8567b0d466358c79594ab58d919a6d4,850000000000000000000000
98,18234254,0x4d6a8776a164776c93618233a0003e8894e7e6c2,0xe9e284277648fcdb09b8efc1832c73c09b5ecf59,2000000000000000000000000
99,18235987,0x234e50b877644e2130c86a16f97dcc1e7a057242,0x3185992ac24824097047f2eb9af066bde07919b8,30000000000000000000000000


#### event: DelegationTransferredToL2


In [None]:
IndexerStakeTransferredToL2_sig = L1StakingAbi.events["IndexerStakeTransferredToL2"].signature()

IndexerStakeTransferredToL2_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.IndexerStakeTransferredToL2['indexer'] AS indexer,
            pc.IndexerStakeTransferredToL2['l2Indexer'] AS l2Indexer,
            pc.IndexerStakeTransferredToL2['transferredStakeTokens'] AS transferredStakeTokens
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{IndexerStakeTransferredToL2_sig}') AS IndexerStakeTransferredToL2
              FROM logs l
             WHERE l.address = arrow_cast(x'{L1Staking_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{IndexerStakeTransferredToL2_sig}')
) pc
"""

IndexerStakeTransferredToL2 = process_query(client, IndexerStakeTransferredToL2_query)


    SELECT  pc.block_num AS block_number,
            pc.IndexerStakeTransferredToL2['indexer'] AS indexer,
            pc.IndexerStakeTransferredToL2['l2Indexer'] AS l2Indexer,
            pc.IndexerStakeTransferredToL2['transferredStakeTokens'] AS transferredStakeTokens
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'IndexerStakeTransferredToL2(address indexed indexer,address indexed l2Indexer,uint256 transferredStakeTokens)') AS IndexerStakeTransferredToL2
              FROM logs l
             WHERE l.address = arrow_cast(x'F55041E37E12cD407ad00CE2910B8269B01263b9', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('IndexerStakeTransferredToL2(address indexed indexer,address indexed l2Indexer,uint256 transferredStakeTokens)')
) pc

time to establish stream:  0.1179 s
time for first batch  10.2149 s
The type of df is  <class 'pandas.core.frame.DataFrame'>
received batch of  14  rows in  2.5838 s
received batch of  18

In [None]:

RewardsAssigned_sig = RewardsManagerAbi.events["RewardsAssigned"].signature()


RewardsAssigned_query = f"""
    SELECT  pc.block_num AS block_number,
            pc.RewardsAssigned['indexer'] AS indexer,
            pc.RewardsAssigned['allocationID'] AS allocationID,
            pc.RewardsAssigned['epoch'] AS epoch,
            pc.RewardsAssigned['amount'] AS amount
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{RewardsAssigned_sig}') AS RewardsAssigned
              FROM logs l
             WHERE l.address = arrow_cast(x'{RewardsManager_address}', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('{RewardsAssigned_sig}')
) pc
"""

RewardsAssigned = process_query(client, RewardsAssigned_query)



    SELECT  pc.block_num AS block_number,
            pc.RewardsAssigned['indexer'] AS indexer,
            pc.RewardsAssigned['allocationID'] AS allocationID,
            pc.RewardsAssigned['epoch'] AS epoch,
            pc.RewardsAssigned['amount'] AS amount
      FROM (SELECT l.block_num,
                   evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'RewardsAssigned(address indexed indexer,address indexed allocationID,uint256 epoch,uint256 amount)') AS RewardsAssigned
              FROM logs l
             WHERE l.address = arrow_cast(x'9ac758ab77733b4150a901ebd659cbf8cb93ed66⁠', 'FixedSizeBinary(20)')
               AND l.topic0 = evm_topic('RewardsAssigned(address indexed indexer,address indexed allocationID,uint256 epoch,uint256 amount)')
) pc



FlightInternalError: Flight returned internal error, with message: planning error: Error during planning: Invalid HexStringLiteral '9ac758ab77733b4150a901ebd659cbf8cb93ed66⁠'

In [None]:

# ServiceRegistered_sig = ServiceRegistryAbi.events["ServiceRegistered"].signature()

# DIDAttributeChanged_sig = IEthereumDIDRegistryAbi.events["DIDAttributeChanged"].signature()
# SetDefaultName_sig = GNSAbi.events['SetDefaultName'].signature()
# SubgraphPublished_sig = GNSAbi.events['SubgraphPublished'].signature()
# # Transfer_sig = GNSAbi.events['Transfer'].signature()
# pc3 AS (
#     SELECT 
#         l.block_num AS created_at_block_number,
#         evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{SetDefaultName_sig}')['graphAccount'] AS user_wallet_id
#     FROM logs l
#     WHERE l.address = arrow_cast(x'{GNS_address}', 'FixedSizeBinary(20)')
#       AND l.topic0 = evm_topic('{SetDefaultName_sig}')
# ),

# Transfer2_sig = IGraphTokenAbi.events['Transfer'].signature()
# query1 = f"""
# WITH pc1 AS (
#     SELECT 
#         l.block_num AS created_at_block_number,
#         evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{ServiceRegistered_sig}')['indexer'] AS user_wallet_id
#     FROM logs l
#     WHERE l.address = arrow_cast(x'{ServiceRegistry_address}', 'FixedSizeBinary(20)')
#       AND l.topic0 = evm_topic('{ServiceRegistered_sig}')
# ),
# pc2 AS (
#     SELECT 
#         l.block_num AS created_at_block_number,
#         evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{DIDAttributeChanged_sig}')['identity'] AS user_wallet_id
#     FROM logs l
#     WHERE l.address = arrow_cast(x'{IEthereumDIDRegistry_address}', 'FixedSizeBinary(20)')
#       AND l.topic0 = evm_topic('{DIDAttributeChanged_sig}')
# ),
# pc3 AS (
#     SELECT 
#         l.block_num AS created_at_block_number,
#         evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{SetDefaultName_sig}')['graphAccount'] AS user_wallet_id
#     FROM logs l
#     WHERE l.address = arrow_cast(x'{GNS_address}', 'FixedSizeBinary(20)')
#       AND l.topic0 = evm_topic('{SetDefaultName_sig}')
# ),
# pc4 AS (
#     SELECT 
#         l.block_num AS created_at_block_number,
#         evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{Transfer2_sig}')['to'] AS receiver,
#         evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{Transfer2_sig}')['from'] AS sender,
#         evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{Transfer2_sig}')['value'] AS value
#     FROM logs l
#     WHERE l.address = arrow_cast(x'{IGraphToken_address}', 'FixedSizeBinary(20)')
#       AND l.topic0 = evm_topic('{Transfer2_sig}')
# ),
# pc5 AS (
#     SELECT receiver,
#            SUM(value) AS received_grt
#     FROM pc4
#     GROUP BY receiver
# ),
# pc6 AS (
#     SELECT sender,
#            -SUM(value) / 1e18 AS sent_grt
#     FROM pc4
#     GROUP BY sender
# ),
# pc7 AS (
#     SELECT DISTINCT user_wallet_id 
#     FROM (
#         SELECT DISTINCT user_wallet_id FROM pc1
#         UNION ALL
#         SELECT DISTINCT user_wallet_id FROM pc2
#         UNION ALL
#         SELECT DISTINCT user_wallet_id FROM pc3
#     ) AS combined_wallet_ids
# )

# SELECT pc7.user_wallet_id,
#        pc5.received_grt,
#        pc6.sent_grt
# FROM pc7
# LEFT JOIN pc5 ON pc7.user_wallet_id = pc5.receiver
# LEFT JOIN pc6 ON pc7.user_wallet_id = pc6.sender
# """



# query1 = process_query(client, query1)


# IStakingStitchedAbi = Abi(IStakingStitched_path)

# AllocationCreated_sig = IStakingStitchedAbi.events["AllocationCreated"].signature()

# AllocationCreated_query = f"""
#     select  pc.block_num as block_number,
#             pc.AllocationCreated['metadata'] as metadata,
#             pc.AllocationCreated['allocationID'] as allocationID,
#             pc.AllocationCreated['tokens'] as tokens,
#             pc.AllocationCreated['epoch'] as epoch,
#             pc.AllocationCreated['subgraphDeploymentID'] as subgraphDeploymentID,
#             pc.AllocationCreated['indexer'] as indexer,
#             pc.AllocationCreated['rebateFees'] as rebateFees
#       from (select l.block_num,
#                    evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{AllocationCreated_sig}') as AllocationCreated
#                 from logs l
#              where l.address = arrow_cast(x'{IStakingStitched_address}', 'FixedSizeBinary(20)')
#                and l.topic0 = evm_topic('{AllocationCreated_sig}')
# ) pc
# """

# AllocationCreated = process_query(client, AllocationCreated_query)


WITH pc1 AS (
    SELECT 
        l.block_num AS created_at_block_number,
        evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'ServiceRegistered(address indexed indexer,string url,string geohash)')['indexer'] AS user_wallet_id
    FROM logs l
    WHERE l.address = arrow_cast(x'aD0C9DaCf1e515615b0581c8D7E295E296Ec26E6', 'FixedSizeBinary(20)')
      AND l.topic0 = evm_topic('ServiceRegistered(address indexed indexer,string url,string geohash)')
),
pc2 AS (
    SELECT 
        l.block_num AS created_at_block_number,
        evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'DIDAttributeChanged(address indexed identity,bytes32 name,bytes value,uint256 validTo,uint256 previousChange)')['identity'] AS user_wallet_id
    FROM logs l
    WHERE l.address = arrow_cast(x'dCa7EF03e98e0DC2B855bE647C39ABe984fcF21B', 'FixedSizeBinary(20)')
      AND l.topic0 = evm_topic('DIDAttributeChanged(address indexed identity,bytes32 name,bytes value,uint256 validTo,uint256 previousChange)')
),
pc3 AS (
  

FlightCancelledError: Flight cancelled call, with message: Received RST_STREAM with error code 8. gRPC client debug context: UNKNOWN:Error received from peer ipv4:34.122.177.97:80 {created_time:"2024-06-17T09:42:40.083821-04:00", grpc_status:1, grpc_message:"Received RST_STREAM with error code 8"}. Client context: OK

# ToDo 

In [None]:
# ToDo: Waiting for Materialization Feature 
GNS_query = f"""
    SELECT *
    FROM eth_firehose.logs l
             WHERE l.address = arrow_cast(x'{GNS_address}', 'FixedSizeBinary(20)')
"""

GNS = process_query(client, GNS_query)
GNS.to_parquet('GNS.parquet', engine='pyarrow')

import pyarrow.parquet as pq

parquet_file_path = "GNS.parquet"
table_name = "eth_friehose.gns_table"

# Create the SQL query to load the Parquet file into the table
create_table_query = f"""
    CREATE TABLE {table_name} AS SELECT * FROM parquet_scan('{parquet_file_path}')
"""

# Execute the create table query to load the Parquet file into the table
process_query(client, create_table_query)

# Define the SQL query to read from the GNS contract table
SetDefaultName_sig = GNSAbi.events['SetDefaultName'].signature()
sql_query = f"""
    SELECT  
        block_num,
        evm_decode(l.topic1, l.topic2, l.topic3, l.data, '{SetDefaultName_sig}')['graphAccount'] AS user_wallet_id
    FROM {table_name} l
    WHERE l.topic0 = evm_topic('{SetDefaultName_sig}')
"""
result_df = process_query(client, sql_query)

print(result_df)