In [None]:
# Solana / Anchor client libraries imported below (solders, solana-py, base58, anchorpy).
# NOTE(review): versions are unpinned; pin them (`pkg==x.y.z`) so reruns resolve the same APIs.
%pip install solders solana base58 anchorpy

In [11]:
import concurrent.futures
import datetime
import json
import os
import time
from pprint import pprint

import numpy as np
import pandas as pd
import requests
from anchorpy import Idl, Program, Provider
from base58 import b58decode
from solana.rpc.async_api import AsyncClient
from solders.pubkey import Pubkey

# Lift pandas' display truncation for both axes (keep large frames to .head() when displaying).
for _display_option in ('display.max_columns', 'display.max_rows'):
    pd.set_option(_display_option, None)

In [4]:
# Whirlpool program id and the pool this notebook collects data for.
WHIRLPOOL_PROGRAM_ID = "whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc"
WHIRLPOOL_ADDRESS = "9tXiuRRw7kbejLhZXtxDxYs2REe43uH2e7k1kocgdM9B"
# Instruction names (as decoded from the IDL) that count as swaps through the pool.
SWAP_METHODS = {"swap", "swap_v2", "two_hop_swap", "two_hop_swap_v2"}

# Never hard-code RPC credentials in a notebook. The key previously committed here is
# leaked and should be rotated. Provide it via the environment instead
# (e.g. `%env HELIUS_API_KEY=...` or notebook secrets) before running the RPC cells.
HELIUS_API_KEY = os.environ.get("HELIUS_API_KEY", "")
if not HELIUS_API_KEY:
  print("WARNING: HELIUS_API_KEY is not set; Helius RPC requests are expected to be rejected.")

In [5]:
def fetch_account_balance(token_vault_address: str, timeout: float = 30.0):
  """Fetch the token balance of one token account via the Helius JSON-RPC endpoint.

  Args:
    token_vault_address: base58 address of the token account (e.g. a pool vault).
    timeout: seconds to wait for the HTTP response. Without a timeout `requests`
      can block indefinitely on a stalled connection.

  Returns:
    The decoded JSON-RPC response body for `getTokenAccountBalance`, returned as-is.

  Raises:
    requests.exceptions.RequestException: on connection failures or timeouts.
    ValueError: if the response body is not valid JSON.
  """
  url = f"https://mainnet.helius-rpc.com/?api-key={HELIUS_API_KEY}"
  payload = {
      "jsonrpc": "2.0",
      "id": 1,
      "method": "getTokenAccountBalance",
      "params": [token_vault_address],
  }
  headers = {"accept": "application/json", "content-type": "application/json"}

  response = requests.post(url, json=payload, headers=headers, timeout=timeout)
  return response.json()


def fetch_account_balance_parallel(token_vault_addresses: list[str]):
  """Fetch balances for several token accounts on a small thread pool.

  Returns:
    dict mapping each address to {'balance': <value.uiAmount>, 'slot': <context.slot>},
    filled in the order the requests complete.
  """
  balances = {}
  with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    address_of = {}
    for address in token_vault_addresses:
      address_of[pool.submit(fetch_account_balance, address)] = address

    for done in concurrent.futures.as_completed(address_of):
      rpc_result = done.result()['result']
      uiamount = rpc_result['value']['uiAmount']
      slot = rpc_result['context']['slot']
      balances[address_of[done]] = {'balance': uiamount, 'slot': slot}

  return balances


def fetch_signatures_for_address(address: str, limit: int=1000, before=None, timeout: float = 30.0):
  """Fetch one page of transaction signatures for `address` (getSignaturesForAddress).

  Args:
    address: account to query. It is stringified, so Pubkey-like objects also work.
    limit: maximum number of signatures in the page.
    before: optional signature to page backwards from. Omitted when falsy.
    timeout: seconds to wait for the HTTP response, so a stalled connection cannot
      hang a long history crawl forever.

  Returns:
    The decoded JSON-RPC response body, returned as-is.

  Raises:
    requests.exceptions.RequestException: on connection failures or timeouts.
    ValueError: if the response body is not valid JSON.
  """
  url = f"https://mainnet.helius-rpc.com/?api-key={HELIUS_API_KEY}"

  params = {"limit": limit}
  if before:
    params["before"] = before

  payload = {
      "jsonrpc": "2.0",
      "id": 1,
      "method": "getSignaturesForAddress",
      "params": [str(address), params],
  }
  headers = {"accept": "application/json", "content-type": "application/json"}

  response = requests.post(url, json=payload, headers=headers, timeout=timeout)
  return response.json()


