In [56]:
import pytz
import os
import logging
import pandas as pd
from datetime import datetime
from typing import Tuple, List, Optional
from etherscan_functions import get_erc20_transfers, get_block_number_by_timestamp


# Root-logger setup. logging.basicConfig() is a no-op when the root logger already
# has handlers (e.g. on a re-run in the same kernel), so the format below only
# takes effect on the first configuration of the root logger.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger()

# Query window. get_block_numbers() parses these as 'YYYY-MM-DD HH:MM' and
# interprets them as Asia/Singapore local time.
START_DATE = "2024-12-10 08:00"
END_DATE = "2024-12-11 08:00"

# Directory (despite the name) that receives 'erc20_transfers.csv'.
# Override with the ETHERSCAN_OUTPUT_DIR environment variable instead of editing
# this machine-specific default.
OUTPUT_FILE = os.environ.get(
    "ETHERSCAN_OUTPUT_DIR",
    'C:/Users/YuweiCao/Documents/GitHub/Project/Project/etherscan/result',
)

# Never commit API keys: read the key from the environment instead.
# A free key can be created at https://etherscan.io/myapikey and exported as
# ETHERSCAN_API_KEY before starting the kernel. Empty string when unset.
api_key = os.environ.get("ETHERSCAN_API_KEY", "")
base_url = "https://api.etherscan.io/v2/api"  # Etherscan v2 API (as of 2024-12-12)
ADDRESS = "0x5be9a4959308A0D0c7bC0870E319314d8D957dBB"  # Address whose ERC-20 transfers are fetched
chain_id = 1  # Ethereum Mainnet

In [46]:
def get_block_numbers(
    start_date: str,
    end_date: str,
    closest: str = "before",
    include_all: bool = False,
) -> List[int]:
    """
    Return the block numbers that bracket a date range given in Singapore time.

    Both bounds are parsed as naive 'YYYY-MM-DD HH:MM' strings, localized to
    Asia/Singapore and converted to Unix timestamps. The start block is looked
    up with closest="before" and the end block with closest="after", so the
    resulting block range is at least as wide as the requested time window.

    :param start_date: Start of the window, 'YYYY-MM-DD HH:MM', Singapore time.
    :param end_date: End of the window, 'YYYY-MM-DD HH:MM', Singapore time.
    :param closest: Currently unused. Kept only so existing callers that pass
                    it keep working; the start/end lookups always use
                    "before"/"after" respectively.
    :param include_all: If True, return every block number from the start block
                        to the end block (inclusive). If False, return only
                        ``[start_block, end_block]``.
    :return: A list of block numbers as described for ``include_all``.

    :raises ValueError: If a date does not match 'YYYY-MM-DD HH:MM', or if the
                        start is not strictly earlier than the end.

    Any exception raised by ``get_block_number_by_timestamp`` propagates unchanged.
    """
    singapore_tz = pytz.timezone("Asia/Singapore")

    try:
        # Attach the Singapore timezone to the naive parsed datetimes.
        start_dt_sgt = singapore_tz.localize(datetime.strptime(start_date, "%Y-%m-%d %H:%M"))
        end_dt_sgt = singapore_tz.localize(datetime.strptime(end_date, "%Y-%m-%d %H:%M"))
    except ValueError as exc:
        # Chain the original error so the offending value stays visible in the traceback.
        raise ValueError("Dates must be in 'YYYY-MM-DD HH:MM' format.") from exc

    # timestamp() on a timezone-aware datetime already yields UTC-based epoch
    # seconds, so no explicit conversion to UTC is required first.
    start_timestamp = int(start_dt_sgt.timestamp())
    end_timestamp = int(end_dt_sgt.timestamp())

    if start_timestamp >= end_timestamp:
        raise ValueError("Start date must be earlier than end date.")

    # Deliberately combine 'before' (start) and 'after' (end) so the block range
    # covers the whole requested window.
    start_block = get_block_number_by_timestamp(timestamp=start_timestamp, closest="before")
    end_block = get_block_number_by_timestamp(timestamp=end_timestamp, closest="after")

    if include_all:
        # NOTE(review): range() needs the helper to return ints — TODO confirm.
        return list(range(start_block, end_block + 1))
    return [start_block, end_block]

