In [4]:
import pandas as pd
import requests
import numpy as np
import json
from datetime import datetime
import matplotlib.pyplot as plt
from itertools import islice
from os.path import exists
import requests
import traceback

pd.set_option('display.max_colwidth', None)

In [135]:
class MyPortfolio:
    """
    Python ETL script for Friktion user portfolio data. 
    
    Currently supported Instruction Names: 
        - Deposit
        - CancelPendingDeposit
        - Withdrawal
        - CancelPendingWithdrawal
        - ClaimPendingWithdrawal
    """
    
    
    def __init__(self, 
                 date_start, 
                 date_end, 
                 ix_fname='friktion_ix.csv', 
                 deposit_fname='friktion_deposit.csv', 
                 deposit_cxl_fname='friktion_deposit_cxl.csv', 
                 withdraw_fname='friktion_withdraw.csv', 
                 withdraw_cxl_fname='friktion_withdraw_cancel.csv',
                 withdraw_claim_fname='friktion_claim_withdrawal.csv',
                 batch_size_days=14, 
                 batch_size_xfers=75
            ):
        """
        :ix_fname:              output csv for instructions
        :deposit_fname:         output csv for deposits
        :deposit_cxl_fname:     output csv for deposit cancels
        :withdraw_fname:        output csv for withdrawals
        :withdraw_cxl_fname:    output csv for withdrawal cancels
        :withdraw_claim_fname:    output csv for claiming pending withdrawal
        :batch_size_days:       batch size in days for query to keep query < 10k rows. Use bigger steps for larger data.
        :batch_size_transfers:  batch size transactions for query to keep query < 8kb 

        """
        self.volt_program = "VoLT1mJz1sbnxwq5Fv2SXjdVDgPXrb9tJyC8WpMDkSp"
        self.date_start = date_start
        self.date_end = date_end
        self.ix_fname = ix_fname
        self.deposit_fname = deposit_fname
        self.deposit_cxl_fname = deposit_cxl_fname
        self.withdraw_fname = withdraw_fname
        self.withdraw_cxl_fname = withdraw_cxl_fname
        self.withdraw_claim_fname = withdraw_claim_fname
        self.batch_size_days = batch_size_days
        self.batch_size_xfers = batch_size_xfers
        self.df_ix = []
        self.friktion_metadata = self.get_friktion_snapshot()


    ########################################################################################################
    ####################################          Queries             ######################################
    ########################################################################################################
    
    
    @property
    def ix_query(self):
        return """
            query MyQuery {
              solana {
                instructions(
                  time: {between: ["%s", "%s"]}
                  success: {is: true}
                  programId: {is: "VoLT1mJz1sbnxwq5Fv2SXjdVDgPXrb9tJyC8WpMDkSp"}
                ) {
                  block {
                    timestamp {
                      iso8601
                    }
                  }
                  log {
                    consumed
                    logs
                  }
                  transaction {
                    signature
                    success
                    feePayer
                  }
                  data {
                    base58
                  }
                }
              }
            }
        """
        
        
    @property
    def xfer_query(self):
        return """
            query MyQuery {
              solana(network: solana) {
                transfers(
                  signature: {in: [%s]}
                ) {
                  instruction {
                    action {
                      name
                      type
                    }
                    callPath
                  }
                  amount(success: {is: true})
                  transaction {
                    signer
                    signature
                  }
                  block {
                    timestamp {
                      iso8601
                    }
                  }
                  currency {
                    name
                    address
                    symbol
                    decimals
                  }
                  sender {
                    address
                    mintAccount
                  }
                  receiver {
                    address
                    mintAccount
                  }
                }
              }
            }
        """
        
        
    ########################################################################################################
    ################################          Helper Functions             #################################
    ########################################################################################################
    
    
    # TODO: Add retry logic to this in case of hangups. 
    @staticmethod
    def run_query(query):  # A simple function to use requests.post to make the API call.
        headers = {'X-API-KEY': 'BQYCaXaMZlqZrPCSQVsiJrKtxKRVcSe4'}
        request = requests.post('https://graphql.bitquery.io/', json={'query': query}, headers=headers)
        if request.status_code == 200:
            return request.json()
        else:
            print(request.reason)
            raise Exception('Query failed and return code is {}.{}'.format(request.status_code, query))
    
    
    @staticmethod
    def batch_iterable(iterable, n=1):
        """
        Takes in an iterable and returns an iterable of iterables with len==x
        """
        idxs = []
        l = len(iterable)
        for idx in range(0, l, n):
            idxs.append(iterable[idx:min(idx+n, l)])
        return idxs
        
        
    def format_txs_for_query(self, tx_signatures):
        """
        Batches a list of transactions into a list of string formatted transactions for querying. 
        Each of these strings contain (n=self.batch_size_xfers) unique transaction IDs.
        """
        batched_signatures = self.batch_iterable(tx_signatures, self.batch_size_xfers)
        
        def format_txs(x):
            return str(x)[1:-1].replace("\'", "\"").replace("\n", "")
        
        tx_strs = list(map(format_txs, batched_signatures))

        return tx_strs
    
    
    def get_existing_df(self, fname):
        # Create output file if doesn't exist
        if fname and exists(fname):
            return pd.read_csv(fname)
        else:
            return pd.DataFrame()
    
    @staticmethod
    def instruction_match(instructionData):
        if not instructionData or len(instructionData) < 8:
            return False
        
        instructionDescriptor = instructionData[:8]
        
        if instructionDescriptor == "PcB3tF1K":
            return "Withdraw"
        elif instructionDescriptor == "WuE7Hjns":
            return "Deposit"
        elif instructionDescriptor == "V8cW2nMq":
            return "CancelPendingDeposit"
        elif instructionDescriptor == "dxUbSCWk":
            return "CancelPendingWithdrawal"
        elif instructionDescriptor == "WcTWQsnk":
            return "ClaimPendingWithdrawal"
        else:
            return "Unclassified"
        
        
    def get_friktion_snapshot(self):
        """
        Load Friktion Metadata for Volt/Symbol Mapping to join to normal data
        
        """
        try:
            return pd.DataFrame(
                dict(
                    json.loads(
                        requests.get("https://friktion-labs.github.io/mainnet-tvl-snapshots/friktionSnapshot.json"
                        ).content)
                  )['allMainnetVolts']
            )[["globalId", "vaultAuthority", "shareTokenMint", "depositTokenSymbol", "depositTokenCoingeckoId"]]
        except Exception as e:
            print(datetime.now(), "Snapshot Data Invalid")
            traceback.print_exc()
    
            
    ########################################################################################################
    ################################          Data Retrieval             ###################################
    ########################################################################################################
    
    
    def get_ix(self, date_start, date_end):
        """
        Runs graphql instruction query for one date range. 
        """
        print(date_start, date_end)
        query = self.ix_query % (date_start, date_end)
        print(datetime.now(), "retrieving instructions for {} to {}".format(date_start, date_end))
        result = self.run_query(query)
        
        # convert GraphQL json to pandas dataframe
        df = pd.json_normalize(result['data']['solana']['instructions'])
        print(datetime.now(), df.shape[0], "instructions retrieved")
        
        df = df.rename(
            columns={
                "block.timestamp.iso8601": "timestamp", 
                "log.consumed": "computeUnits", 
                "log.logs": "programLogs", 
                "transaction.signature": "txSignature", 
                "transaction.success": "txSuccess", 
                "transaction.feePayer": "txSigner",
                "data.base58": "instructionData"
            }
        )
        return df
    
    
    def get_ix_batch(self):
        """
        Batch the instruction retrieval. Save the shit Drop duplicates. 

        """
                
        # Batch the days up nice and good so the graphql API calls don't bitch
        dates_batched = pd.date_range(self.date_start, self.date_end, freq='7D')
        dates_batched = [str(x.isoformat()) for x in dates_batched.append(pd.DatetimeIndex([self.date_end]))]
        date_ranges = list(zip(dates_batched, dates_batched[1:]))
        
        ixs = []
        
        for date_range in date_ranges:
            assert len(date_range)==2
            data = self.get_ix(date_range[0], date_range[1])
            ixs.append(data)
            
        df_ix = pd.concat(ixs, ignore_index=False)
        df_ix["instructionType"] = df_ix.instructionData.apply(lambda x: self.instruction_match(x))
        
        # Store df_ix before we write it to the DataFrame so we avoid getting xfers for every single ix
        self.df_ix = df_ix.drop_duplicates()
        print(datetime.now(), "final instruction data size: ", df_ix.shape[0])

        df_old = self.get_existing_df(self.ix_fname)
        df = pd.concat([df_old, df_ix], ignore_index=True)
        df.to_csv(self.ix_fname, index=False)
        print(datetime.now(), "wrote instruction data to csv...")

        
        
    def get_batched_xfers(self, instructionType, fname):
        """
        Get all transfers corresponding to a specific instructionType from Graphql query. 
        Batch these queries up b/c the string sizes are too large (curse GraphQL for not supporting joins)
        
        :instructionType: String corresponding to the instruction type of each query. 
        :fname: Name of where the old df is stored
        
        """
        # assert self.df_ix, "Error: instructions get_ix_batch() must be called before xfers are scraped"
            
        temp = self.df_ix.query("instructionType == '%s'" % (instructionType))
        
        if temp.empty:
            print(datetime.now(), "instructionType was not found in the data... breaking")
            return
        
        tx_signatures = list(temp["txSignature"].unique())
        tx_strs = self.format_txs_for_query(tx_signatures)
        print(datetime.now(), len(tx_strs), "signature batches required...")
        xfers = []
        
        for i, tx_str in enumerate(tx_strs):
            query = self.xfer_query % (tx_str)
            result = self.run_query(query)
            df = pd.json_normalize(result['data']['solana']['transfers'])
            xfers.append(df)
            print(datetime.now(), df.shape[0], "transfers scraped in batch %d" % i)

        df_xfer = pd.concat(xfers, ignore_index=False)
        df_xfer = df_xfer.rename(
            columns={
                "block.timestamp.iso8601": "timestamp", 
                "instruction.action.name": "instructionAction", 
                "instruction.callPath": "instructionOrder", 
                "transaction.signer": "userAddress", 
                "transaction.signature": "txSignature", 
                "currency.symbol": "currencySymbol", 
                "currency.name": "currencyName", 
                "receiver.address": "receiverAddress", 
                "sender.address": "senderAddress", 
                "currency.decimals": "currencyDecimals",
                "currency.address": "currencyAddress", 
                "sender.mintAccount": "senderTokenMint"
            }
          )
        
        print(datetime.now(), df_xfer.shape[0], "transfers retrieved")

        df_old = self.get_existing_df(fname)
        df_final = df_old.append(df_xfer, ignore_index=True).sort_values("instructionOrder")
        
        return df_final
    

    def parse_deposits(self):
        instructionType = 'Deposit'
        instructionAction = "transfer"
        tx_merge_key = "receiverAddress"
        meta_merge_key = "vaultAuthority"
        out_file = self.deposit_fname
        
        self.parse_base(instructionType, instructionAction, tx_merge_key, meta_merge_key, out_file)
        

    def parse_withdrawal(self):
        instructionType = 'Withdraw'
        instructionAction = "burn"
        tx_merge_key = "currencyAddress"
        meta_merge_key = "shareTokenMint"
        out_file = self.withdraw_fname
        
        self.parse_base(instructionType, instructionAction, tx_merge_key, meta_merge_key, out_file)
        
        
    def parse_deposit_cancel(self):
        instructionType = 'CancelPendingDeposit'
        instructionAction = "transfer"
        tx_merge_key = "senderAddress"
        meta_merge_key = "vaultAuthority"
        out_file = self.deposit_cxl_fname
        
        self.parse_base(instructionType, instructionAction, tx_merge_key, meta_merge_key, out_file)
                
        
    def parse_withdrawal_cancel(self):
        instructionType = 'CancelPendingWithdrawal'
        instructionAction = "mintTo"
        tx_merge_key = "currencyAddress"
        meta_merge_key = "shareTokenMint"
        out_file = self.withdraw_cxl_fname
        
        self.parse_base(instructionType, instructionAction, tx_merge_key, meta_merge_key, out_file)

        
    def parse_claim_withdrawal(self):
        instructionType = 'ClaimPendingWithdrawal'
        instructionAction = "transfer"
        tx_merge_key = "senderAddress"
        meta_merge_key = "vaultAuthority"
        out_file = self.withdraw_claim_fname

        self.parse_base(instructionType, instructionAction, tx_merge_key, meta_merge_key, out_file)

        
    def parse_base(self, instructionType, instructionAction, tx_merge_key, meta_merge_key, output_file):
        """
        generalized method for parsing transfer data. 
        
        1. Call get_batched_xfers()
        2. for each unique txSignature, find the xfer matching to the last instance of the instructionAction
        3. Join it to the friktion metadata based using tx_merge_key and meta_merge_key
        4. Drop extraneous rows
        5. Save the file to the output_file
        
        :instructionType: Type of instruction listed out in the instruction_match() method
        :instructionAction: type of transfer we are matching towards
        :tx_merge_key: what key in the xfer dataFrame do we want to merge on
        :meta_merge_key: what key in the metadata dataFrame we want to merge on. 
        :output_file: as name suggests
        """
        print(datetime.now(), "Parsing transfers for instructionType: %s" % instructionType)
        df = self.get_batched_xfers(instructionType, output_file)
        
        # Get rid of confusing wSOL entries
        df = df.query('currencyName != "Wrapped SOL"')

        # The deposit is always the last mintTo instruction.
        df = df.query('instructionAction=="{}"'.format(instructionAction)).groupby("txSignature").last().reset_index()
        
        df = pd.merge(df, self.friktion_metadata, how='left',
                      left_on=tx_merge_key, right_on=meta_merge_key, suffixes=('', '_drop'))
        df.drop([col for col in df.columns if 'drop' in col], axis=1, inplace=True)
        
        df.drop_duplicates().to_csv(output_file, index=False)
        print(datetime.now(), "{} data size: {}".format(instructionType, df.shape[0]))  
        
        
    def parse_all(self):
        self.get_ix_batch()
        self.parse_claim_withdrawal()
        self.parse_deposit_cancel()
        self.parse_withdrawal_cancel()
        self.parse_deposits()
        self.parse_withdrawal()