def fetch_signature_hist_from_address(address: str, lookback: int=14, before=None, max_retries: int=10):
  """Page backwards through an address's signatures until `lookback` days are covered.

  Pages come from getSignaturesForAddress. Failed transactions (non-null `err`) are
  dropped. Paging stops when one of these happens:
    - a page reaches a block time older than `now - lookback days`; that final page
      is kept whole, so a few rows may be slightly older than the cutoff;
    - an empty page is returned (no older history exists);
    - `max_retries` consecutive responses come back without a `result`; the history
      collected so far is returned.

  Args:
    address: account whose signature history to walk (e.g. a pool address).
    lookback: number of days back from now to cover.
    before: optional signature to resume paging from (only older history is fetched).
    max_retries: consecutive failed responses tolerated before giving up.

  Returns:
    DataFrame with columns slot, block_time, tx_signature (newest first).
  """
  cutoff_time = time.time() - lookback * 24 * 60 * 60

  txs = {"slot": [], "block_time": [], "tx_signature": []}
  attempts = 0

  while True:
    if not before:
      print("Called API with no signature")
      response = fetch_signatures_for_address(address)
    else:
      print(f"Called API with signature {before}")
      response = fetch_signatures_for_address(address, before=before)

    batch = response.get('result', None)

    if batch is None:
      # RPC error or rate limit: back off and retry the same page, but not forever.
      attempts += 1
      if attempts > max_retries:
        print(f"Giving up after {max_retries} failed attempts; returning partial history.")
        break
      print("No results returned. Retrying with delay...")
      time.sleep(3)
      continue

    attempts = 0

    if not batch:
      # An empty page means there are no older signatures left to fetch.
      print("Reached the start of the address history.")
      break

    for tx in batch:
      if tx.get("err"):
        continue
      txs["slot"].append(tx["slot"])
      txs["block_time"].append(tx["blockTime"])
      txs["tx_signature"].append(tx["signature"])

    # blockTime can be null; only compare pages that carry at least one timestamp.
    block_times = [tx["blockTime"] for tx in batch if tx.get("blockTime")]
    if block_times:
      oldest_time = min(block_times)
      print('last observed block time:')
      oldest_time_ui = datetime.datetime.fromtimestamp(oldest_time).strftime('%Y-%m-%d %H:%M:%S')
      print(f"{oldest_time_ui}")

      if oldest_time < cutoff_time:
        print("Desired block time reached!")
        break

    print('last observed signature:')
    before = batch[-1]['signature']
    print(f"{before}")
    time.sleep(1)

  return pd.DataFrame(txs)


def fetch_raw_transaction(signature: str, timeout: float = 30.0):
    """Fetch one finalized transaction, jsonParsed-encoded, from the Helius RPC.

    Args:
        signature: base58 transaction signature.
        timeout: seconds to wait for the HTTP response.

    Returns:
        The JSON-RPC response dict on success. Returns None (after printing the reason)
        when any of these happens:
        - the HTTP request fails or times out;
        - the body is not JSON;
        - the RPC returns an ``error`` object;
        - ``result`` is null, i.e. the transaction was not found at this commitment.
    """
    url = f"https://mainnet.helius-rpc.com/?api-key={HELIUS_API_KEY}"
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getTransaction",
        "params": [
            signature,
            {
                "encoding": "jsonParsed",
                "commitment": "finalized",
                "maxSupportedTransactionVersion": 0
            }
        ]
    }
    headers = {"accept": "application/json", "content-type": "application/json"}

    try:
        response = requests.post(url, json=payload, headers=headers, timeout=timeout)
        response.raise_for_status()  # Raise an error for HTTP 4xx/5xx
        response_json = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return None
    except ValueError:
        print("Invalid JSON in response.")
        return None

    if not isinstance(response_json, dict):
        print("Invalid JSON in response.")
        return None
    # An HTTP 200 can still carry a JSON-RPC error object; treat it as a failed fetch
    # instead of letting callers parse it as an empty transaction.
    if response_json.get("error"):
        print(f"RPC error: {response_json['error']}")
        return None
    if response_json.get("result") is None:
        print(f"Transaction {signature} not found.")
        return None
    return response_json


