In [1]:
import asyncio
import solana
import json
from solana.rpc.async_api import AsyncClient
from solana.exceptions import SolanaRpcException, SolanaExceptionBase
import nest_asyncio
# TODO: in the script version, use a logger instead of print()

In [2]:
# Jupyter already has an event loop running; nest_asyncio patches asyncio so
# that asyncio.run() (used in the last code cell) can be called from inside it.
nest_asyncio.apply() # This is needed to run the async loop in Jupyter

In [3]:
# RPC endpoint URL handed to AsyncClient by the fetch/retrieve functions in this notebook.
mainnet_url = "https://api.mainnet-beta.solana.com"

In [4]:
def check_transaction_program(transaction) -> bool:
    """
    Verify that the first and last transaction programs are spl-token.

    Only the first group of inner instructions is inspected. The "first"
    instruction is index 0 when it carries a "program" key, otherwise index 1;
    the "last" instruction is always index -1.

    Args:
        transaction: Parsed transaction response as a dict; the instructions
            are read from ``["result"]["meta"]["innerInstructions"][0]["instructions"]``.

    Returns:
        True when both selected instructions have ``"program" == "spl-token"``.
        False otherwise, including when the expected structure is missing or
        neither of the first two instructions has a "program" key.
    """
    try:
        instructions = transaction["result"]["meta"]["innerInstructions"][0]["instructions"]
        if "program" in instructions[0].keys():
            first_instruction = instructions[0]
        elif "program" in instructions[1].keys():
            first_instruction = instructions[1]
        else:
            # Neither candidate is a parsed instruction. Without this branch the
            # names below would stay unbound and the comparison would raise.
            return False
        first_transaction_program = first_instruction["program"]
        last_transaction_program = instructions[-1]["program"]
    except (KeyError, IndexError, TypeError, AttributeError):
        # Missing keys, empty lists or None placeholders: not a match.
        return False

    return first_transaction_program == "spl-token" and last_transaction_program == "spl-token"

def check_transaction_type(transaction) -> bool:
    """
    Verify that the first and last transaction types are transfer.

    Only the first group of inner instructions is inspected. The "first"
    instruction is index 0 when it carries a "program" key, otherwise index 1;
    the "last" instruction is always index -1. The type is read from
    ``["parsed"]["type"]`` of each selected instruction.

    Args:
        transaction: Parsed transaction response as a dict; the instructions
            are read from ``["result"]["meta"]["innerInstructions"][0]["instructions"]``.

    Returns:
        True when both selected instructions have type "transfer".
        False otherwise, including when the expected structure is missing or
        neither of the first two instructions has a "program" key.
    """
    try:
        instructions = transaction["result"]["meta"]["innerInstructions"][0]["instructions"]
        if "program" in instructions[0].keys():
            first_instruction = instructions[0]
        elif "program" in instructions[1].keys():
            first_instruction = instructions[1]
        else:
            # Neither candidate is a parsed instruction. Without this branch the
            # names below would stay unbound and the comparison would raise.
            return False
        first_transaction_type = first_instruction["parsed"]["type"]
        second_transaction_type = instructions[-1]["parsed"]["type"]
    except (KeyError, IndexError, TypeError, AttributeError):
        # Missing keys, empty lists or non-dict "parsed" payloads: not a match.
        return False

    return first_transaction_type == "transfer" and second_transaction_type == "transfer"

def check_token_address(transaction) -> bool:
    """
    Verify that the source and destination token mints are both tracked tokens.

    The first entry of ``preTokenBalances`` is treated as the source and the
    last entry as the destination. Each mint must be one of USDC, wrapped BTC
    or wrapped SOL.

    Args:
        transaction: Parsed transaction response as a dict; balances are read
            from ``["result"]["meta"]["preTokenBalances"]``.

    Returns:
        True when both mints are among the tracked tokens; False otherwise,
        including when the balances are missing or empty.
    """
    usdc_token_address = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"
    wrapped_btc = "9n4nbM75f5Ui33ZbPYXn59EwSgE8CGsHtAeTH5YFeJ9E"
    wrapped_solana = "So11111111111111111111111111111111111111112"
    accepted_mints = (usdc_token_address, wrapped_btc, wrapped_solana)

    try:
        pre_token_balances = transaction["result"]["meta"]["preTokenBalances"]
        token_address_source = pre_token_balances[0]["mint"]
        token_address_destination = pre_token_balances[-1]["mint"]
    except (KeyError, IndexError, TypeError):
        return False

    # A membership test replaces the former `(x != A) or (x != B)` chains, which
    # were always true and therefore rejected every mint except wrapped SOL.
    return token_address_source in accepted_mints and token_address_destination in accepted_mints

def check_error_status(transaction) -> bool:
    """
    Verify that the transaction reports success.

    Success means ``meta["status"]`` has an "Ok" key whose value is None and
    ``meta["err"]`` is None.

    Args:
        transaction: Parsed transaction response as a dict; fields are read
            from ``["result"]["meta"]``.

    Returns:
        True when both values are present and None. False otherwise,
        including when "Ok" or "err" cannot be found (for example a status
        that carries an error entry instead of "Ok").
    """
    try:
        meta = transaction["result"]["meta"]
        ok_value = meta["status"]["Ok"]
        err_value = meta["err"]
    except (KeyError, TypeError):
        # A lookup failure must not count as success: a status without "Ok"
        # is exactly what a failed transaction looks like.
        return False

    return ok_value is None and err_value is None

In [5]:
async def fetch_block(url):
    """
    Return the context slot reported with the latest blockhash.

    Opens an AsyncClient against ``url`` for the duration of the call,
    requests the latest blockhash and reads ``result.context.slot`` from the
    JSON form of the response.
    """
    async with AsyncClient(url) as rpc:
        response = await rpc.get_latest_blockhash()
        payload = json.loads(response.to_json())
        return payload["result"]["context"]["slot"]
        


