## Description

Must get 1 minute candles from Binance USDT perps for the symbols in filtered_tokens.csv.
Must take in a parameter which specifies how many symbols to get data from. If this parameter is None then get all of them.
Populate the 1 minute candles according to the Candle dataclass in models.py.
Must respect rate limits. Should be sophisticated: read the rate-limit data in the responses and adjust the request rate accordingly.
Must allow for a date range input. Not all tokens will have been listed for the entire period, so it needs to work out the available data range within the requested date range for each symbol.
Output to data folder.

**Update:** Now supports smart appending - checks existing data and only fetches missing date ranges to avoid re-downloading existing data.

In [1]:
# Import required libraries
import pandas as pd
import numpy as np
import requests
import time
import asyncio
import aiohttp
from datetime import datetime, timedelta
from pathlib import Path
import sys
import os
from concurrent.futures import ThreadPoolExecutor
import threading

# Add the backtester module to the path
sys.path.append('/home/zac/back-testing')
from backtester.models import Candle

# Configuration
# Base URL that the /fapi/v1/... request paths used in this notebook are appended to.
BINANCE_BASE_URL = "https://fapi.binance.com"
# Folder that holds the token list and receives one <symbol>.csv per downloaded symbol.
# NOTE(review): absolute, machine-specific path — consider making it configurable.
DATA_DIR = Path("/home/zac/back-testing/data")
# CSV whose 'binance_symbol' column lists the symbols to download.
TOKENS_FILE = DATA_DIR / "filtered_tokens.csv"

print("Libraries imported successfully!")
print(f"Data directory: {DATA_DIR}")
print(f"Tokens file: {TOKENS_FILE}")

Libraries imported successfully!
Data directory: /home/zac/back-testing/data
Tokens file: /home/zac/back-testing/data/filtered_tokens.csv


In [2]:
class BinanceRateLimiter:
    """Thread-safe pacing helper for Binance REST calls.

    Tracks the used request weight reported in response headers, enforces a
    minimum spacing between requests, pauses until the end of the current
    60-second window when the used weight gets too high, and exposes a
    semaphore that callers use to cap the number of in-flight requests.
    """

    def __init__(self, max_concurrent_requests=10):
        # Concurrency controls
        self.max_concurrent = max_concurrent_requests
        self.semaphore = threading.Semaphore(max_concurrent_requests)
        self.lock = threading.Lock()

        # Weight bookkeeping for the current one-minute window
        self.weight_limit = 2400  # Default weight limit per minute
        self.current_weight = 0
        self.window_start = time.time()

        # Request pacing
        self.min_delay = 0.05  # Minimum gap between requests (50ms)
        self.last_request_time = 0

    def check_rate_limits(self, response_headers):
        """Record the used weight from `response_headers` and warn above 70% of the limit."""
        header_name = 'x-mbx-used-weight-1m'
        with self.lock:
            if header_name in response_headers:
                self.current_weight = int(response_headers[header_name])

            # Only a warning here; the actual back-off happens in wait_if_needed()
            warn_above = self.weight_limit * 0.7
            if self.current_weight > warn_above:
                print(f"Rate limit warning: {self.current_weight}/{self.weight_limit}")

    def wait_if_needed(self):
        """Block the caller for as long as pacing or the weight budget requires.

        The lock is held for the whole call (including sleeps), so concurrent
        callers are serialised through this method.
        """
        with self.lock:
            now = time.time()

            # A new one-minute window starts: forget the previous weight
            window_age = now - self.window_start
            if window_age >= 60:
                self.current_weight = 0
                self.window_start = now

            # Keep at least `min_delay` seconds between consecutive requests
            pause = self.min_delay - (now - self.last_request_time)
            if pause > 0:
                time.sleep(pause)

            # Above 80% of the budget: sit out the remainder of the window
            if self.current_weight > self.weight_limit * 0.8:
                remaining = 60 - (now - self.window_start)
                if remaining > 0:
                    print(f"Rate limit reached. Waiting {remaining:.1f} seconds...")
                    time.sleep(remaining)
                    self.current_weight = 0
                    self.window_start = time.time()

            self.last_request_time = time.time()

    def acquire(self):
        """Take one concurrency slot (blocks while all slots are in use)."""
        self.semaphore.acquire()

    def release(self):
        """Give back a concurrency slot taken with acquire()."""
        self.semaphore.release()