def setup_program(idl_file: str, program_pubkey: str="whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc", rpc_url: str="https://api.mainnet-beta.solana.com"):
  """Build an anchorpy Program from an Anchor IDL file.

  In this notebook the Program is only handed to `decode_instruction`, which uses its
  instruction coder to turn raw instruction data into instruction names.

  Args:
    idl_file: path to the program's IDL JSON file.
    program_pubkey: base58 program id.
    rpc_url: RPC endpoint for the AsyncClient backing the Provider.

  Returns:
    anchorpy Program bound to `program_pubkey`.
  """
  # Close the IDL file deterministically; JSON text is UTF-8 regardless of the locale.
  with open(idl_file, encoding="utf-8") as fh:
    idl = Idl.from_json(fh.read())
  client = AsyncClient(rpc_url)
  provider = Provider(client, None)  # no wallet is passed
  return Program(idl, Pubkey.from_string(program_pubkey), provider)


def decode_instruction(data_b58: str, program):
    """Map base58 instruction data to an instruction name via the program's coder.

    Returns the parsed instruction's ``name``, or None when parsing raises KeyError
    (presumably a discriminator the IDL does not know; confirm against anchorpy).
    Errors from base58 decoding itself are not caught.
    """
    raw_bytes = b58decode(data_b58)
    parse = program.coder.instruction.parse
    try:
        return parse(raw_bytes).name
    except KeyError:
        return None


def get_pre_post_balances(tx_res, token_mint_a, token_mint_b, pool_address):
  """Extract the pool's token A/B balances before and after a transaction.

  Reads meta.preTokenBalances and meta.postTokenBalances from a getTransaction
  response. Only entries whose `owner` is `pool_address` and whose mint is token A
  or token B are used. Post entries are processed after pre entries, so their
  decimals win.

  Args:
    tx_res: decoded getTransaction JSON-RPC response. A missing or null
      result/meta/balance list is tolerated.
    token_mint_a, token_mint_b: mints of the pool's two tokens. If both are equal,
      entries are attributed to side A.
    pool_address: owner address of the pool's token accounts.

  Returns:
    dict with pre_balance_{a,b} and post_balance_{a,b} (uiAmount, 0.0 when absent or
    null) and decimals_{a,b} (0 when absent).
  """
  tx_pre_post_balances = {
      'pre_balance_a': 0.0,
      'pre_balance_b': 0.0,
      'post_balance_a': 0.0,
      'post_balance_b': 0.0,
      'decimals_a': 0,
      'decimals_b': 0
  }

  # `result` is null for unknown transactions and `meta` may be null; fall back to {}.
  result = (tx_res or {}).get('result') or {}
  meta = result.get('meta') or {}

  def side_of(mint):
    if mint == token_mint_a:
      return 'a'
    if mint == token_mint_b:
      return 'b'
    return None

  for stage, key in (('pre', 'preTokenBalances'), ('post', 'postTokenBalances')):
    for bal in meta.get(key) or []:
      # `owner` is optional in token balance entries, so use .get on both passes.
      if bal.get('owner') != pool_address:
        continue
      side = side_of(bal.get('mint'))
      if side is None:
        continue
      token = bal.get('uiTokenAmount') or {}
      tx_pre_post_balances[f'{stage}_balance_{side}'] = token.get('uiAmount', 0.0) or 0.0
      tx_pre_post_balances[f'decimals_{side}'] = token.get('decimals', 0) or 0

  return tx_pre_post_balances


