In [1]:
import logging
import requests
import pandas as pd
import time
from time import sleep
from tqdm import tqdm  # progress bar package
from processors.update_tokens import main as update_tokens_main
import sys
from db.connection import DatabaseConnection
from db.queries import DatabaseQueries
from config import load_config
import random
import os
from datetime import datetime

# Configure logging globally
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# -------------------------------------------------------------------
def fetch_historical_price(token_mint, api_key):
    """
    Fetch the historical price data for a given token using the Birdeye API endpoint
    /defi/history_price. The API call requests the price history from:
        time_from = now - 188700 (seconds)
        time_to   = now

    Returns a DataFrame with historical snapshots sorted by unixTime or None if not found.
    """
    now = int(time.time())
    time_from = now - 188700  # 188700 seconds before now
    url = f"https://public-api.birdeye.so/defi/history_price?address={token_mint}&address_type=token&type=5m&time_from={time_from}&time_to={now}"
    logger.info(f"fetch_historical_price: Requesting history for token '{token_mint}' from {time_from} to {now}.")
    headers = {
        "accept": "application/json",
        "x-chain": "solana",
        "X-API-KEY": api_key
    }
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        data = response.json()
        if data.get("success") is True:
            items = data.get("data", {}).get("items", [])
            if items:
                df = pd.DataFrame(items)
                df = df.sort_values("unixTime")
                logger.info(f"fetch_historical_price: Retrieved {len(df)} price snapshots for token '{token_mint}'.")
                return df
            else:
                logger.error(f"fetch_historical_price: No items found in API response for token '{token_mint}'. Response: {data}")
                return None
        else:
            logger.error(f"fetch_historical_price: API returned error for token '{token_mint}'. Response: {data}")
            return None
    except requests.RequestException as e:
        logger.error(f"fetch_historical_price: Request error for token '{token_mint}': {e}")
        return None

# -------------------------------------------------------------------
def fetch_token_details(chain_id, token_addresses, api_key):
    """
    Fetch token details from the Birdeye token overview endpoint.
    For each token address, it sends a GET request to:
      https://public-api.birdeye.so/defi/token_overview?address={token}
    The chain is specified in the request headers.

    Returns a DataFrame containing the details for all tokens.
    """
    logger.info(f"fetch_token_details: Starting to fetch details for {len(token_addresses)} tokens on chain '{chain_id}' using Birdeye API.")
    token_details_list = []
    # Filter out any None values
    token_addresses = [address for address in token_addresses if address is not None]
    for token in tqdm(token_addresses, desc='Fetching token details', unit='token'):
        url = f"https://public-api.birdeye.so/defi/token_overview?address={token}"
        headers = {
            "accept": "application/json",
            "x-chain": chain_id,
            "X-API-KEY": api_key
        }
        logger.info(f"fetch_token_details: Fetching details for token: {token}")
        try:
            response = requests.get(url, headers=headers)
            response.raise_for_status()
            data = response.json()
            if data.get("success") is True:
                token_data = data.get("data", {})
                # Ensure the 'address' field exists for merging later.
                if "address" not in token_data:
                    token_data["address"] = token
                token_details_list.append(token_data)
                logger.info(f"fetch_token_details: Retrieved details for token: {token}")
            else:
                logger.error(f"fetch_token_details: API returned error for token {token}. Response: {data}")
            sleep(0.2)  # avoid rate limiting
        except requests.RequestException as e:
            logger.error(f"fetch_token_details: Request error for token {token}: {e}")
            sleep(0.2)
    token_details_df = pd.DataFrame(token_details_list)
    logger.info("fetch_token_details: Finished fetching all token details.")
    return token_details_df

# -------------------------------------------------------------------
def get_closest_price(token, block_time, token_history_data):
    """
    Given a token and a block_time, look up the historical price data for that token
    (as returned by fetch_historical_price) and return the price ('value') from the
    snapshot with the smallest time difference to block_time.
    """
    df = token_history_data.get(token)
    if df is None or df.empty:
        logger.warning(f"get_closest_price: No historical data available for token {token}.")
        return None
    # Calculate the absolute time difference between block_time and all snapshots
    time_diffs = (df["unixTime"] - block_time).abs()
    closest_idx = time_diffs.idxmin()
    closest_price = df.loc[closest_idx, "value"]
    return closest_price