# Module-level limiter shared by the request helpers defined in the following cells.
rate_limiter = BinanceRateLimiter(max_concurrent_requests=4)  # Conservative concurrent limit
print("Rate limiter initialized with concurrent support")

Rate limiter initialized with concurrent support


In [3]:
def get_symbol_info(symbol):
    """Look up one symbol in Binance's futures exchangeInfo listing.

    Args:
        symbol: Exchange symbol to look for (compared against each entry's 'symbol').

    Returns:
        A dict with keys 'symbol', 'status', 'onboardDate' (None when absent)
        and 'contractType' (defaults to 'PERPETUAL' when absent), or None when
        the symbol is not listed or the HTTP request fails.
    """
    rate_limiter.wait_if_needed()

    try:
        url = f"{BINANCE_BASE_URL}/fapi/v1/exchangeInfo"
        # Bounded wait, matching get_klines_batch: without a timeout a stalled
        # connection would block this worker thread indefinitely.
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        rate_limiter.check_rate_limits(response.headers)

        listing = response.json()['symbols']

        # First entry whose symbol matches, or None when the symbol is unknown
        match = next((entry for entry in listing if entry['symbol'] == symbol), None)
        if match is None:
            return None

        return {
            'symbol': symbol,
            'status': match['status'],
            'onboardDate': match.get('onboardDate', None),
            'contractType': match.get('contractType', 'PERPETUAL')
        }

    except requests.RequestException as e:
        print(f"Error getting symbol info for {symbol}: {e}")
        return None

def get_earliest_valid_timestamp(symbol, start_date):
    """Return the first usable timestamp (epoch milliseconds) for `symbol`.

    The result is the later of `start_date` and the symbol's 'onboardDate'
    (when the exchange reports one). `start_date` may be a "YYYY-MM-DD" string
    or a datetime. Returns None when the symbol is unknown or its status is
    not 'TRADING'.
    """
    info = get_symbol_info(symbol)

    tradable = bool(info) and info['status'] == 'TRADING'
    if not tradable:
        print(f"Symbol {symbol} is not available for trading")
        return None

    # Accept either a date string or an already-built datetime
    requested = (
        datetime.strptime(start_date, "%Y-%m-%d")
        if isinstance(start_date, str)
        else start_date
    )

    # Never start before the symbol was onboarded, when that date is known
    onboard_ms = info.get('onboardDate')
    if onboard_ms:
        earliest = max(requested, datetime.fromtimestamp(onboard_ms / 1000))
    else:
        earliest = requested

    return int(earliest.timestamp() * 1000)

print("Symbol info functions defined")

Symbol info functions defined


In [4]:
def get_klines_batch(symbol, start_time, end_time, limit=1000, max_retries=3):
    """Fetch one page of 1-minute klines for `symbol`.

    Args:
        symbol: Exchange symbol to query.
        start_time: Range start passed as 'startTime' (epoch milliseconds).
        end_time: Range end passed as 'endTime' (epoch milliseconds).
        limit: Maximum number of klines requested.
        max_retries: How many times to retry after an HTTP 429 (rate limited)
            response before giving up. Negative values are treated as 0.

    Returns:
        The decoded JSON response (a list of klines), or [] when the request
        fails or the retries are exhausted.
    """
    url = f"{BINANCE_BASE_URL}/fapi/v1/klines"
    params = {
        'symbol': symbol,
        'interval': '1m',
        'startTime': start_time,
        'endTime': end_time,
        'limit': limit
    }
    attempts = max(max_retries, 0) + 1

    rate_limiter.acquire()

    try:
        for attempt in range(attempts):
            rate_limiter.wait_if_needed()

            response = requests.get(url, params=params, timeout=30)

            # A 429 means "slow down": dropping the batch would leave a silent gap in
            # the data, and carrying on at full speed risks an IP ban. Back off and retry.
            if response.status_code == 429 and attempt < attempts - 1:
                # Record the reported weight so other threads also slow down
                rate_limiter.check_rate_limits(response.headers)
                try:
                    delay = float(response.headers.get('Retry-After'))
                except (TypeError, ValueError):
                    # Header missing or not a number of seconds: exponential fallback
                    delay = float(2 ** attempt)
                delay = max(delay, 0.0)
                print(f"Rate limited (HTTP 429) on {symbol}; retrying in {delay:.1f}s "
                      f"(retry {attempt + 1}/{attempts - 1})")
                time.sleep(delay)
                continue

            # Any other error status (or a 429 on the final attempt) raises here
            response.raise_for_status()

            rate_limiter.check_rate_limits(response.headers)

            return response.json()

    except requests.RequestException as e:
        print(f"Error getting klines for {symbol}: {e}")
        return []
    finally:
        # Always free the concurrency slot, including while backing off above
        rate_limiter.release()