# Parsed-instruction program names whose transfers can move pool funds: the legacy
# SPL Token program and Token-2022. The legacy program id is also accepted for
# responses that lack the `program` field.
_TOKEN_PROGRAM_NAMES = {"spl-token", "spl-token-2022"}
_LEGACY_TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"


def _is_pool_swap(ix, pool_address, program):
  """True when `ix` is a Whirlpool instruction that references the pool and decodes to a swap."""
  accounts = ix.get('accounts', None)
  data = ix.get('data', None)
  if ix.get('programId', None) != WHIRLPOOL_PROGRAM_ID or not accounts or not data:
    return False
  # Membership is cheap; only decode instructions that actually reference the pool.
  if pool_address not in accounts:
    return False
  return decode_instruction(data, program) in SWAP_METHODS


def _token_transfer(ix):
  """Return (source, destination, raw_amount) for a parsed token transfer, else None."""
  parsed = ix.get('parsed', None)
  is_token_ix = (ix.get('program') in _TOKEN_PROGRAM_NAMES
                 or ix.get('programId') == _LEGACY_TOKEN_PROGRAM_ID)
  if not is_token_ix or not isinstance(parsed, dict):
    return None

  info = parsed.get('info') or {}
  src, dst = info.get('source'), info.get('destination')
  # Checked transfers (e.g. transferChecked) nest the raw amount under tokenAmount.
  token_amount = info.get('tokenAmount')
  if isinstance(token_amount, dict):
    amount = token_amount.get('amount')
  else:
    amount = info.get('amount')

  # Skip entries without both endpoints or without an amount instead of reusing a stale one.
  if not src or not dst or amount is None:
    return None
  return src, dst, int(amount)


def _match_vault_transfers(instructions, token_vault_a, token_vault_b):
  """Scan `instructions` in order for transfers touching the pool's two vaults.

  Returns {'token_amount_a': int, 'token_amount_b': int} as soon as both vaults have
  been seen, or None if either vault is never touched. A later transfer touching the
  same vault overwrites the earlier amount.
  """
  swap = {}
  for ix in instructions:
    transfer = _token_transfer(ix)
    if transfer is None:
      continue
    src, dst, amount = transfer
    if token_vault_a in (src, dst):
      swap['token_amount_a'] = amount
    elif token_vault_b in (src, dst):
      swap['token_amount_b'] = amount
    if 'token_amount_a' in swap and 'token_amount_b' in swap:
      return swap
  return None


def get_swap_events_inner(res, pool_address, token_vault_a, token_vault_b, program):
  """Find pool swaps invoked through CPI, i.e. appearing inside meta.innerInstructions.

  For each Whirlpool swap on `pool_address` found in an inner-instruction list, the
  instructions that follow it in the same list are scanned for the matching vault
  transfers.

  Returns:
    (swap_events, num_swaps). `swap_events` holds one
    {'token_amount_a', 'token_amount_b'} dict (raw integer amounts) per swap whose
    vault transfers were both found. `num_swaps` counts every detected swap.
  """
  swap_events = []
  num_swaps = 0

  result = res.get('result', None)
  if not result:
    return swap_events, num_swaps

  meta = result.get('meta') or {}
  for inner in meta.get('innerInstructions') or []:
    ixs = inner.get('instructions') or []
    for idx, ix in enumerate(ixs):
      if not _is_pool_swap(ix, pool_address, program):
        continue
      num_swaps += 1
      swap = _match_vault_transfers(ixs[idx + 1:], token_vault_a, token_vault_b)
      if swap:
        swap_events.append(swap)

  return swap_events, num_swaps