In [53]:
def fetch_erc20_transfers(address: str, start_block: int, end_block: int, offset: int = 100) -> pd.DataFrame:
    """
    Page through ERC20 token transfers for ``address`` and return them as one DataFrame.

    Pages are requested from ``get_erc20_transfers`` starting at page 1, with
    ``offset`` passed through as the page size, until a page comes back as
    ``None``, empty, or of an unexpected type, or until any exception is raised
    (which is logged and ends the loop; rows gathered so far are still returned).

    :param address: Address passed to ``get_erc20_transfers``.
    :param start_block: First block of the queried range.
    :param end_block: Last block of the queried range.
    :param offset: Page size forwarded to ``get_erc20_transfers``.
    :return: All collected rows, de-duplicated on the 'hash' column (first
             occurrence kept).
    """
    logger.info(f"Fetching ERC20 transfers from block {start_block} to {end_block}...")

    collected_rows = []
    page_no = 1

    while True:
        try:
            # Ask the API wrapper for the current page.
            batch = get_erc20_transfers(
                address=address, startblock=start_block, endblock=end_block, page=page_no, offset=offset
            )

            if batch is None:
                logger.warning("API returned None. Exiting...")
                break

            # Normalise the page to a DataFrame; lists and DataFrames are accepted.
            if isinstance(batch, pd.DataFrame):
                page_df = batch
            elif isinstance(batch, list):
                if not batch:
                    logger.info("Empty list received. Exiting...")
                    break
                page_df = pd.DataFrame(batch)
            else:
                logger.warning(f"Unexpected data format: {type(batch)}. Exiting...")
                break

            # Covers both an empty DataFrame from the API and a list that
            # produced a frame with no usable content.
            if page_df.empty:
                logger.info("Empty DataFrame received. Exiting...")
                break

            # Accumulate this page as plain records and move on to the next one.
            collected_rows.extend(page_df.to_dict(orient='records'))
            logger.info(f"Page {page_no}: Retrieved {len(page_df)} transactions.")
            page_no += 1

        except Exception as e:
            # Best effort: keep whatever was fetched before the failure.
            logger.warning(f"API request failed on page {page_no}: {e}")
            break

    # De-duplicate across all pages.
    # NOTE(review): keying on 'hash' alone keeps one row per transaction hash; if a
    # single transaction can carry several token transfers they would be collapsed
    # here — confirm this is intended.
    combined = pd.DataFrame(collected_rows)
    final_df = combined.drop_duplicates(subset=['hash'], keep='first')
    logger.info(f"Final dataset contains {len(final_df)} unique transactions.")
    return final_df


In [51]:
def process_and_save_transfers(transfers_df: pd.DataFrame, output_file: str) -> None:
    """
    Add a human-readable UTC 'dateTime' column and write the transfers to CSV.

    The input frame is not modified; a derived frame with 'dateTime' as the
    first column is written to ``<output_file>/erc20_transfers.csv``.

    :param transfers_df: Transfers with a 'timeStamp' column holding Unix epoch
                         seconds (numeric or numeric strings). Values that
                         cannot be parsed become missing in 'dateTime'.
    :param output_file: Despite the name, the *directory* that receives
                        'erc20_transfers.csv'. It is created if it does not
                        exist.
    :return: None. Logs a warning and writes nothing when the frame is empty.
    """
    if transfers_df.empty:
        logger.warning("No valid transfers to save. DataFrame is empty.")
        return

    # Epoch seconds -> 'YYYY-MM-DD HH:MM:SS' in UTC.
    date_time = pd.to_datetime(
        pd.to_numeric(transfers_df['timeStamp'], errors='coerce'), unit='s', utc=True
    ).dt.strftime('%Y-%m-%d %H:%M:%S')

    # Put 'dateTime' first, followed by the remaining columns in their original order.
    ordered_cols = ['dateTime'] + [col for col in transfers_df.columns if col != 'dateTime']

    # assign() returns a new frame, so the caller's DataFrame is left untouched.
    result_df = transfers_df.assign(dateTime=date_time)[ordered_cols]

    # Make sure the target directory exists; an empty string means the current
    # directory, which needs no creation.
    if output_file:
        os.makedirs(output_file, exist_ok=True)
    csv_path = os.path.join(output_file, 'erc20_transfers.csv')

    result_df.to_csv(csv_path, index=False, encoding='utf-8')
    logger.info(f"Data successfully saved to {csv_path}")



In [57]:
def main():
    """
    Run the pipeline: resolve the block range, fetch ERC20 transfers, save them to CSV.

    Uses the module-level START_DATE, END_DATE, ADDRESS and OUTPUT_FILE settings.
    Any exception is logged with its traceback and not re-raised.
    """
    try:
        # Resolve the start and end block numbers for the configured window.
        start_block, end_block = get_block_numbers(start_date=START_DATE, end_date=END_DATE, include_all=False)
        logger.info(f"Resolved block range: {start_block} - {end_block}")

        # Fetch the ERC20 transfers inside that block range.
        transfers_df = fetch_erc20_transfers(address=ADDRESS, start_block=start_block, end_block=end_block)

        # Post-process and persist the result.
        process_and_save_transfers(transfers_df, OUTPUT_FILE)

    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback, which
        # a plain logger.error(message) would drop.
        logger.exception(f"An error occurred during the execution: {e}")

if __name__ == "__main__":
    main()

INFO:root:Fetching ERC20 transfers from block 21368389 to 21375546...


21368389 21375546


INFO:root:Page 1: Retrieved 14 transactions.
INFO:root:Final dataset contains 14 unique transactions.
INFO:root:Data successfully saved to C:/Users/YuweiCao/Documents/GitHub/Project/Project/etherscan/result\erc20_transfers.csv