def convert_kline_to_candle(kline_data, symbol):
    """Build a Candle from one kline row.

    Index 0 of `kline_data` is read as a millisecond timestamp; indices 1-5
    are converted to float64 and used as open, high, low, close and volume.
    """
    # datetime.fromtimestamp without a tz argument yields a naive local-time datetime
    opened_at = datetime.fromtimestamp(int(kline_data[0]) / 1000)
    open_px, high_px, low_px, close_px, traded = (
        np.float64(kline_data[field]) for field in range(1, 6)
    )
    return Candle(
        symbol=symbol,
        timestamp=opened_at,
        open=open_px,
        high=high_px,
        low=low_px,
        close=close_px,
        volume=traded,
    )

def check_existing_data_range(symbol, data_dir):
    """Report the time span already stored in `<data_dir>/<symbol>.csv`.

    Args:
        symbol: Symbol whose CSV file is inspected.
        data_dir: Directory (path-like supporting `/`) containing the CSV files.

    Returns:
        `(min_timestamp, max_timestamp)` as plain `datetime.datetime` objects,
        or `(None, None)` when the file is missing, empty, holds no parseable
        timestamps, or cannot be read.
    """
    csv_file = data_dir / f"{symbol}.csv"

    if not csv_file.exists():
        return None, None

    try:
        # Only the timestamp column is needed to establish the range
        df = pd.read_csv(csv_file, usecols=['timestamp'])
        if df.empty:
            return None, None

        timestamps = pd.to_datetime(df['timestamp'])
        first, last = timestamps.min(), timestamps.max()
        if pd.isna(first) or pd.isna(last):
            # No parseable timestamps at all: treat as "nothing stored yet"
            return None, None

        # Return stdlib datetimes, not pandas Timestamps. A naive pd.Timestamp's
        # .timestamp() assumes UTC, whereas the rest of this notebook relies on
        # datetime's naive-local semantics (datetime.fromtimestamp / .timestamp()).
        # Mixing the two shifts the fetch ranges by the local UTC offset.
        min_date = first.to_pydatetime()
        max_date = last.to_pydatetime()

        print(f"  Existing data: {min_date} to {max_date} ({len(df)} candles)")
        return min_date, max_date

    except Exception as e:
        print(f"  Error reading existing data for {symbol}: {e}")
        return None, None

def calculate_missing_ranges(symbol, requested_start, requested_end, data_dir):
    """Work out which parts of the requested span are not on disk yet.

    `requested_start` / `requested_end` may be "YYYY-MM-DD" strings or
    datetimes. Returns a list of `(start, end)` tuples: the whole request when
    nothing is stored, otherwise up to two ranges (before the stored data and
    after it), or an empty list when the stored data already covers the request.
    """
    have_from, have_to = check_existing_data_range(symbol, data_dir)

    def _as_datetime(value):
        # Strings are parsed as calendar dates; anything else is passed through
        return datetime.strptime(value, "%Y-%m-%d") if isinstance(value, str) else value

    requested_start = _as_datetime(requested_start)
    requested_end = _as_datetime(requested_end)

    # Nothing stored: the whole request is missing
    if have_from is None or have_to is None:
        print("  No existing data - fetching entire range")
        return [(requested_start, requested_end)]

    one_minute = timedelta(minutes=1)
    gaps = []

    # Leading gap: request starts before the stored data does
    if requested_start < have_from:
        gap_end = have_from - one_minute
        gaps.append((requested_start, gap_end))
        print(f"  Need data before existing: {requested_start} to {gap_end}")

    # Trailing gap: request ends after the stored data does
    if requested_end > have_to:
        gap_start = have_to + one_minute
        gaps.append((gap_start, requested_end))
        print(f"  Need data after existing: {gap_start} to {requested_end}")

    if not gaps:
        print("  All requested data already exists")

    return gaps