In [6]:
async def stream_data(url):
    """
    Poll ``fetch_block`` until it yields a value and return that value.

    Args:
        url: RPC endpoint URL forwarded to ``fetch_block``.

    Returns:
        The first non-None result of ``fetch_block(url)``.
    """
    while True:
        data = await fetch_block(url)
        if data is not None:
            return data  # Process or handle the data as needed

        # Pause before polling again. This sleep used to sit after the loop,
        # where it could never run, so an empty result caused a tight retry loop.
        await asyncio.sleep(1)


In [7]:
def retrieve_transaction_details(transaction):
    """
    Extract swap-style details from a parsed transaction response.

    The transaction is first gated by ``check_token_address``. Amounts and
    accounts come from the first group of inner instructions: the source side
    from instruction 0 (or instruction 1 when 0 has no parsed info), the
    destination side from the last instruction.

    Args:
        transaction: Parsed transaction response as a dict with its payload
            under the "result" key.

    Returns:
        A dict with source_amount, destination_amount, source_account,
        destination_account, block_time, transaction_fees and signature, or
        None when ``check_token_address`` rejects the transaction.

    Raises:
        KeyError, IndexError, TypeError: when the transaction passes the gate
            but lacks the inner-instruction fields read here.
    """
    if not check_token_address(transaction):
        return None

    result = transaction["result"]
    instructions = result["meta"]["innerInstructions"][0]["instructions"]
    try:
        first_instruction = instructions[0]["parsed"]["info"]
    except (KeyError, IndexError, TypeError):
        # Instruction 0 is not a parsed instruction; fall back to the next one.
        first_instruction = instructions[1]["parsed"]["info"]

    last_instruction = instructions[-1]["parsed"]["info"]

    return {
        "source_amount": first_instruction["amount"],
        "destination_amount": last_instruction["amount"],
        "source_account": first_instruction["source"],
        "destination_account": last_instruction["destination"],
        "block_time": result["blockTime"],
        "transaction_fees": result["meta"]["fee"],
        # The signatures live next to "meta" inside "result", like every other
        # field read here; the former top-level lookup raised KeyError.
        "signature": result["transaction"]["signatures"][0],
    }

In [8]:
async def retrieve_block_transactions(url, retry_delay=1.0):
    """
    Fetch the block for the most recently reported slot, retrying until it succeeds.

    Args:
        url: RPC endpoint URL.
        retry_delay: Seconds to wait after a failed attempt before retrying.

    Returns:
        The response object returned by ``AsyncClient.get_block`` for the slot
        obtained from ``stream_data``, requested with "jsonParsed" encoding.
    """
    while True:
        try:
            async with AsyncClient(url) as client:
                block_number = await stream_data(url)
                return await client.get_block(block_number, "jsonParsed", max_supported_transaction_version=0)
        except Exception:
            # Best-effort retry on request failures. Catching Exception rather
            # than using a bare except lets KeyboardInterrupt and task
            # cancellation stop the loop, and the pause avoids hammering the
            # endpoint while it keeps failing.
            await asyncio.sleep(retry_delay)

In [9]:
async def main():
    """
    Continuously fetch the latest block and print details of matching transactions.

    Each entry of the block's transaction list is passed to
    ``retrieve_transaction_details``; non-None results are printed. Runs until
    interrupted.
    """
    while True:
        block_response = await retrieve_block_transactions(mainnet_url)
        block = json.loads(block_response.to_json())["result"]
        for entry in block["transactions"]:
            # Block entries are bare {"meta", "transaction", ...} objects, while
            # retrieve_transaction_details reads its fields under a "result" key
            # and expects a "blockTime" there. Re-wrap the entry accordingly and
            # keep its original keys at the top level so lookups against either
            # layout resolve. Without this every entry was silently rejected.
            transaction = {**entry, "result": {**entry, "blockTime": block.get("blockTime")}}
            try:
                data = retrieve_transaction_details(transaction)
            except (KeyError, IndexError, TypeError):
                # The transaction lacks the inner-instruction fields being
                # extracted; skip it rather than abort the polling loop.
                continue
            if data is not None:
                print(data)
            

In [12]:
# Start the polling loop. main() loops indefinitely, so interrupt the kernel to stop this cell.
asyncio.run(main())
# asyncio.run(retrieve_block_transactions(mainnet_url))

In [33]:
# from solana.rpc.api import Client
# solana_client = Client("https://docs-demo.solana-mainnet.quiknode.pro/")
# x = json.loads((solana_client.get_latest_blockhash()).to_json())

In [37]:
# x["result"]["context"]["slot"]

246978575

In [None]:
# from solana.rpc.api import Client
# from solders.signature import Signature
# solana_client = Client(mainnet_url)
# sig = Signature.from_string("5Fg1Ft6ujL6U1JkptxU8huZGaUxZKCaNncbp7Tf8SJ7hQ7Yej763JUGZzoMjeLzmF1JC9CY69hf4jmu3WBK3ZMX8")
# trans = solana_client.get_transaction(sig, "jsonParsed", max_supported_transaction_version=0)

In [None]:
# transaction = json.loads(trans.to_json())

In [None]:
# print(retrieve_transaction_details(transaction))

In [None]:
# x =  {'source_amount': '100000000', 'destination_amount': '1042089809', 'source_account': '6oHNHuPVeqKeCYXuizwRQdvS7yU5HMw3ug4hQX3vAAs', 'destination_account': 'BqjtchQuxAYHQyQhc2Yfj4oujFMSDLVd2woL2HX6tU9H', 'block_time': 1707183472, 'transaction_fees': 5000}