In [1]:
# import libraries and setup
import os
import requests
import time
import hmac
import hashlib
import urllib.parse
import findspark
from dotenv import load_dotenv
from datetime import datetime, timedelta

In [2]:
# load environment variables
load_dotenv(dotenv_path="/home/jovyan/.env", override=True)
API_KEY = os.getenv("BINANCE_API_KEY")
SECRET_KEY = os.getenv("BINANCE_SECRET_KEY")

print("Environment variables loaded successfully")


Environment variables loaded successfully


In [3]:
# initialise Spark
findspark.init()
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import from_unixtime


spark = SparkSession.builder \
    .appName("BinanceToMongoDB_Enhanced") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1") \
    .config("spark.mongodb.write.connection.uri", 
            "mongodb://crypto_project:dst123@crypto_mongo:27017/cryptobot.historical_data?authSource=admin") \
    .config("spark.mongodb.read.connection.uri", 
            "mongodb://crypto_project:dst123@crypto_mongo:27017/cryptobot.historical_data?authSource=admin") \
    .getOrCreate()

print("Spark session created successfully")
print(f"Spark version: {spark.version}")


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1beae8b1-9bf6-4c1c-a815-4f801afa502f;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;10.1.1 in central
	found org.mongodb#mongodb-driver-sync;4.8.2 in central
	[4.8.2] org.mongodb#mongodb-driver-sync;[4.8.1,4.8.99)
	found org.mongodb#bson;4.8.2 in central
	found org.mongodb#mongodb-driver-core;4.8.2 in central
	found org.mongodb#bson-record-codec;4.8.2 in central
downloading https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar ...
	[SUCCESSFUL ] org.mongodb.spark#mongo-spark-connector_2.12;10.1.1!mongo-spark-connector_2.12.jar (160ms)
