# Setup
For this to work, the RPC URL must point to Helius, which is currently the only provider that supports `getTransactionsForAddress`. The ingest & parse pipeline can also be run against a free RPC endpoint (or any provider other than Helius), but it would have to be adapted to use something like `getSignaturesForAddress` + `getTransaction` RPC calls instead.

In [1]:
import json
import requests
from requests.adapters import HTTPAdapter
from dotenv import load_dotenv
import os
from pathlib import Path
from datetime import datetime
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor

# Read settings from a local .env file into the process environment.
load_dotenv()

# Endpoint used for every RPC call; None when RPC_URL is not set.
RPC_URL = os.environ.get("RPC_URL")
# Address handed to get_txs() by the ingest cell.
PROGRAM_ID = "fUSioN9YKKSa3CUC2YUc4tPkHJ5Y6XW1yz8y6F7qWz9"
# Number of RPC calls accumulated before the ingest cell writes a file.
CALL_BATCH = 50

# Raw RPC responses are stored here, one JSON file per batch.
data_dir = Path("data") / "raw"
data_dir.mkdir(parents=True, exist_ok=True)

# One shared session so HTTPS connections are pooled and reused between calls.
adapter = HTTPAdapter(pool_connections=50, pool_maxsize=50)
session = requests.Session()
session.mount('https://', adapter)

# Ingest
Currently only Fusion transactions up to August 2nd, 2025 are ingested. To change the cutoff, modify the `blockTime` filter in `get_txs()` (a UNIX timestamp).

In [2]:
def get_txs(address, pagination = None, last_sig = None, max_block_time = 1754107200, timeout = 60):
    """Fetch one page of succeeded transactions for `address`.

    Sends a single `getTransactionsForAddress` JSON-RPC request (ascending
    order, up to 100 fully detailed `jsonParsed` transactions) to `RPC_URL`
    through the shared `session`.

    Args:
        address: Address to query.
        pagination: `paginationToken` returned by the previous page; omitted
            from the request when falsy.
        last_sig: When set, adds the filter `signature: {'gt': last_sig}` to
            the request. The ingest cell uses this to resume after the last
            stored transaction.
        max_block_time: Inclusive upper bound for the `blockTime` filter, as a
            UNIX timestamp in seconds. The default, 1754107200, is
            2025-08-02 04:00:00 UTC.
        timeout: Seconds `requests` waits before giving up, so a stalled
            connection cannot hang the notebook indefinitely.

    Returns:
        The decoded JSON-RPC response as a dict.

    Raises:
        requests.HTTPError: The endpoint answered with a non-2xx status.
        RuntimeError: The JSON-RPC response carries an `error` member.
    """
    filters = {
        'status': 'succeeded',
        'blockTime': {'lte': max_block_time},
    }
    if last_sig:
        filters['signature'] = {'gt': last_sig}

    options = {
        'transactionDetails': 'full',
        'encoding': 'jsonParsed',
        'sortOrder': 'asc',
        'limit': 100,
        'filters': filters,
    }
    if pagination:
        options['paginationToken'] = pagination

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getTransactionsForAddress",
        "params": [address, options],
    }

    response = session.post(RPC_URL, json=payload, timeout=timeout)
    # Fail here with a clear HTTP error instead of handing an error body to the caller.
    response.raise_for_status()
    data = response.json()
    if isinstance(data, dict) and data.get('error'):
        raise RuntimeError(f"getTransactionsForAddress failed: {data['error']}")
    return data

In [3]:
# Page through every matching transaction and store the raw responses as
# data/raw/<n>.json, one file per CALL_BATCH RPC calls.
raw_dir = Path('data/raw')

txs = []
pagination = None
call_count = 0
file_count = len(list(raw_dir.glob('*.json')))

# Resume support: when files already exist, continue after the last signature
# stored in the highest-numbered file (files are numbered 1..file_count).
last_sig = None
if file_count > 0:
    previous_batch = json.loads((raw_dir / f"{file_count}.json").read_text())
    last_sig = previous_batch[-1]['transaction']['signatures'][0]

pbar = tqdm(total=CALL_BATCH, mininterval=1)