In [117]:
date_start = "2022-03-15T00:00:00Z"
date_end = "2022-03-18T00:00:00Z"

In [118]:
x = MyPortfolio(date_start, date_end)

In [119]:
x.parse_all()

2022-03-15T00:00:00+00:00 2022-03-18T00:00:00+00:00
2022-03-31 00:20:23.012834 retrieving instructions for 2022-03-15T00:00:00+00:00 to 2022-03-18T00:00:00+00:00
2022-03-31 00:20:29.453841 963 instructions retrieved
2022-03-31 00:20:29.458655 final instruction data size:  963
2022-03-31 00:20:29.523217 wrote instruction data to csv...
2022-03-31 00:20:29.523519 Parsing transfers for instructionType: ClaimPendingWithdrawal
2022-03-31 00:20:29.524992 1 signature batches required...
2022-03-31 00:20:31.975552 160 transfers scraped in batch 0
2022-03-31 00:20:31.976718 160 transfers retrieved
2022-03-31 00:20:31.994906 ClaimPendingWithdrawal data size: 56
2022-03-31 00:20:31.995139 Parsing transfers for instructionType: CancelPendingDeposit
2022-03-31 00:20:31.996858 1 signature batches required...
2022-03-31 00:20:33.566696 65 transfers scraped in batch 0
2022-03-31 00:20:33.571413 65 transfers retrieved
2022-03-31 00:20:33.599703 CancelPendingDeposit data size: 25
2022-03-31 00:20:33.599

# Data Fidelity Tests

In [136]:
def clear_files():
    import os
    os.remove('*.csv')

In [137]:
clear_files()

FileNotFoundError: [Errno 2] No such file or directory: '*.csv'

In [129]:
ix = pd.read_csv("friktion_ix.csv")

In [130]:
deposits = pd.read_csv("friktion_deposit.csv")

In [131]:
withdrawals = pd.read_csv("friktion_withdraw.csv")

In [132]:
claim = pd.read_csv("friktion_claim_withdrawal.csv")

In [133]:
withdrawals_cxl = pd.read_csv("friktion_withdraw_cancel.csv")

In [134]:
deposits_cxl = pd.read_csv("friktion_.csv")