# -------------------------------------------------------------------
def main():
    # Initialize configuration and database connections
    config = load_config()
    db_connection = DatabaseConnection(config.db)
    db_queries = DatabaseQueries(db_connection)

    try:
        logger.info("main: Starting main function execution.")

        # (1) Optionally update token information
        # logger.info("main: Calling update_tokens.main()...")
        # update_tokens_main()

        # (2) Fetch recent transactions (e.g., from the past 6 hours)
        logger.info("main: Fetching recent transactions...")
        transactions_df = db_queries.get_recent_transactions(n=3)
        logger.info(f"main: Fetched {len(transactions_df)} transactions.")

        # (3) Fetch token details once using the Birdeye API
        token_columns = ["open_input_mint", "open_output_mint", "close_input_mint", "close_output_mint"]
        unique_tokens = pd.unique(transactions_df[token_columns].values.ravel())
        logger.info(f"main: Found {len(unique_tokens)} unique token addresses for details.")
        chain_id = "solana"
        token_details_df = fetch_token_details(chain_id, unique_tokens, config.birdeye.api_key)

        # (4) Add new columns for tokens and their historical prices
        transactions_df["input_token"] = None
        transactions_df["output_token"] = None
        transactions_df["input_price"] = None
        transactions_df["output_price"] = None

        # (5) For each distinct token, fetch its historical price snapshots once
        logger.info("main: Fetching historical price data for each unique token...")
        token_history_data = {}
        for token in tqdm(unique_tokens, desc="Fetching token historical prices", unit="token"):
            df_history = fetch_historical_price(token, config.birdeye.api_key)
            if df_history is not None:
                token_history_data[token] = df_history
            else:
                logger.warning(f"main: No historical data found for token {token}.")
            sleep(0.2)  # To avoid rate limiting

        # (6) Process each transaction to look up the closest historical price for both input and output tokens
        logger.info("main: Processing transactions to calculate historical prices...")
        for index, row in tqdm(transactions_df.iterrows(), total=len(transactions_df), desc="Processing transactions"):
            if pd.notnull(row["open_input_mint"]):
                input_token = row["open_input_mint"]
                output_token = row["open_output_mint"]
                block_time = row["open_block_time"]
            else:
                input_token = row["close_input_mint"]
                output_token = row["close_output_mint"]
                block_time = row["close_block_time"]

            transactions_df.at[index, "input_token"] = input_token
            transactions_df.at[index, "output_token"] = output_token

            # Look up the closest price snapshot for each token based on block_time
            input_price = get_closest_price(input_token, block_time, token_history_data)
            output_price = get_closest_price(output_token, block_time, token_history_data)

            transactions_df.at[index, "input_price"] = input_price
            transactions_df.at[index, "output_price"] = output_price

        # (7) Calculate per-transaction dollar amounts
        logger.info("main: Calculating per-transaction dollar values...")
        transactions_df["dollars_sold"] = (
                transactions_df["in_amount"].astype(float) *
                transactions_df["input_price"].astype(float)
        )
        transactions_df["dollars_bought"] = (
                transactions_df["out_amount"].astype(float) *
                transactions_df["output_price"].astype(float)
        )

        # (8) Aggregate dollar amounts per token
        logger.info("main: Aggregating dollar values per token...")
        aggregated_sold = (
            transactions_df.groupby("input_token")["dollars_sold"]
            .sum()
            .reset_index()
            .rename(columns={"input_token": "token"})
        )
        aggregated_bought = (
            transactions_df.groupby("output_token")["dollars_bought"]
            .sum()
            .reset_index()
            .rename(columns={"output_token": "token"})
        )
        aggregated = pd.merge(aggregated_sold, aggregated_bought, on="token", how="outer")
        aggregated["dollars_sold"] = aggregated["dollars_sold"].fillna(0)
        aggregated["dollars_bought"] = aggregated["dollars_bought"].fillna(0)
        logger.info("main: Aggregated token values:")
        logger.info(aggregated)

        # (9) Merge aggregated data with token details
        # We merge on the token address field ("address") from the Birdeye API data.
        logger.info("main: Merging aggregated data with token details...")
        final_df = pd.merge(aggregated, token_details_df, left_on="token", right_on="address", how="left")

        # (10) Format dollars columns to use a comma as the decimal separator
        logger.info("main: Formatting dollar values with comma as decimal separator...")
        final_df["dollars_sold"] = final_df["dollars_sold"].apply(lambda x: f"{x:.2f}".replace('.',','))
        final_df["dollars_bought"] = final_df["dollars_bought"].apply(lambda x: f"{x:.2f}".replace('.',','))

        # (11) Remove timezone information for Excel compatibility
        for col in transactions_df.select_dtypes(include=["datetimetz"]).columns:
            transactions_df[col] = transactions_df[col].dt.tz_localize(None)
        for col in final_df.select_dtypes(include=["datetimetz"]).columns:
            final_df[col] = final_df[col].dt.tz_localize(None)

        # (12) Create a folder to store generated Excel files (if not exists) and write the final merged output to an Excel file.
        output_folder = "../data/"
        if not os.path.exists(output_folder):
            os.makedirs(output_folder)
            logger.info(f"main: Created output folder '{output_folder}'.")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        excel_filename = f"final_aggregated_token_details_{timestamp}.xlsx"
        output_path = os.path.join(output_folder, excel_filename)

        logger.info(f"main: Writing final aggregated token details to Excel file at '{output_path}'...")
        final_df.to_excel(output_path, index=False)
        logger.info("main: Final Excel file written successfully.")
        print(final_df)

    except Exception as e:
        logger.error(f"main: An error occurred: {e}", exc_info=True)
    finally:
        db_connection.close()
        logger.info("main: Database connection closed.")