try:
    while True:
        ts = get_txs(PROGRAM_ID, pagination, last_sig)
        # The signature filter is only sent with the first request; later pages
        # are requested with the pagination token alone.
        last_sig = None
        call_count += 1
        pbar.update(1)

        result = ts.get('result')
        if result is None:
            # An error response has no 'result'; report it instead of failing
            # later with an AttributeError on None.
            raise RuntimeError(f"RPC call returned no result: {ts.get('error', ts)}")

        page = result.get('data')
        if not page:
            break
        txs.extend(page)

        pagination = result.get('paginationToken')
        if not pagination:
            break

        if call_count % CALL_BATCH == 0:
            file_count += 1
            (raw_dir / f"{file_count}.json").write_text(json.dumps(txs))
            pbar.set_description(f"Wrote {file_count} | Last block time: {datetime.fromtimestamp(txs[-1]['blockTime']).strftime('%Y-%m-%d, %H:%M:%S')}")
            txs = []
            pbar.close()
            pbar = tqdm(total=CALL_BATCH, mininterval=1)
finally:
    # Dump any remaining txs. This also runs when the loop is interrupted or
    # fails, so pages that were already fetched are not lost and the next run
    # resumes after them.
    if txs:
        file_count += 1
        (raw_dir / f"{file_count}.json").write_text(json.dumps(txs))
        pbar.set_description(f"Wrote {file_count} | Last block time: {datetime.fromtimestamp(txs[-1]['blockTime']).strftime('%Y-%m-%d, %H:%M:%S')}")
    pbar.close()

Wrote 1 | Last block time: 2025-07-28, 02:27:42: 100%|██████████| 50/50 [00:22<00:00,  2.26it/s]
Wrote 2 | Last block time: 2025-07-29, 16:45:28: 100%|██████████| 50/50 [00:24<00:00,  2.02it/s]
Wrote 3 | Last block time: 2025-07-30, 12:38:02: 100%|██████████| 50/50 [00:25<00:00,  1.95it/s]
Wrote 4 | Last block time: 2025-07-31, 13:17:08: 100%|██████████| 50/50 [00:25<00:00,  1.98it/s]
Wrote 5 | Last block time: 2025-08-01, 01:48:38: 100%|██████████| 50/50 [00:24<00:00,  2.05it/s]
Wrote 6 | Last block time: 2025-08-01, 23:58:35:  96%|█████████▌| 48/50 [00:22<00:00,  2.13it/s]


# Parse Transactions
This step only extracts instruction (IX) names and other metadata.

In [4]:
from worker import process_file

# Run worker.process_file over every raw batch in parallel and merge the
# per-file results into two JSON files under data/parsed.
data_dir = Path('data/raw')
out_dir = Path('data/parsed')
out_dir.mkdir(parents=True, exist_ok=True)

# glob() yields files in arbitrary order. Sort numerically by file name
# (1, 2, ..., 10) so the merged output is reproducible and follows the order in
# which the batches were written; non-numeric names sort last.
raw_files = sorted(
    data_dir.glob('*.json'),
    key=lambda p: (not p.stem.isdecimal(), int(p.stem) if p.stem.isdecimal() else 0, p.name),
)
paths = [str(p) for p in raw_files]

all_metas = []
all_fusion_ixs = []

print("Starting processing...")

# Executor.map returns results in the order of `paths`, so the sort above
# carries through to the aggregated lists.
with ProcessPoolExecutor() as ex:
    results = list(tqdm(ex.map(process_file, paths), total=len(paths)))

print("Aggregating results...")

# Each result is a (metas, ixs) pair for one input file.
for metas, ixs in results:
    all_metas.extend(metas)
    all_fusion_ixs.extend(ixs)


print(f"Writing {len(all_metas):,} transactions metadata...")
(out_dir / 'tx_meta.json').write_text(json.dumps(all_metas))


print(f"Writing {len(all_fusion_ixs):,} fusion instructions...")
(out_dir / 'ixs_fusion.json').write_text(json.dumps(all_fusion_ixs))

print("Done.")

Starting processing...


100%|██████████| 6/6 [00:00<00:00,  7.95it/s]