downloading https://repo1.maven.org/maven2/org/mongodb/mongodb-driver-sync/4.8.2/mongodb-driver-sync-4.8.2.jar ...
	[SUC

Spark session created successfully
Spark version: 3.4.2


In [4]:
# enhanced Binance API functions with authentication
def create_signature(query_string, secret_key):
    """Create HMAC SHA256 signature for authenticated requests"""
    return hmac.new(
        secret_key.encode('utf-8'),
        query_string.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

def get_authenticated_klines(symbol="BTCUSDT", interval="1h", limit=1000, start_time=None, end_time=None):
    """Fetch klines with authenticated API for potentially more data"""
    base_url = "https://api.binance.com"
    endpoint = "/api/v3/klines"
    
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": min(limit, 1000)  # Binance max is 1000
    }
    
    if start_time:
        params["startTime"] = int(start_time)
    if end_time:
        params["endTime"] = int(end_time)
    
    # add timestamp for authenticated request
    params["timestamp"] = int(time.time() * 1000)
    # create query string
    query_string = urllib.parse.urlencode(params)
    # create signature
    signature = create_signature(query_string, SECRET_KEY)
    # add signature to params
    params["signature"] = signature

    headers = {
        "X-MBX-APIKEY": API_KEY
    }
    
    try:
        response = requests.get(base_url + endpoint, params=params, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Authenticated request failed, trying public: {e}")
        # fallback to public API
        return get_public_klines(symbol, interval, min(limit, 1000), start_time, end_time)

def get_public_klines(symbol="BTCUSDT", interval="1h", limit=1000, start_time=None, end_time=None):
    """Fallback public API function"""
    url = "https://api.binance.com/api/v3/klines"
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit,
    }
    if start_time:
        params["startTime"] = int(start_time)
    if end_time:
        params["endTime"] = int(end_time)
    
    response = requests.get(url, params=params)
    response.raise_for_status()
    return response.json()


In [5]:
def fetch_historical_data(symbol="BTCUSDT", target_count=3000):
    """
    Strategic approach to get 3000+ records:
    1. Try shorter intervals first (more data points)
    2. Go back in time systematically
    3. Use both authenticated and public APIs
    """
    intervals_to_try = [
        ("1m", "1-minute", 1000 * 60),      # 1000 mins back
        ("3m", "3-minute", 3000 * 60),      # 3000 mins back  
        ("5m", "5-minute", 5000 * 60),      # 5000 mins back
        ("15m", "15-minute", 15000 * 60),   # 15000 mins back
        ("30m", "30-minute", 30000 * 60),   # 30000 mins back
        ("1h", "1-hour", 60000 * 60),       # 60000 mins back
    ]
    
    all_rows = []
    successful_config = None
    for interval, desc, lookback_minutes in intervals_to_try:
        print(f"\n=== Trying {symbol} with {interval} ({desc}) ===")
        try:
            end_time = int(time.time() * 1000)  # current time in milliseconds
            lookback_ms = lookback_minutes * 1000  # convert to milliseconds
            current_rows = []
            current_time = end_time
            fetched_batches = 0
            max_batches = 10  # limit to avoid infinite loops
            while len(current_rows) < target_count and fetched_batches < max_batches:
                # calculate start time for this batch
                batch_start = current_time - (1000 * lookback_ms // 1000)  # go back by interval
                print(f"Batch {fetched_batches + 1}: Fetching from {datetime.fromtimestamp(batch_start/1000)} to {datetime.fromtimestamp(current_time/1000)}")
                # attempt authenticated first, then public
                try:
                    data = get_authenticated_klines(
                        symbol=symbol, 
                        interval=interval, 
                        limit=1000,
                        start_time=batch_start,
                        end_time=current_time
                    )
                except:
                    data = get_public_klines(
                        symbol=symbol, 
                        interval=interval, 
                        limit=1000,
                        start_time=batch_start,
                        end_time=current_time
                    )
                if not data or len(data) == 0:
                    print(f"No data received for this time range")
                    break
                # convert to rows
                for row in data:
                    current_rows.append(Row(
                        symbol=symbol,
                        open_time=int(row[0]),
                        open=float(row[1]),
                        high=float(row[2]),
                        low=float(row[3]),
                        close=float(row[4]),
                        volume=float(row[5]),
                        close_time=int(row[6]),
                        quote_volume=float(row[7]),
                        num_trades=int(row[8]),
                        taker_base_volume=float(row[9]),
                        taker_quote_volume=float(row[10]),
                        ignore=row[11]
                    ))
                
                print(f"Received {len(data)} records, total: {len(current_rows)}")
                # move time window back
                current_time = batch_start - 1
                fetched_batches += 1
                # rate limiting
                time.sleep(0.2) 
                # stop if we got less than requested (hit the limit)
                if len(data) < 1000:
                    print(f"Reached historical data limit")
                    break
            
            print(f"Final count for {interval}: {len(current_rows)} records")
            # use configuration (with enough records)
            if len(current_rows) >= target_count:
                print(f"SUCCESS! Got {len(current_rows)} records with {symbol} {interval}")
                all_rows = current_rows
                successful_config = (symbol, interval, desc)
                break
                
            elif len(current_rows) > len(all_rows):
                all_rows = current_rows
                successful_config = (symbol, interval, desc)
                print(f"Best so far: {len(current_rows)} records with {symbol} {interval}")      
        except Exception as e:
            print(f"Error with {symbol} {interval}: {e}")
            continue
    
    # remove duplicates based on open_time (regarding overlapping batches)
    if all_rows:
        seen_times = set()
        unique_rows = []
        for row in all_rows:
            if row.open_time not in seen_times:
                unique_rows.append(row)
                seen_times.add(row.open_time)

        print(f"Before deduplication: {len(all_rows)} records")
        print(f"After deduplication: {len(unique_rows)} records")
        all_rows = unique_rows
        all_rows.sort(key=lambda x: x.open_time) # sort by time
    
    return all_rows, successful_config

print("Strategic fetcher function defined")


Strategic fetcher function defined


In [6]:
rows, config = fetch_historical_data(symbol="BTCUSDT", target_count=3000)
print(f"Total records collected: {len(rows)}")

if config:
    print(f"Configuration used: {config[0]} with {config[1]} interval ({config[2]})")
else:
    print("No successful configuration found")

print(f"You have {len(rows)} records!")


=== Trying BTCUSDT with 1m (1-minute) ===
Batch 1: Fetching from 2025-06-08 22:26:02.180000 to 2025-06-09 15:06:02.180000
Authenticated request failed, trying public: 400 Client Error: Bad Request for url: https://api.binance.com/api/v3/klines?symbol=BTCUSDT&interval=1m&limit=1000&startTime=1749421562180&endTime=1749481562180&timestamp=1749481562180&signature=b09de3abd806a92cfc3abab790ff16613a2f70f4dd313466a51f31792f2ee303
Received 1000 records, total: 1000
Batch 2: Fetching from 2025-06-08 05:46:02.179000 to 2025-06-08 22:26:02.179000
Authenticated request failed, trying public: 400 Client Error: Bad Request for url: https://api.binance.com/api/v3/klines?symbol=BTCUSDT&interval=1m&limit=1000&startTime=1749361562179&endTime=1749421562179&timestamp=1749481563139&signature=e68a7e7ca161643eaa5d3502329e0fbdaf2911fbf7f896bc572b2fa0336f868b
Received 1000 records, total: 2000
Batch 3: Fetching from 2025-06-07 13:06:02.178000 to 2025-06-08 05:46:02.178000
Authenticated request failed, trying 

In [7]:
# create enhanced Spark DataFrame
if len(rows) > 0:
    print(f"\nCreating Spark DataFrame from {len(rows)} records...")
    spark_df = spark.createDataFrame(rows)
    
    # add enhanced datetime columns and calculated fields
    spark_df = spark_df.withColumn("open_datetime", from_unixtime(spark_df.open_time / 1000)) \
                     .withColumn("close_datetime", from_unixtime(spark_df.close_time / 1000))
    
    # add useful calculated columns
    from pyspark.sql.functions import col, round as spark_round
    
    spark_df = spark_df.withColumn("price_change", col("close") - col("open")) \
                     .withColumn("price_change_pct", spark_round(((col("close") - col("open")) / col("open")) * 100, 4)) \
                     .withColumn("high_low_spread", col("high") - col("low")) \
                     .withColumn("high_low_spread_pct", spark_round(((col("high") - col("low")) / col("open")) * 100, 4))
    
    record_count = spark_df.count()
    print(f"Spark DataFrame created with {record_count} records")
    
    print(f"\nDataFrame Schema:")
    spark_df.printSchema()
    
    # display enhanced data sample
    spark_df.select(
        "symbol", "open_datetime", "open", "high", "low", "close", 
        "volume", "price_change", "price_change_pct"
    ).show(5, truncate=False)
    
    # display stats
    date_stats = spark_df.select("open_datetime").agg({
        "open_datetime": "min"
    }).collect()[0][0]
    date_stats_max = spark_df.select("open_datetime").agg({
        "open_datetime": "max"
    }).collect()[0][0]
    
    print(f"Date range: {date_stats} to {date_stats_max}")    
    price_stats = spark_df.select("open", "high", "low", "close", "volume").describe()
    price_stats.show()
else:
    print("No data to create DataFrame")

spark_df.write \
    .format("mongodb") \
    .mode("overwrite") \
    .save()

print("Data successfully saved to MongoDB")


Creating Spark DataFrame from 3000 records...


                                                                                

Spark DataFrame created with 3000 records

DataFrame Schema:
root
 |-- symbol: string (nullable = true)
 |-- open_time: long (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: double (nullable = true)
 |-- close_time: long (nullable = true)
 |-- quote_volume: double (nullable = true)
 |-- num_trades: long (nullable = true)
 |-- taker_base_volume: double (nullable = true)
 |-- taker_quote_volume: double (nullable = true)
 |-- ignore: string (nullable = true)
 |-- open_datetime: string (nullable = true)
 |-- close_datetime: string (nullable = true)
 |-- price_change: double (nullable = true)
 |-- price_change_pct: double (nullable = true)
 |-- high_low_spread: double (nullable = true)
 |-- high_low_spread_pct: double (nullable = true)

+-------+-------------------+---------+---------+---------+---------+-------+-------------------+----------------+
|symbol |open_dat

25/06/09 15:06:48 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+------------------+------------------+-----------------+-----------------+
|summary|              open|              high|               low|            close|           volume|
+-------+------------------+------------------+------------------+-----------------+-----------------+
|  count|              3000|              3000|              3000|             3000|             3000|
|   mean|105875.21488000001|105889.06840000003|105862.20855999998|105876.0221966667|7.350430646666668|
| stddev| 610.8414321745595| 616.7490764571152| 605.9000895352486|611.7122256292652|11.72553956669429|
|    min|          105001.0|         105036.04|         104964.14|        105001.01|          0.12756|
|    max|         107950.45|          108000.0|         107919.56|        107950.44|         207.1351|
+-------+------------------+------------------+------------------+-----------------+-----------------+

Data successfully saved to MongoDB


                                                                                

In [8]:
# verification
if len(rows) > 0:    
    df_read = spark.read \
        .format("mongodb") \
        .option("spark.mongodb.read.connection.uri", 
                "mongodb://crypto_project:dst123@crypto_mongo:27017/cryptobot.historical_data?authSource=admin") \
        .load()
    
    mongo_count = df_read.count()
    print(f"Records verified in MongoDB: {mongo_count}")
    
    # display stored data sample
    df_read.select(
        "symbol", "open_datetime", "open", 
        "close", "volume", "price_change_pct"
    ).show(5, truncate=False)
    
    print(f"Records fetched: {len(rows)}")
    print(f"Records in MongoDB: {mongo_count}")
    print(f"Symbol: {config[0] if config else 'BTCUSDT'}")
    print(f"Interval: {config[1] if config else 'Various'}")
    # spark.stop()
    print(f"Process completed successfully!")
else:
    print(f"\n Process completed but no data was collected")


Records verified in MongoDB: 3000
+-------+-------------------+---------+---------+--------+----------------+
|symbol |open_datetime      |open     |close    |volume  |price_change_pct|
+-------+-------------------+---------+---------+--------+----------------+
|BTCUSDT|2025-06-09 02:37:00|105617.76|105603.41|1.65613 |-0.0136         |
|BTCUSDT|2025-06-09 02:38:00|105603.42|105627.91|3.97305 |0.0232          |
|BTCUSDT|2025-06-09 02:39:00|105627.91|105602.87|2.05486 |-0.0237         |
|BTCUSDT|2025-06-09 02:40:00|105602.87|105593.34|0.48263 |-0.009          |
|BTCUSDT|2025-06-09 02:41:00|105593.34|105534.88|12.72816|-0.0554         |
+-------+-------------------+---------+---------+--------+----------------+
only showing top 5 rows

Records fetched: 3000
Records in MongoDB: 3000
Symbol: BTCUSDT
Interval: 1m
Process completed successfully!