def get_swap_events_outer(res, pool_address, token_vault_a, token_vault_b, program):
  """Find pool swaps issued as top-level transaction instructions.

  For each top-level Whirlpool swap on `pool_address` at position `idx`, the
  innerInstructions group(s) whose `index` equals `idx` are scanned for the matching
  vault transfers.

  Returns:
    (swap_events, num_swaps), with the same shape as get_swap_events_inner.
  """
  swap_events = []
  num_swaps = 0

  result = res.get('result', None)
  if not result:
    return swap_events, num_swaps

  meta = result.get('meta') or {}
  inner_groups = meta.get('innerInstructions') or []
  message = (result.get('transaction') or {}).get('message') or {}

  for idx, ix in enumerate(message.get('instructions') or []):
    if not _is_pool_swap(ix, pool_address, program):
      continue
    num_swaps += 1
    for group in inner_groups:
      if group.get('index') != idx:
        continue
      swap = _match_vault_transfers(group.get('instructions') or [], token_vault_a, token_vault_b)
      if swap:
        swap_events.append(swap)

  return swap_events, num_swaps

In [7]:
def create_dataset_for_pool(pool_address: str, token_mint_a: str, token_mint_b: str, token_vault_a: str, token_vault_b: str, program_idl: str="/content/drive/MyDrive/colosseum_hackathon/parsing_code/whirlpool.json", lookback: int=14, before=None, checkpoint_path: str="/content/drive/MyDrive/colosseum_hackathon/datasets/pyusd_usdc_pool/pyusd_usdc_batch_{batch_num}.csv", checkpoint_every: int=5000):
  """Build a per-transaction dataset of swaps and pool balances for one Whirlpool pool.

  Collects the pool's successful signatures over `lookback` days, then fetches each
  transaction. For each one it records the raw token A/B amounts moved through the
  pool's vaults and the pool's A/B balances before and after.

  Args:
    pool_address: pool account address.
    token_mint_a, token_mint_b: mints of the pool's two tokens.
    token_vault_a, token_vault_b: the pool's token accounts for A and B.
    program_idl: path to the program IDL JSON used to decode instruction data.
    lookback: days of history to collect.
    before: optional signature to resume paging from.
    checkpoint_path: CSV path template containing a ``{batch_num}`` placeholder.
      Every `checkpoint_every` rows, the processed prefix of the frame is written
      there. None disables checkpoints.
    checkpoint_every: rows between checkpoints; 0 disables checkpoints.

  Returns:
    DataFrame with one row per signature and these columns: slot, block_time,
    tx_signature, the mint/vault columns, num_swaps, raw token_amount_{a,b},
    pre/post_balance_{a,b} (uiAmount) and decimals_{a,b}. Rows whose transaction
    could not be fetched keep their zero defaults.

  Raises:
    KeyboardInterrupt: re-raised after the processed rows are written to a checkpoint,
      so a manual stop does not discard a long RPC run.
  """
  txs_df = fetch_signature_hist_from_address(pool_address, lookback, before)

  # Constant pool metadata, then per-transaction fields initialised to "nothing observed".
  txs_df['token_mint_a'] = token_mint_a
  txs_df['token_mint_b'] = token_mint_b
  txs_df['token_vault_a'] = token_vault_a
  txs_df['token_vault_b'] = token_vault_b
  for col in ('num_swaps', 'token_amount_a', 'token_amount_b'):
    txs_df[col] = 0
  for col in ('pre_balance_a', 'pre_balance_b', 'post_balance_a', 'post_balance_b'):
    txs_df[col] = 0.0
  for col in ('decimals_a', 'decimals_b'):
    txs_df[col] = 0

  # setup program from json to decode transactions
  program = setup_program(program_idl)

  print("Transaction collection complete :)")
  print(f"Total transactions: {txs_df.shape[0]}")

  def save_checkpoint(n_rows, batch_num):
    path = checkpoint_path.format(batch_num=batch_num)
    txs_df.iloc[:n_rows].to_csv(path, index=False)
    print(f"✅ Saved progress to {path}")

  batch_num = 0
  n_done = 0  # rows fully processed (fetched or marked failed)
  try:
    for pos, (idx, row) in enumerate(txs_df.iterrows()):
      res = fetch_raw_transaction(row['tx_signature'])
      if res:
        _record_transaction(txs_df, idx, row, res, pool_address, program)
        print(f"Processed transaction {idx}")
      else:
        print(f"❌ Failed to fetch transaction {idx}")

      n_done = pos + 1
      # Checkpoint on position (not only after successful rows) so a failed fetch
      # at the boundary does not skip a checkpoint.
      if checkpoint_path and checkpoint_every and n_done % checkpoint_every == 0:
        save_checkpoint(n_done, batch_num)
        batch_num += 1
  except KeyboardInterrupt:
    if checkpoint_path and n_done:
      print("⚠️ Interrupted; saving processed rows before stopping.")
      save_checkpoint(n_done, batch_num)
    raise

  print("Transaction processing complete :)")

  return txs_df