Aggregating results...
Writing 29,660 transactions metadata...
Writing 33,543 fusion instructions...
Done.


# Decode Instructions
We are not parsing all possible instructions. You can expand the list of instructions to parse by adding entries to `ix_modules` (the key is instruction name as it appears in IDL and the value is the respective file from `fusionamm/instructions`)

**NOTE**: `openLimitOrder` instructions need to be handled differently depending on their version. It is possible that other instructions need special handling as well.

In [5]:
import importlib
import base58
from worker import INSTRUCTIONS
# Decode the arguments of the supported Fusion instructions and attach them,
# together with named accounts, to each instruction record.
ixs = json.loads(Path('data/parsed/ixs_fusion.json').read_text())

# Instruction name -> module under `fusionamm.instructions` that provides its layout.
ix_modules = {'swap': 'swap',
         'increase_limit_order': 'increaseLimitOrder',
         'open_limit_order': 'openLimitOrder',
         'decrease_limit_order': 'decreaseLimitOrder',
         'close_limit_order': 'closeLimitOrder',
         'collect_fees': 'collectFees',
         'increase_liquidity': 'increaseLiquidity',
         'update_fees': 'updateFees',
         'decrease_liquidity': 'decreaseLiquidity',
         'open_position': 'openPosition',
         'close_position': 'closePosition',
         'two_hop_swap': 'twoHopSwap',
         }

for ix in tqdm(ixs):
    ix_name = ix['ix_name']
    if ix_name not in ix_modules:
        # Unsupported instruction: left untouched (no 'ix_args' key is added).
        continue

    # Pick the module before importing so only the one actually used is loaded.
    # open_limit_order calls with 9 accounts are decoded with the older layout.
    module_name = ix_modules[ix_name]
    if ix_name == 'open_limit_order' and len(ix['ix_accounts']) == 9:
        module_name = 'openLimitOrderOld'
    module = importlib.import_module(f'fusionamm.instructions.{module_name}')

    # Decode and parse args
    if hasattr(module, 'layout'):
        data_bytes = base58.b58decode(ix['ix_data'])
        args_bytes = data_bytes[8:]  # Skip discriminator
        parsed_args = dict(module.layout.parse(args_bytes))
        # Drop entries that are not wanted in the JSON output; tolerate their absence.
        parsed_args.pop('_io', None)
        parsed_args.pop('remainingAccountsInfo', None)
        ix['ix_args'] = parsed_args
    else:
        ix['ix_args'] = {}

    # Map accounts to names
    # NOTE(review): names always come from INSTRUCTIONS[ix_name], also when the
    # 9-account openLimitOrderOld layout was selected, and zip() stops at the
    # shorter sequence - confirm the names line up for the old version.
    account_names = INSTRUCTIONS[ix_name]['accounts']
    ix['ix_accounts_named'] = dict(zip(account_names, ix['ix_accounts']))

Path('data/parsed/ixs_fusion_parsed.json').write_text(json.dumps(ixs))


# Show the first record as a sanity check. It may be an instruction that was
# skipped above (no 'ix_args'), and the list may be empty, so guard both cases.
if ixs:
    sample = ixs[0]
    print(f"Sample tx: {sample['tx_signature']}")
    print(f"https://solscan.io/tx/{sample['tx_signature']}")
    print(f"Instruction names: {sample['ix_name']}")
    print(f"Instruction args: {sample.get('ix_args')}")


100%|██████████| 33543/33543 [00:00<00:00, 52508.91it/s]


Sample tx: 5b6Y8XsBHeruy3N3oHmbzVz265EMXw1sKqifZ6Syg8BixMSr8X3T5CL5q3ddRtydsLRpo3QsKjUuextu6TvGvow2
https://solscan.io/tx/5b6Y8XsBHeruy3N3oHmbzVz265EMXw1sKqifZ6Syg8BixMSr8X3T5CL5q3ddRtydsLRpo3QsKjUuextu6TvGvow2
Instruction names: swap
Instruction args: {'amount': 8000000, 'otherAmountThreshold': 0, 'sqrtPriceLimit': 79226673515401279992447579055, 'amountSpecifiedIsInput': True, 'aToB': False}
