Investigating the <SUBGRAPH_NAME> subgraph deployment with reported PoI inconsistencies:
- explorer: `https://thegraph.com/explorer/subgraph/<...>`
- IPFS hash: `<DEPLOYMENT_ID>`
- subgraph id: `<SUBGRAPH_ID>`
- displayName: `<SUBGRAPH_NAME>`
- repo: `https://github.com/<...>`

# Instructions

- Extract relevant PoI tables from indexer infrastructures 
    ``` 
    # connect to the database and run the following query to find out which schema the deployment is in
    select name from public.deployment_schemas where subgraph = '<DEPLOYMENT_ID>';

    # exit psql cli then dump the PoI table (you may need to change --host and --port)
    pg_dump --dbname="graph" --host=localhost --port=5432 --username=<YOUR_USERNAME> --table='<SCHEMA_NAME_RESULT_FROM_ABOVE>."poi2$"' --file=<FILE_LOCATION_OF_CHOICE>.sql
    ```

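- Place the PoI table dumps (.sql) in a local directory (the data directory in this repo is recommended). Name each file `<indexer>_<anything>.sql`: the text before the first underscore is used as the indexer's table name.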
- Update the config values (section 2.1 below)
- Run all cells (Use topnav: `Cell`>`Run all`)
- The results (PoI comparison table, divergent blocks, and the events matched in the divergent blocks) will be saved as .csv files in the output directory

# Imports and Setup

## Configs

In [4]:
# --- Investigation settings ---
# NOTE(review): subgraph_name is interpolated unquoted into SQL as a schema
# name by later cells, so it presumably has to be a valid (lowercase)
# Postgres identifier - confirm.
subgraph_name = "<SUBGRAPH_NAME>"
subgraph_id = "<DEPLOYMENT_ID>"  # deployment IPFS hash, 'Qm...'
poi_directory = "../data/"  # directory scanned for the PoI table dumps (.sql)
output_dir = "../outputs/"  # prefix of every .csv written by this notebook

# --- Local comparison database ---
POSTGRES_HOST = "localhost"
POSTGRES_PORT = "5432"
POSTGRES_USERNAME = "<USERNAME>"
POSTGRES_PASSWORD = "<PASSWORD>"
# The database should already exist on the DB server
# (you may need to `create database ..`)
POSTGRES_DBNAME = "poi_analysis"

# --- External services ---
ETHEREUM_PROVIDER = "<ETHEREUM_CLIENT_ENDPOINT>"  # passed to Web3.HTTPProvider
IPFS_GATEWAY_MULTI_ADDRESS = "/dns/ipfs.infura.io/tcp/5001/https"

## Import required libs

In [5]:
# Setup auto reload, so any changes to underlying libs are applied
%load_ext autoreload
%autoreload 2
%matplotlib inline
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [6]:
# Import libs
import os
import sys 
import re 
import numpy as np
import pandas as pd
import json
from functools import reduce
from  matplotlib import pyplot
import plotly
import seaborn
import plotly.express as px
# Apply the default seaborn theme with enlarged fonts. A separate bare
# seaborn.set() beforehand is unnecessary: this call sets the whole theme.
seaborn.set(font_scale=1.5)
from sqlalchemy import create_engine
import itertools

## Connect to DB

In [7]:
# Create connection to local compare DB
from urllib.parse import quote_plus

# Credentials are percent-encoded so that characters such as '@', ':' or '/'
# in the username or password cannot corrupt the connection URL.
postgres_str = (
    'postgresql://{username}:{password}@{ipaddress}:{port}/{dbname}'.format(
        username=quote_plus(POSTGRES_USERNAME),
        password=quote_plus(POSTGRES_PASSWORD),
        ipaddress=POSTGRES_HOST,
        port=POSTGRES_PORT,
        dbname=POSTGRES_DBNAME,
    )
)

# Engine shared by every query in this notebook
local_compare_db_cnx = create_engine(postgres_str)

# Transform and Load Reference PoI Tables into DB

In [8]:
# Rewrites the table references of a PoI dump so it can be loaded side by side
# with the dumps of other indexers.
def replace_table_name(file_path, subs, flags=0):
    """Replace every ``sgd<N>."poi2$"`` reference in a dump file, in place.

    Args:
        file_path: Path of the .sql dump; the file is overwritten.
        subs: Replacement text, inserted literally (it is not interpreted as
            a regex replacement template).
        flags: ``re`` flags applied to all patterns. The patterns contain no
            anchors, so the default of 0 matches the same text as ``re.M``.
    """
    # Applied in order, most specific first:
    #   1. "sgd12"."poi2$"   (schema and table both quoted)
    #   2. sgd12."poi2$"     (bare schema name)
    #   3. sgd12."poi2$      (prefix of longer names such as
    #      sgd12."poi2$_vid_seq"; the remainder of the name, including its
    #      closing quote, is left untouched)
    # \d+ accepts schema numbers of any length; the dots are escaped so they
    # only match the schema/table separator.
    patterns = (
        r'"sgd\d+"\."poi2\$"',
        r'sgd\d+\."poi2\$"',
        r'sgd\d+\."poi2\$',
    )
    with open(file_path, "r+") as file:
        file_contents = file.read()
        for pattern in patterns:
            # A callable replacement keeps backslashes in `subs` literal.
            file_contents = re.sub(pattern, lambda _match: subs, file_contents, flags=flags)
        # Rewind and clear the file before writing the rewritten dump.
        file.seek(0)
        file.truncate()
        file.write(file_contents)

In [None]:
## Ensure schema exists
from sqlalchemy import text

# Look the schema up in the catalog; one schema per investigated subgraph.
schemas_query = """SELECT s.nspname AS schema_table FROM pg_catalog.pg_namespace s where nspname = '{namespace_name}'""".format(namespace_name=subgraph_name)
schemas = pd.read_sql_query(schemas_query, con=local_compare_db_cnx)
if schemas.empty:
    create_schema_query = """CREATE SCHEMA {namespace_name};""".format(namespace_name=subgraph_name)
    # CREATE SCHEMA returns no rows, so it is executed on a connection inside
    # a transaction (committed when the block exits) instead of through
    # pd.read_sql_query, which expects a row-returning statement.
    with local_compare_db_cnx.begin() as connection:
        connection.execute(text(create_schema_query))

In [None]:
# Update schema and table names for all files in directory
# insert into db
import subprocess

# psql reads the password from the PGPASSWORD environment variable; it is set
# only for the child processes started below.
psql_env = dict(os.environ, PGPASSWORD=POSTGRES_PASSWORD)

for filename in os.listdir(poi_directory):
    if not filename.endswith('.sql'):
        continue
    # The indexer name is the text before the first underscore. The extension
    # is stripped first so a file without an underscore ('indexer.sql') does
    # not leak '.sql' into the table name.
    indexer = os.path.splitext(filename)[0].split('_')[0]
    table_name = subgraph_name + '.' + indexer
    full_filename = os.path.join(poi_directory, filename)
    print('Loading', table_name)
    # Point the dump at <subgraph_name>.<indexer> (rewrites the file in place)
    replace_table_name(full_filename, table_name)
    tables_query = """ select tablename from pg_catalog.pg_tables where schemaname='{namespace_name}' and tablename='{name}';""".format(name=indexer, namespace_name=subgraph_name)
    matching_tables = pd.read_sql_query(tables_query, con=local_compare_db_cnx)
    if len(matching_tables) > 0:
        print(table_name, 'already in DB')
        continue
    # Load into the same database the notebook queries (POSTGRES_* config).
    # The argument list is passed without a shell, so paths containing spaces
    # or shell metacharacters are handled correctly.
    table_load_command = [
        'psql',
        '--host', POSTGRES_HOST,
        '--port', str(POSTGRES_PORT),
        '--username', POSTGRES_USERNAME,
        '--dbname', POSTGRES_DBNAME,
        '--file', full_filename,
    ]
    print('Inserting', table_name)
    print(' '.join(table_load_command))
    res = subprocess.run(table_load_command, env=psql_env)
    if res.returncode != 0:
        print('psql exited with code', res.returncode, 'while loading', table_name)

In [None]:
# List every table stored in this subgraph's schema of the compare DB
tables_query_template = """ select tablename from pg_catalog.pg_tables where schemaname='{namespace_name}';"""
tables_query = tables_query_template.format(namespace_name=subgraph_name)
ref_tables = pd.read_sql_query(tables_query, con=local_compare_db_cnx)
ref_tables

# Compare reference datasets

In [None]:
# Read every reference table into a DataFrame, one per indexer.
reference_poi_dfs = []
# Iterate the table names by column label; positional indexing into a
# label-indexed row (table[1][0]) is deprecated in recent pandas.
for name in ref_tables['tablename']:
    table_query = """select * from {namespace_name}.{table_name}""".format(namespace_name=subgraph_name, table_name=name)
    # Rename 'digest' to the table name so each indexer gets its own column
    # once the frames are merged.
    table_df = pd.read_sql_query(table_query, con=local_compare_db_cnx).rename(columns={'digest': name})
    # NOTE(review): block_range values are presumably range objects whose
    # `.lower` attribute is the first block of the range - confirm.
    table_df['block_source'] = table_df['block_range'].apply(lambda x: x.lower)
    reference_poi_dfs.append(table_df)

In [None]:
# Join all reference tables on block_source, one digest column per indexer.
# The bookkeeping columns are dropped before merging: they are not used in the
# comparison, and left in place they receive the same '_left'/'_right'
# suffixes on every merge, which produces duplicate column names once a fourth
# table is merged (recent pandas raises MergeError for that).
bookkeeping_columns = ['vid', 'id', 'block_range']
slim_poi_dfs = [df.drop(columns=bookkeeping_columns, errors='ignore') for df in reference_poi_dfs]
poi_compare = reduce(lambda left, right: pd.merge(left, right, on='block_source', suffixes=('_left', '_right')), slim_poi_dfs)
# Keep only the first occurrence of any duplicated column label.
poi_compare = poi_compare.loc[:, ~poi_compare.columns.duplicated()]

In [None]:
# COMPARE ALL REFERENCE DIGESTS
# Drop the bookkeeping columns (with or without merge suffixes), if present.
poi_compare = poi_compare.drop(['vid', 'vid_left', 'vid_right', 'id', 'id_left', 'id_right', 'block_range_left', 'block_range_right'], axis=1, errors='ignore')
# Every remaining column without 'block' in its name is an indexer digest.
# Materialised as a list so it can be inspected and iterated more than once.
digest_columns = [c for c in poi_compare.columns if 'block' not in c]

# For each pair of indexers add a boolean "digests agree" column and a numeric
# twin (1 = agree, -1 = differ) used for charting.
for first, second in itertools.combinations(digest_columns, 2):
    column_name = first + '_' + second
    poi_compare[column_name] = poi_compare[first] == poi_compare[second]
    poi_compare[column_name + '_numeric'] = np.where(poi_compare[column_name], 1, -1)

numeric_df_columns = [c for c in poi_compare.columns if 'numeric' in c]
numeric_df_columns.append('block_source')
# Copy, so renaming the columns below cannot touch poi_compare.
poi_compare_numeric = poi_compare[numeric_df_columns].copy()
poi_compare_numeric.columns = poi_compare_numeric.columns.str.replace('_numeric', '', regex=False)
# poi_compare

In [None]:
# Long format for charting: one row per (block_source, comparison) pair,
# with the comparison name in 'variable' and the 1/-1 flag in 'value'
melted_numeric = pd.melt(poi_compare_numeric, id_vars='block_source')
# melted_numeric

# Analyze PoI Differences

## Get divergent block numbers


In [None]:
# Identify the first divergent block_source for every pairwise comparison.
compare_columns = [c for c in poi_compare_numeric.columns if 'block' not in c]

divergence_rows = []
for column in compare_columns:
    mismatches = poi_compare_numeric[column] == -1
    # A pair that never diverges is skipped. (argmax() on an all-False mask
    # returns 0, which would report the first row as a divergence.)
    if not mismatches.any():
        continue
    divergence_rows.append(
        {
            'comparison': column,
            'subgraph': subgraph_id,
            # Lowest mismatching block, independent of the row order
            'divergent_block': poi_compare_numeric.loc[mismatches, 'block_source'].min(),
        }
    )

# Built in one go from the collected rows (DataFrame.append no longer exists
# in pandas 2).
divergent_blocks = pd.DataFrame(divergence_rows, columns=['comparison', 'divergent_block', 'subgraph'])
# Object dtype keeps the block numbers as plain Python values rather than
# numpy scalars.
divergent_blocks['divergent_block'] = divergent_blocks['divergent_block'].astype(object)
divergent_blocks

In [None]:
# Distinct block numbers across all pairwise comparisons
unique_divergent_blocks = divergent_blocks['divergent_block'].unique()
print('Unique divergent blocks', unique_divergent_blocks, sep='\n')

## Chart

In [None]:
# Area chart of the agreement flag per comparison, ordered by block
melted_numeric = melted_numeric.sort_values(by='block_source')
fig = px.area(
    melted_numeric,
    x='block_source',
    y='value',
    color='variable',
    title='Where do the PoIs diverge?',
)
fig.show()

In [None]:
# ZOOMED CHART
if len(unique_divergent_blocks) == 0:
    # min()/max() raise ValueError on an empty sequence
    print('No divergent blocks found - nothing to zoom in on')
else:
    # Window of 500 blocks on either side of the divergent blocks
    lower_bound = min(unique_divergent_blocks) - 500
    upper_bound = max(unique_divergent_blocks) + 500
    melted_numeric_zoomed = melted_numeric[(melted_numeric['block_source'] > lower_bound) & (melted_numeric['block_source'] < upper_bound)]
    fig = px.area(melted_numeric_zoomed, x='block_source', y='value', color='variable', title='Zoomed in on the divergence')
    fig.update_layout(xaxis=dict(tickformat="."))
    fig.show()

## Save analysis to csv

In [None]:
# Save comparison table to a local csv
# Create the output directory first so the write cannot fail on a missing
# directory.
os.makedirs(output_dir, exist_ok=True)
# os.path.join also works when output_dir has no trailing separator
compare_table_location = os.path.join(output_dir, subgraph_name + '_compare_pois.csv')
poi_compare_numeric.to_csv(compare_table_location, index=False)

In [None]:
# Save divergent blocks table to a local csv
# Create the output directory first so the write cannot fail on a missing
# directory.
os.makedirs(output_dir, exist_ok=True)
# os.path.join also works when output_dir has no trailing separator
divergence_table_location = os.path.join(output_dir, subgraph_name + '_poi_divergence_blocks.csv')
divergent_blocks.to_csv(divergence_table_location, index=False)

# Analyze divergent blocks

In [None]:
%%bash
# web3 is capped below 6.0: this notebook calls the camelCase API
# (toChecksumAddress, processReceipt), which web3.py 6 removed.
pip install "web3<6"
pip install pyyaml
pip install ipfshttpclient==0.7.0a1

In [None]:
from web3 import Web3
import yaml
import ipfshttpclient

In [None]:
# Ethereum client over HTTP, pointed at the configured endpoint
eth_http_provider = Web3.HTTPProvider(ETHEREUM_PROVIDER)
w3 = Web3(eth_http_provider)
# IPFS client, connected to the configured multiaddress
ipfs = ipfshttpclient.connect(IPFS_GATEWAY_MULTI_ADDRESS)

In [None]:
class DataSource:
    """Plain record holding what is needed to inspect one data source.

    Attributes are stored exactly as passed in:
        address: contract address of the data source.
        abi_name: name of the ABI the data source refers to.
        abi: the ABI content.
        events: the event handler entries of the data source.
    """

    def __init__(self, address, abi_name, abi, events):
        self.address, self.abi_name = address, abi_name
        self.abi, self.events = abi, events

def getSource(data_source):
    """Build a DataSource from one ``dataSources`` entry of the manifest.

    Args:
        data_source: Manifest entry (dict) providing ``source.address``,
            ``source.abi``, ``mapping.abis`` and, optionally,
            ``mapping.eventHandlers``.

    Returns:
        DataSource with the contract address, the ABI name, the ABI content
        fetched from IPFS, and the event handler entries (an empty list when
        the data source declares none).

    Raises:
        IndexError: if ``mapping.abis`` has no entry named ``source.abi``.
        KeyError: if a required manifest key is missing.
    """
    address = data_source['source']['address']
    abi_name = data_source['source']['abi']
    abi_entry = next(
        (abi for abi in data_source['mapping']['abis'] if abi['name'] == abi_name),
        None,
    )
    if abi_entry is None:
        raise IndexError('ABI {name!r} is not listed in mapping.abis'.format(name=abi_name))
    # The 'file' entry links to the ABI by its IPFS path under the '/' key.
    abi = ipfs.cat(abi_entry['file']['/'])
    # A data source may define only call/block handlers; treat a missing
    # eventHandlers list as "no events" instead of failing with KeyError.
    events = data_source['mapping'].get('eventHandlers', [])
    return DataSource(address, abi_name, abi, events)

In [None]:
def get_matching_events(datasources, divergent_blocks):
    """Collect the subgraph-handled events emitted in the divergent blocks.

    For every data source, the logs emitted by its contract address in each
    divergent block are fetched; the receipts of the transactions that
    produced them are decoded with every contract event for which the data
    source declares a handler. Progress is printed along the way.

    Each transaction receipt is fetched and decoded once per event type, so a
    transaction that emitted several logs does not yield duplicate rows.

    Args:
        datasources: Iterable of DataSource objects.
        divergent_blocks: Iterable of block numbers to inspect.

    Returns:
        pandas.DataFrame with the columns 'address', 'block', 'event',
        'subgraph_events', 'handlers' and 'log_params', one row per decoded
        event.
    """
    columns = ['address', 'block', 'event', 'subgraph_events', 'handlers', 'log_params']
    rows = []
    for source in datasources:
        address = w3.toChecksumAddress(source.address)
        print('source address', address)
        contract_abi = source.abi.decode("utf-8")
        contract = w3.eth.contract(address=address, abi=contract_abi)

        # Which contract events the subgraph handles depends only on the data
        # source, so it is worked out once instead of once per log.
        handled_events = []
        for contract_event in contract.events:
            # Manifest events are signatures such as 'Transfer(address,uint256)';
            # compare on the name before the first '('.
            subgraph_events = [
                e for e in source.events
                if e['event'].split('(')[0] == contract_event.event_name
            ]
            if subgraph_events:
                handlers = [subgraph_event['handler'] for subgraph_event in subgraph_events]
                handled_events.append((contract_event, subgraph_events, handlers))

        for block in divergent_blocks:
            print('  block:', block)
            logs_filter_params = {'fromBlock': block, 'toBlock': block, 'address': address}
            logs_filter = w3.eth.filter(logs_filter_params)
            logs = w3.eth.get_filter_logs(logs_filter.filter_id)
            # Distinct transactions in first-seen order; no receipt is fetched
            # when the data source handles no events.
            tx_hashes = list(dict.fromkeys(log.transactionHash for log in logs)) if handled_events else []
            for tx_hash in tx_hashes:
                tx_receipt = w3.eth.get_transaction_receipt(tx_hash)
                for contract_event, subgraph_events, handlers in handled_events:
                    decoded_logs = contract_event().processReceipt(tx_receipt)
                    for decoded_log in decoded_logs:
                        print('    - event:', contract_event.event_name)
                        print('      log_params:')
                        for arg, value in decoded_log.args.items():
                            print('          {arg}: {value}'.format(arg=arg, value=value))
                        rows.append(
                            {
                                'address': address,
                                'block': block,
                                'event': contract_event.event_name,
                                'subgraph_events': subgraph_events,
                                'handlers': handlers,
                                'log_params': dict(decoded_log.args),
                            }
                        )
    # Built once from the collected rows (DataFrame.append no longer exists in
    # pandas 2).
    return pd.DataFrame(rows, columns=columns)

In [None]:
# Fetch the subgraph manifest from IPFS and parse it as YAML
manifest_bytes = ipfs.cat(subgraph_id)
manifest = yaml.safe_load(manifest_bytes)
# One DataSource per entry of the manifest's dataSources list
data_sources = [getSource(entry) for entry in manifest["dataSources"]]

In [None]:
# Decode the handled events emitted in every divergent block
matching_events = get_matching_events(
    datasources=data_sources,
    divergent_blocks=unique_divergent_blocks,
)

In [None]:
# Number of decoded events found in the divergent blocks
matching_event_count = len(matching_events)
print('Matching events', matching_event_count, sep='\n')

In [None]:
# One row per (block, address, event) combination, without the per-log details
deduplicated_events = matching_events.drop_duplicates(subset=['block', 'address', 'event'])
unique_event_signatures = deduplicated_events.drop(columns=['log_params', 'subgraph_events'])
print('Unique event signatures matched on divergent blocks')
unique_event_signatures

In [None]:
# Show the full table, decoded log parameters included
events_heading = 'Matching events emitted in divergent blocks (log params included)'
print(events_heading)
matching_events

## Save analysis to csv

In [None]:
# Save matching events to a local csv
# Create the output directory first so the write cannot fail on a missing
# directory.
os.makedirs(output_dir, exist_ok=True)
# os.path.join also works when output_dir has no trailing separator
matching_events.to_csv(os.path.join(output_dir, subgraph_name + '_matching_events_on_divergent_blocks.csv'))

In [None]:
# Save unique matching event signatures to a local csv
# Create the output directory first so the write cannot fail on a missing
# directory.
os.makedirs(output_dir, exist_ok=True)
# os.path.join also works when output_dir has no trailing separator
unique_event_signatures.to_csv(os.path.join(output_dir, subgraph_name + '_matching_event_signatures_on_divergent_blocks.csv'))