if __name__ == "__main__":
    main()


INFO:__main__:main: Starting main function execution.
INFO:__main__:main: Fetching recent transactions...
INFO:db.queries:Fetching recent transactions for the past 3 hours.
INFO:db.queries:Fetched 433 rows of transactions.
INFO:__main__:main: Fetched 433 transactions.
INFO:__main__:main: Found 94 unique token addresses for details.
INFO:__main__:fetch_token_details: Starting to fetch details for 94 tokens on chain 'solana' using Birdeye API.
Fetching token details:   0%|          | 0/93 [00:00<?, ?token/s]INFO:__main__:fetch_token_details: Fetching details for token: BonK1YhkXEGLZzwtcvRTip3gAL9nCeQD7ppZBLXhtTs
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): public-api.birdeye.so:443
DEBUG:urllib3.connectionpool:https://public-api.birdeye.so:443 "GET /defi/token_overview?address=BonK1YhkXEGLZzwtcvRTip3gAL9nCeQD7ppZBLXhtTs HTTP/1.1" 200 None
INFO:__main__:fetch_token_details: Retrieved details for token: BonK1YhkXEGLZzwtcvRTip3gAL9nCeQD7ppZBLXhtTs
Fetching token details: 

                                           token dollars_sold dollars_bought  \
0   27G8MtK7VtTcCHkpASjSDdkWWYfoqT6ggEuKidVJidD4     19854,66        4903,87   
1   2mhszy8YHwqs1fxruVHQQAUmNcfq31mtkmYYtNZNpump         0,00       19067,00   
2   2nCeHpECQvnMfzjU5fDMAKws1vBxMzxvWr6qqLpApump      1039,19           0,00   
3   2qWN2BMi8C8rkpRghLN6eaWzp76KKQBS9JGRVVdA1BC3         7,30           0,00   
4   2yd2Suus3YY4Sa7LHhn1PSHkjXj3XKrars4cCog2tGU8      9566,33        3042,16   
..                                           ...          ...            ...   
88   USDSwr9ApdHk5bvJKMjzff41FfuX8bSxdKcR81vTwcA     14999,18           0,00   
89   eL5fUxj2J4CiQsmW85k5FG9DvuQjjUoBHoQBi2Kpump      4164,43        4974,89   
90   jupSoLaHXQiZZTSfEWMTRRgpnyFm8f6sZdosWBjx93v       103,00         207,10   
91   mo7mapMrCsyci5w1td1wgrKPtNeCfjfkKi96DknWi5N         0,00           0,00   
92   vPtS4ywrbEuufwPkBXsCYkeTBfpzCd6hF52p8kJGt9b         0,00        2028,81   

                                       