def _record_transaction(txs_df, idx, row, res, pool_address, program):
  """Write swap amounts and pool balances parsed from `res` into row `idx` of `txs_df`."""
  # The pool can be invoked via CPI (inner instructions) or directly (top-level);
  # both views are summed into the same row.
  for get_events in (get_swap_events_inner, get_swap_events_outer):
    events, n_swaps = get_events(res, pool_address, row['token_vault_a'], row['token_vault_b'], program)
    if events:
      txs_df.loc[idx, 'token_amount_a'] += sum(int(e['token_amount_a']) for e in events)
      txs_df.loc[idx, 'token_amount_b'] += sum(int(e['token_amount_b']) for e in events)
      txs_df.loc[idx, 'num_swaps'] += n_swaps

  balances = get_pre_post_balances(res, row['token_mint_a'], row['token_mint_b'], pool_address)
  if balances:
    for col in ('pre_balance_a', 'pre_balance_b', 'post_balance_a', 'post_balance_b'):
      txs_df.loc[idx, col] = float(balances[col])
    for col in ('decimals_a', 'decimals_b'):
      txs_df.loc[idx, col] = int(balances[col])

In [10]:
# Pool under study: mints of token A / token B and the pool's vault account for each.
# NOTE(review): the default checkpoint path in create_dataset_for_pool names this the
# PYUSD/USDC pool; confirm before reusing the config for another pool.
POOL_CONFIG = {
    "pool_address": WHIRLPOOL_ADDRESS,
    "token_mint_a": "2b1kV6DkPAnxd5ixfnxCpjxmKwqjjaYmCZfHsFu24GXo",
    "token_mint_b": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
    "token_vault_a": "EeF6oBy6AQiBJoRx5xiRNxa6cmpQE3ayVagj28QFZuyg",
    "token_vault_b": "MvB8poDgpDPbRgx8MXeb7EPEsawGuiBTqpkpM9exeLi",
}
# Resume paging from this signature (only older history is fetched); cover 60 days back from now.
RESUME_BEFORE_SIGNATURE = "4K7APXX7UvBCUvyVi7AJ4Cgqu1H7T2pb2Vf9cCAR1boBR493WHHZUgzicswLSTzRi2dA9aayWk2CVo8tbbaAbZCB"

txs_df = create_dataset_for_pool(
    **POOL_CONFIG,
    program_idl="/content/drive/MyDrive/colosseum_hackathon/parsing_code/whirlpool.json",
    lookback=60,
    before=RESUME_BEFORE_SIGNATURE,
)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Processed transaction 45057
Processed transaction 45058
Processed transaction 45059
Processed transaction 45060
Processed transaction 45061
Processed transaction 45062
Processed transaction 45063
Processed transaction 45064
Processed transaction 45065
Processed transaction 45066
Processed transaction 45067
Processed transaction 45068
Processed transaction 45069
Processed transaction 45070
Processed transaction 45071
Processed transaction 45072
Processed transaction 45073
Processed transaction 45074
Processed transaction 45075
Processed transaction 45076
Processed transaction 45077
Processed transaction 45078
Processed transaction 45079
Processed transaction 45080
Processed transaction 45081
Processed transaction 45082
Processed transaction 45083
Processed transaction 45084
Processed transaction 45085
Processed transaction 45086
Processed transaction 45087
Processed transaction 45088
Processed transaction 45089
Processed t

KeyboardInterrupt: 

In [None]:
# Convert raw integer token amounts to UI units using each token's decimals.
# NOTE(review): rows whose decimals stayed at the 0 default (fetch failed or no pool
# balance entry) are divided by 1, i.e. left in raw units.
for _side in ('a', 'b'):
    txs_df[f'token_amount_{_side}_ui'] = txs_df[f'token_amount_{_side}'] / 10 ** txs_df[f'decimals_{_side}']

