In [1]:
import time
import random
import pytz
import pandas as pd
import base58
import datetime
from solana.rpc.api import Client
from solders.pubkey import Pubkey
from collections import deque

from test_data import get_test_transactions

pd.options.display.max_rows = 500

In [2]:
class TransactionTypes:
    sell = "SELL"
    transfer = "TRANSFER"
    buy = "BUY"  # Fixed from 'reimburse'

class TransactionLine:
    def __init__(self, pubkey, tx, tx_detail):
        self.pubkey = pubkey
        self.tx = tx
        self.tx_detail = tx_detail
    
    @property
    def time(self):
        dt_object = datetime.datetime.fromtimestamp(self.tx.block_time, pytz.utc)
        pst_timezone = pytz.timezone('US/Pacific')
        pst_time = dt_object.astimezone(pst_timezone)
        return pst_time.strftime('%Y-%m-%d %I:%M:%S %p')
    
    @property
    def _pre_balances(self):
        return [bal / 1e9 for bal in self.tx_detail.value.transaction.meta.pre_balances]

    @property
    def _post_balances(self):
        return [bal / 1e9 for bal in self.tx_detail.value.transaction.meta.post_balances]

    @property
    def _account_keys(self):
        return [key.__str__() for key in self.tx_detail.value.transaction.transaction.message.account_keys]

    @property
    def _self_index(self):
        for i, key in enumerate(self._account_keys):
            if key == self.pubkey:
                return i
        return None  # Handle case where pubkey isn't found
    
    @property
    def balance(self):
        index = self._self_index
        return self._post_balances[index] if index is not None else 0

    @property
    def amount(self):
        index = self._self_index
        return self._post_balances[index] - self._pre_balances[index] if index is not None else 0
    
    @property
    def type(self):
        vote_program_id = "Vote111111111111111111111111111111111111111"
        if vote_program_id in self._account_keys:
            return TransactionTypes.vote

        if self.amount > 0:
            return TransactionTypes.buy
        elif self.amount < 0:
            return TransactionTypes.sell
        return TransactionTypes.transfer

    def to_dict(self):  # Changed to method
        return {
            'time': self.time,
            'amount': self.amount,
            'balance': self.balance,
            'type': self.type,
        }


In [3]:
client = Client("https://api.mainnet-beta.solana.com")
# client = Client("http://127.0.0.1:8899")
# client = Client("https://api.testnet.solana.com")
# client = Client("https://api.devnet.solana.com")
# client = Client("https://devnet.helius-rpc.com/?api-key=6d458895-7a09-456b-b6d3-91484984936f")

wallet_address = "NLMSHTjmSiRxGJPs3uaqtsFBC2dTGYwK41U18Nmw5kH"

def get_transactions(wallet_address, before):
    start = datetime.datetime.now()
    if before:
        print(f"Getting transactions from before {str(before)[:3]}...{str(before)[-3:]}: ", end='')
    decoded_address = base58.b58decode(wallet_address)
    ret = client.get_signatures_for_address(
        account=Pubkey(decoded_address), 
        limit=1000,
        before=before,
    )
    if before:
        print(datetime.datetime.now() - start)
    return ret

def batch_get_transactions(wallet_address, before=None):
    lines = []
    while len(lines) < 10000:
        tries = 0
        transactions = None
        while tries < 4:
            transactions = get_transactions(wallet_address, before=before).value
            if transactions is None or len(transactions) == 0:
                print('no transactions')
                tries += 1
            else:
                break
        
        transactions = get_test_transactions(datetime.date(2025, 1, 1), num_days=30, initial_price=100.0, volatility=5.0, num_transactions=10)
    
        if transactions is None or len(transactions) == 0:
            print('Failed to fetch transactions after 4 retries')
            break
    
        for transaction in transactions:
            tx_details = client.get_transaction(transaction.signature)
            lines.append(TransactionLine(wallet_address, transaction, tx_details).to_dict)
        
        if transactions:
            before = transactions[-1].signature
        return lines
    


In [4]:
def calculate_gains_fifo(transactions):
    """
    Calculate short-term and long-term capital gains using FIFO.
    Assumes transactions are sorted by date.
    """
    buys = deque()  # FIFO queue for buy transactions
    short_term_gains = 0.0
    long_term_gains = 0.0

    for tx in transactions:
        date = tx["date"]
        tx_type = tx["type"]
        price = tx["price"]
        quantity = tx["quantity"]

        if tx_type == "BUY":
            # Add buy transactions to FIFO queue
            buys.append((date, price, quantity))

        elif tx_type == "SELL":
            # Process sell transactions using FIFO
            remaining_to_sell = quantity

            while remaining_to_sell > 0 and buys:
                buy_date, buy_price, buy_quantity = buys.popleft()

                # Determine how much of this buy is sold
                qty_sold = min(remaining_to_sell, buy_quantity)
                remaining_to_sell -= qty_sold

                # Calculate gains
                gain = (price - buy_price) * qty_sold
                holding_period = (date - buy_date).days

                if holding_period >= 365:
                    long_term_gains += gain
                else:
                    short_term_gains += gain

                # If not all of the buy was sold, put remaining back in queue
                if buy_quantity > qty_sold:
                    buys.appendleft((buy_date, buy_price, buy_quantity - qty_sold))

    return {"short_term_gains": short_term_gains, "long_term_gains": long_term_gains}

def calculate_gains_lifo(transactions):
    """
    Calculate short-term and long-term capital gains using LIFO.
    """
    buys = []  # LIFO stack for buy transactions
    short_term_gains = 0.0
    long_term_gains = 0.0

    for tx in transactions:
        date = tx["date"]
        tx_type = tx["type"]
        price = tx["price"]
        quantity = tx["quantity"]

        if tx_type == "BUY":
            # Push buy transactions to LIFO stack
            buys.append((date, price, quantity))

        elif tx_type == "SELL":
            remaining_to_sell = quantity

            while remaining_to_sell > 0 and buys:
                buy_date, buy_price, buy_quantity = buys.pop()  # LIFO: last buy used first

                # Determine how much of this buy is sold
                qty_sold = min(remaining_to_sell, buy_quantity)
                remaining_to_sell -= qty_sold

                # Calculate gains
                gain = (price - buy_price) * qty_sold
                holding_period = (date - buy_date).days

                if holding_period >= 365:
                    long_term_gains += gain
                else:
                    short_term_gains += gain

                # If not all of the buy was sold, put remaining back
                if buy_quantity > qty_sold:
                    buys.append((buy_date, buy_price, buy_quantity - qty_sold))

    return {"short_term_gains": short_term_gains, "long_term_gains": long_term_gains}


In [7]:
# transactions = batch_get_transactions(wallet_address)
lines = get_test_transactions(datetime.date(2020, 1, 1), num_days=1825, initial_price=100.0, volatility=5.0, num_transactions=300)
pd.DataFrame(lines)

Unnamed: 0,date,type,price,quantity
0,2020-01-01,BUY,103.9,69
1,2020-01-04,SELL,108.89,31
2,2020-01-10,BUY,109.16,92
3,2020-01-13,SELL,109.54,16
4,2020-01-15,SELL,105.37,66
5,2020-01-16,SELL,108.48,17
6,2020-01-20,SELL,112.87,67
7,2020-01-24,SELL,115.95,41
8,2020-01-26,BUY,118.82,87
9,2020-01-27,BUY,122.25,86


In [10]:
pd.DataFrame([
    calculate_gains_lifo(lines),
    calculate_gains_fifo(lines),
])

Unnamed: 0,short_term_gains,long_term_gains
0,-11513.75,-1363.69
1,-45353.57,0.0