def get_candle_batch_worker(args):
    """Fetch and convert one batch; designed to be mapped over a thread pool.

    `args` is a `(symbol, start_timestamp, end_timestamp, batch_num)` tuple.
    Returns `(batch_num, candles, last_timestamp)`, where `last_timestamp` is
    the first field of the final kline, or `(batch_num, [], None)` when no
    klines came back or an error occurred.
    """
    symbol, start_timestamp, end_timestamp, batch_num = args
    nothing = (batch_num, [], None)

    try:
        raw_rows = get_klines_batch(symbol, start_timestamp, end_timestamp)
        if not raw_rows:
            return nothing

        converted = [convert_kline_to_candle(row, symbol) for row in raw_rows]
        return batch_num, converted, raw_rows[-1][0]
    except Exception as e:
        print(f"Error in batch {batch_num} for {symbol}: {e}")
        return nothing

def get_all_candles_for_symbol(symbol, start_date, end_date, data_dir):
    """Download the 1-minute candles that are still missing for `symbol`.

    The effective start is the later of `start_date` and the symbol's earliest
    valid timestamp. Each missing range is split into windows of 1000 minutes
    that are fetched through a thread pool. Returns the list of newly fetched
    candles (empty when the symbol is unavailable or nothing is missing).
    """
    print(f"Getting data for {symbol}...")

    earliest_ms = get_earliest_valid_timestamp(symbol, start_date)
    if not earliest_ms:
        print(f"Skipping {symbol}: not available")
        return []

    listed_from = datetime.fromtimestamp(earliest_ms / 1000)
    if isinstance(start_date, datetime):
        requested_from = start_date
    else:
        requested_from = datetime.strptime(start_date, "%Y-%m-%d")
    effective_start = max(requested_from, listed_from)

    gaps = calculate_missing_ranges(symbol, effective_start, end_date, data_dir)
    if not gaps:
        print(f"  No new data to fetch for {symbol}")
        return []

    window_ms = 1000 * 60 * 1000  # 1000 one-minute candles per request, in milliseconds
    collected = []

    for gap_start, gap_end in gaps:
        print(f"  Fetching range: {gap_start} to {gap_end}")

        first_ms = int(gap_start.timestamp() * 1000)
        last_ms = int(gap_end.timestamp() * 1000)

        # Slice the gap into consecutive request windows, numbered from 0
        tasks = []
        cursor = first_ms
        while cursor < last_ms:
            window_end = min(cursor + window_ms, last_ms)
            tasks.append((symbol, cursor, window_end, len(tasks)))
            cursor = window_end

        print(f"    Processing {len(tasks)} batches for this range...")

        # Fetch all windows concurrently, then put the results in batch order
        with ThreadPoolExecutor(max_workers=6) as pool:
            outcomes = sorted(
                pool.map(get_candle_batch_worker, tasks),
                key=lambda outcome: outcome[0],
            )

        gap_candles = [candle for _, batch, _ in outcomes for candle in batch]

        collected.extend(gap_candles)
        print(f"    Fetched {len(gap_candles)} candles for this range")

    print(f"  Completed {symbol}: {len(collected)} new candles fetched")
    return collected

print("Updated candle collection functions with smart range detection")

Updated candle collection functions with smart range detection