In [18]:
# NOTE(review): this path names the SOL/PENGU pool, but the collection cell above targets
# the PYUSD/USDC pool and was interrupted. The preview below shows SOL/PENGU mints, so
# `txs_df` likely comes from an earlier run. Confirm which dataset it holds before saving.
SOL_PENGU_PART3_CSV = "/content/drive/MyDrive/colosseum_hackathon/datasets/sol_pengu_pool/sol_pengu_part3.csv"
txs_df.to_csv(SOL_PENGU_PART3_CSV, index=False)

In [19]:
# Preview the first rows of the collected dataset.
txs_df.head(n=5)

Unnamed: 0,slot,block_time,tx_signature,token_mint_a,token_mint_b,token_vault_a,token_vault_b,num_swaps,token_amount_a,token_amount_b,pre_balance_a,pre_balance_b,post_balance_a,post_balance_b,decimals_a,decimals_b
0,372557788,1760144646,5PtvLMtwReCWfLcLp9V2NKUGvgYdRCN3qaUuUW6NMv3iMY...,So11111111111111111111111111111111111111112,2zMMhcVQEXDtdE6vsFS7S7D5oUodfJHE8vd1gnBouauv,J757hq9DXGPDYfCoeGpTcD9A71NFgNqBRMXHrdVGyRxK,SdFLxX6sWTkKWje3Xb4YNewbm5ieaj3tfEJYeLTyqyg,1,1150004045,8903724052,14939.956068,183655300.0,14938.806064,183664200.0,9,6
1,372557787,1760144646,4urJ6EyerDrf7KxWcLg9sGggd87BPhDZFs6GzAWiBceDdk...,So11111111111111111111111111111111111111112,2zMMhcVQEXDtdE6vsFS7S7D5oUodfJHE8vd1gnBouauv,J757hq9DXGPDYfCoeGpTcD9A71NFgNqBRMXHrdVGyRxK,SdFLxX6sWTkKWje3Xb4YNewbm5ieaj3tfEJYeLTyqyg,1,8714974177,67447466742,14948.671042,183587900.0,14939.956068,183655300.0,9,6
2,372557786,1760144645,2bvpAkVjNEZr37brggYbo7cyHUTHiag69kjiLF6hAeKhZo...,So11111111111111111111111111111111111111112,2zMMhcVQEXDtdE6vsFS7S7D5oUodfJHE8vd1gnBouauv,J757hq9DXGPDYfCoeGpTcD9A71NFgNqBRMXHrdVGyRxK,SdFLxX6sWTkKWje3Xb4YNewbm5ieaj3tfEJYeLTyqyg,1,17824102794,137797887740,14966.495145,183450100.0,14948.671042,183587900.0,9,6
3,372557785,1760144645,5pRxua1qbLCczQbt7Cq2srNTUW1ZcDztRoRyqWDREsceNu...,So11111111111111111111111111111111111111112,2zMMhcVQEXDtdE6vsFS7S7D5oUodfJHE8vd1gnBouauv,J757hq9DXGPDYfCoeGpTcD9A71NFgNqBRMXHrdVGyRxK,SdFLxX6sWTkKWje3Xb4YNewbm5ieaj3tfEJYeLTyqyg,1,3500612153,27039937359,14969.995757,183423100.0,14966.495145,183450100.0,9,6
4,372557784,1760144645,4YzyHDMxPztgVteM7Vg1F2bJMkRFqndVgCg6JN6Rjufzkx...,So11111111111111111111111111111111111111112,2zMMhcVQEXDtdE6vsFS7S7D5oUodfJHE8vd1gnBouauv,J757hq9DXGPDYfCoeGpTcD9A71NFgNqBRMXHrdVGyRxK,SdFLxX6sWTkKWje3Xb4YNewbm5ieaj3tfEJYeLTyqyg,1,390086808,3012693565,14970.385844,183420000.0,14969.995757,183423100.0,9,6