In [5]:
def save_candles_to_csv(candles, symbol, data_dir):
    """Write `candles` to `<data_dir>/<symbol>.csv`, merging with any existing file.

    When the file already exists its rows are combined with the new ones,
    de-duplicated on 'timestamp' (the later row wins) and sorted by timestamp.
    If the existing file cannot be read, the file is rewritten with only the
    new candles. Does nothing when `candles` is empty.
    """
    csv_file = data_dir / f"{symbol}.csv"

    if not candles:
        print(f"No new candles to save for {symbol}")
        return

    # One row per candle, columns in the order they are written to disk
    columns = ('timestamp', 'open', 'high', 'low', 'close', 'volume')
    new_df = pd.DataFrame(
        [{column: getattr(candle, column) for column in columns} for candle in candles]
    )

    if not csv_file.exists():
        # First write for this symbol
        combined_df = new_df.sort_values('timestamp')
        print(f"  Creating new file with {len(combined_df)} candles for {symbol}")
    else:
        try:
            existing_df = pd.read_csv(csv_file)
            existing_df['timestamp'] = pd.to_datetime(existing_df['timestamp'])
            new_df['timestamp'] = pd.to_datetime(new_df['timestamp'])

            # Merge, let new rows override old ones on the same timestamp, then order
            combined_df = (
                pd.concat([existing_df, new_df], ignore_index=True)
                .drop_duplicates(subset=['timestamp'], keep='last')
                .sort_values('timestamp')
            )

            print(f"  Appending {len(new_df)} new candles to existing {len(existing_df)} candles for {symbol}")
            print(f"  Total after merge: {len(combined_df)} candles")

        except Exception as e:
            # Unreadable existing file: fall back to writing only the new data
            print(f"  Error reading existing file for {symbol}: {e}")
            print(f"  Creating new file with {len(new_df)} candles")
            combined_df = new_df.sort_values('timestamp')

    combined_df.to_csv(csv_file, index=False)
    print(f"  Saved to {csv_file}")

def load_filtered_tokens(tokens_file, max_symbols=None):
    """Read the 'binance_symbol' column of `tokens_file` into a list.

    When `max_symbols` is not None only the first `max_symbols` entries are kept.
    """
    symbols = pd.read_csv(tokens_file)['binance_symbol'].tolist()

    if max_symbols is not None:
        symbols = symbols[:max_symbols]
        print(f"Limited to first {max_symbols} symbols")

    print(f"Loaded {len(symbols)} symbols from {tokens_file}")
    return symbols

print("Updated utility functions with smart appending")

Updated utility functions with smart appending


In [6]:
def process_symbol_worker(args):
    """Fetch and store the missing candles for one symbol (thread-pool worker).

    `args` is `(symbol, start_date, end_date, symbol_index, total_symbols)`.
    Returns `(symbol, success, new_candle_count)`. A symbol with no new
    candles still counts as a success when data for it already exists on disk.
    """
    symbol, start_date, end_date, symbol_index, total_symbols = args

    print(f"\n[{symbol_index}/{total_symbols}] Processing {symbol}")

    try:
        # Only the ranges not yet on disk are downloaded
        candles = get_all_candles_for_symbol(symbol, start_date, end_date, DATA_DIR)

        if candles:
            # Merged into the existing CSV when there is one
            save_candles_to_csv(candles, symbol, DATA_DIR)
            return symbol, True, len(candles)

        # Nothing new: distinguish "already complete" from "nothing at all"
        existing_start, _ = check_existing_data_range(symbol, DATA_DIR)
        if existing_start is None:
            print(f"  No candles collected for {symbol}")
            return symbol, False, 0

        print(f"  {symbol} - All requested data already exists")
        return symbol, True, 0

    except Exception as e:
        print(f"  Error processing {symbol}: {e}")
        return symbol, False, 0

def collect_candle_data(start_date="2024-01-01", end_date="2024-12-31", max_symbols=None, max_concurrent_symbols=3):
    """
    Collect candle data for the symbols in the tokens file and print a summary.

    Symbols are processed in groups of `max_concurrent_symbols`, each group in
    its own thread pool, with a 2-second pause between groups.

    Args:
        start_date: Start date in YYYY-MM-DD format or datetime object
        end_date: End date in YYYY-MM-DD format or datetime object
        max_symbols: Maximum number of symbols to process (None for all)
        max_concurrent_symbols: Maximum number of symbols to process concurrently
    """

    print("=== Binance USDT Perpetual Futures Candle Collection (Smart Append) ===")
    print(f"Date range: {start_date} to {end_date}")
    print(f"Max concurrent symbols: {max_concurrent_symbols}")
    print("Note: Will only fetch missing data ranges, appending to existing files")

    symbols = load_filtered_tokens(TOKENS_FILE, max_symbols)

    # Make sure the output folder is there before any worker writes to it
    DATA_DIR.mkdir(exist_ok=True)

    # One task tuple per symbol, carrying its 1-based position for progress output
    symbol_tasks = [
        (symbol, start_date, end_date, position, len(symbols))
        for position, symbol in enumerate(symbols, 1)
    ]

    outcomes = []
    batch_size = max_concurrent_symbols
    for offset in range(0, len(symbol_tasks), batch_size):
        group = symbol_tasks[offset:offset + batch_size]

        print(f"\nProcessing batch {offset//batch_size + 1}/{(len(symbol_tasks) + batch_size - 1)//batch_size}")

        with ThreadPoolExecutor(max_workers=max_concurrent_symbols) as pool:
            outcomes.extend(pool.map(process_symbol_worker, group))

        # Brief pause between groups, skipped after the last one
        if offset + batch_size < len(symbol_tasks):
            time.sleep(2)

    # Tally the per-symbol outcomes
    succeeded = [count for _, ok, count in outcomes if ok]
    successful_collections = len(succeeded)
    failed_collections = len(outcomes) - successful_collections
    total_new_candles = sum(succeeded)
    symbols_with_existing_data = sum(1 for count in succeeded if count == 0)

    print("\n=== Collection Summary ===")
    print(f"Successfully processed: {successful_collections}")
    print(f"Failed: {failed_collections}")
    print(f"Symbols with existing data (no new fetch needed): {symbols_with_existing_data}")
    print(f"Total symbols processed: {len(symbols)}")
    print(f"Total NEW candles fetched: {total_new_candles}")
    print(f"Data saved to: {DATA_DIR}")

print("Updated main collection function with smart appending support")

Updated main collection function with smart appending support


In [None]:
# Example 1: Test with the first 50 symbols over the last 104 weeks (~2 years) with smart appending
# NOTE(review): datetime and timedelta are already imported in the first cell; this re-import is redundant.
from datetime import datetime, timedelta
end_date = datetime.now()
start_date = end_date - timedelta(weeks=104)

print("=== Testing Smart Append Functionality ===")
print("This will check existing data and only fetch missing ranges")

# Test with concurrent processing and smart appending
# (up to 4 symbols are processed at the same time; only ranges missing on disk are fetched)
collect_candle_data(start_date=start_date, end_date=end_date, max_symbols=50, max_concurrent_symbols=4)

print("Example completed - check the data directory for results!")

=== Testing Smart Append Functionality ===
This will check existing data and only fetch missing ranges
=== Binance USDT Perpetual Futures Candle Collection (Smart Append) ===
Date range: 2023-10-20 16:25:15.360619 to 2025-10-17 16:25:15.360619
Max concurrent symbols: 4
Note: Will only fetch missing data ranges, appending to existing files
Limited to first 50 symbols
Loaded 50 symbols from /home/zac/back-testing/data/filtered_tokens.csv

Processing batch 1/13

[1/50] Processing ONEUSDT
Getting data for ONEUSDT...

[2/50] Processing SUSHIUSDT
Getting data for SUSHIUSDT...

[3/50] Processing REDUSDT
Getting data for REDUSDT...

[4/50] Processing XVGUSDT
Getting data for XVGUSDT...
  No existing data - fetching entire range
  Fetching range: 2023-10-20 16:25:15.360619 to 2025-10-17 16:25:15.360619
    Processing 1049 batches for this range...
  No existing data - fetching entire range
  Fetching range: 2023-10-20 16:25:15.360619 to 2025-10-17 16:25:15.360619
    Processing 1049 batches for