## Getting data from Binance

The following function connects to the Binance API and downloads the latest data. It is already implemented and you don't need to change anything.
Please read it to understand what it does.

In [2]:
from datetime import datetime, timedelta
from typing import Any, Optional
import requests

def _parse_kline(record: list) -> dict[str, Any]:
    """
    Convert one raw kline array returned by Binance `/api/v3/klines` into a candle dict.

    # Parameters
    * record: one element of the JSON array returned by the klines endpoint

    # Returns
    A dict with the fields listed in `get_binance_data`. `time` is a naive datetime
    in the machine's local time zone (`datetime.fromtimestamp` of the open time).
    """
    # record[0] is the open time in milliseconds; indices 6 and 11 are not used here.
    return {
        "time": datetime.fromtimestamp(record[0] // 1000),
        "open": float(record[1]),
        "high": float(record[2]),
        "low": float(record[3]),
        "close": float(record[4]),
        "volume": float(record[5]),
        "quote_volume": float(record[7]),
        "trades": int(record[8]),
        "taker_base_vol": float(record[9]),
        "taker_quote_vol": float(record[10]),
    }


def get_binance_data(start_time: Optional[datetime], end_time: datetime) -> list[dict]:
    """
    The function fetches hourly BTCUSDT candles from the Binance API and returns them as a list of objects

    # Parameters
    * start_time: open time of the first candle to fetch; None means from 14 Jul 2017 00:00:00 UTC
    * end_time: open time of the last candle to fetch (inclusive)

    # Returns
    A list of objects with the following fields:
        * time: the opening time of the candle
        * open: the open price
        * high: the high price
        * low: the low price
        * close: the close price
        * volume: the volume
        * quote_volume: the quote volume
        * trades: the number of trades
        * taker_base_vol: the taker base volume
        * taker_quote_vol: the taker quote volume
    The list is empty when `end_time` is before `start_time` or when Binance has no
    candles in the requested range.

    # Errors
    On any network, HTTP or parsing error the request details are printed and
    `SystemExit(1)` is raised.
    """

    # Check if there is new data to download
    if start_time is not None and end_time < start_time:
        print("There is no new data yet.")
        return []

    candles: list[dict[str, Any]] = []
    response, params = None, {}
    try:
        # Each API call returns at most 1000 candles, so page forward until end_time is reached.
        while True:
            # Request parameters; see the Binance spot API documentation for /api/v3/klines.
            params = {
                "symbol": "BTCUSDT",
                "interval": "1h",
                "limit": 1000,
                # Bound every page by end_time so Binance never returns candles past the window.
                "endTime": int(end_time.timestamp() * 1000),
            }
            if start_time is not None:
                params["startTime"] = int(start_time.timestamp() * 1000)
            else:
                params["startTime"] = 1499990400000  # 14 Jul 2017 00:00:00 UTC

            # The timeout keeps a stalled connection from hanging the notebook forever.
            response = requests.get("https://api.binance.com/api/v3/klines", params=params, timeout=30)
            # Error replies (rate limits, bad parameters) come back with a non-2xx status;
            # fail here instead of trying to parse them as candles.
            response.raise_for_status()

            batch = [_parse_kline(record) for record in response.json()]
            # Defensive: keep only candles inside the requested window.
            batch = [candle for candle in batch if candle["time"] <= end_time]

            # An empty page means nothing is left in [start_time, end_time].
            if not batch:
                return candles
            candles.extend(batch)

            last_date = batch[-1]["time"]
            print(f"Fetch data up to {last_date}", end="\r")
            # Stop as soon as the newest candle reaches end_time (no extra empty request).
            if last_date >= end_time:
                return candles

            # Continue from the hour after the newest candle received so far.
            start_time = last_date + timedelta(hours=1)
    except Exception as err:
        print(f"Error: the response is {response if response is not None else 'empty'}")
        print(f"The API parameters are {params}")
        print("Don't worry if you get this error. It probably because of network problem. Retry again. If the problem persists, contact the teacher.")
        print(f"Reason: {err!r}")
        # Raise SystemExit directly: it signals failure with a non-zero status and does not
        # rely on the `exit` helper that only interactive shells install.
        raise SystemExit(1) from err

## Your tasks below

You need to implement the following three functions.

In [43]:
from pymongo.mongo_client import MongoClient
from pymongo.errors import ConnectionFailure
from datetime import datetime
from urllib.parse import quote_plus

def connect_mongo() -> MongoClient:
    """
    Return a connected MongoDB client and prepare the collection that stores the Binance data.

    The connection is checked with a `ping`. A unique ascending index on `time` is then
    created on `binance.COM6002_ASS2` if it does not exist yet, so the same candle cannot
    be stored twice.

    Credentials are read from the MONGO_USERNAME / MONGO_PASSWORD environment variables.
    The values that were previously hard-coded are used as a fallback.

    # Returns
    A connected `MongoClient`. The caller owns it and should call `close()` when done.

    # Raises
    * Exception("Failed to connect to MongoDB: ...") when pymongo raises `ConnectionFailure`
      (for example when no server answers within `serverSelectionTimeoutMS`).
    * Exception("Error setting up database: ...") for any other failure.
    The original pymongo error is chained as `__cause__`.
    """
    import os

    # The public IP of this machine/codespace (e.g. from `curl ifconfig.me`) must be on
    # the Atlas IP access list, otherwise the ping below times out.
    # NOTE(review): credentials should not be committed to source control. Set the env
    # vars, rotate the password, and then remove these literal fallbacks.
    username = quote_plus(os.environ.get("MONGO_USERNAME", "py"))
    password = quote_plus(os.environ.get("MONGO_PASSWORD", "p233340"))

    # NOTE(review): tlsAllowInvalidCertificates=true disables server certificate
    # validation, which exposes the connection to man-in-the-middle attacks. Prefer
    # fixing the local CA store and dropping this option.
    connection_string = (
        f"mongodb+srv://{username}:{password}@com6002ass2.udksn.mongodb.net/"
        "?retryWrites=true&w=majority&appName=COM6002ASS2"
        "&tls=true&tlsAllowInvalidCertificates=true"
    )

    client = None
    try:
        client = MongoClient(
            connection_string,
            serverSelectionTimeoutMS=5000
        )

        # Fail fast if the cluster is unreachable.
        client.admin.command('ping')

        # Create/get database and collection; index `time` if it doesn't exist yet.
        collection = client['binance']['COM6002_ASS2']
        collection.create_index([('time', 1)], unique=True)
    except Exception as e:
        # Release the client created above instead of leaking it on a failed setup.
        if client is not None:
            client.close()
        if isinstance(e, ConnectionFailure):
            raise Exception(f"Failed to connect to MongoDB: {e}") from e
        raise Exception(f"Error setting up database: {e}") from e

    return client

def get_last_date(client: MongoClient) -> Optional[datetime]:
    """
    Look up the opening time of the newest candle stored in MongoDB.

    # Parameters
    * client: a connected MongoDB client

    # Returns
    The `time` value of the document with the greatest `time` in
    `binance.COM6002_ASS2`. Returns None when the collection is empty or that
    document has no `time` field.
    """
    collection = client['binance']['COM6002_ASS2']

    # Sorting by `time` descending and taking a single document yields the newest candle.
    newest = collection.find_one({}, sort=[('time', -1)])
    if not newest:
        return None
    return newest.get('time')

def save_records(client: MongoClient, records: list) -> None:
    """
    Save the records to MongoDB, skipping duplicates.

    # Parameters
    * client: a connected MongoDB client
    * records: candle dicts (as returned by `get_binance_data`); each must have a `time` key

    A record is skipped when its `time` is already stored in `binance.COM6002_ASS2`, or when
    an earlier record in the same `records` list has the same `time`. Outcomes and errors are
    printed, not raised.

    Note: `insert_many` adds an `_id` key to each inserted dict in place.
    """
    from pymongo.errors import BulkWriteError

    if not records:
        print("No records to save")
        return

    collection = client['binance']['COM6002_ASS2']

    # One query to learn which of the incoming timestamps are already stored.
    seen = {
        doc['time']
        for doc in collection.find(
            {'time': {'$in': [record['time'] for record in records]}},
            {'time': 1},
        )
    }

    # Keep only the first record per unseen `time`. A repeat inside the batch would
    # otherwise hit the unique index on `time` that connect_mongo creates.
    new_records = []
    for record in records:
        if record['time'] in seen:
            continue
        seen.add(record['time'])
        new_records.append(record)

    if not new_records:
        print("No new records to save")
        return

    try:
        # ordered=False: a duplicate-key error (e.g. from a concurrent writer) no longer
        # stops the remaining records from being inserted.
        result = collection.insert_many(new_records, ordered=False)
        print(f"Successfully inserted {len(result.inserted_ids)} new records")
    except BulkWriteError as e:
        inserted = e.details.get('nInserted', 0)
        failed = len(e.details.get('writeErrors', []))
        print(f"Inserted {inserted} new records; {failed} records failed: {e}")
    except Exception as e:
        print(f"Error saving records: {e}")

## Main execution codes

You don't need to change anything below. It is the high-level implementation of how we update the database contents from external sources.

In [44]:
from datetime import timedelta
def fetch_data() -> None:
    """
    This is the main execution logic.

    The procedures are:
    1. Connect to the mongo database.
    2. Get the last date from the database.
    3. Get the data from binance after the last date we get in the previous step,
       up to the last fully closed hourly candle.
    4. Save the data to the database.
    5. Close the connection to the database. This also happens if a step above raises.
    """
    client = connect_mongo()
    try:
        last_date = get_last_date(client)
        print(f"The last date is {last_date}")
        start_date = None if last_date is None else last_date + timedelta(hours=1)
        # The candle opening at the current hour is still in progress. Ask only up to the
        # previous hour: later runs start at last_date + 1h, so a partially-filled candle
        # saved now would never be revisited.
        end_date = datetime.now().replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
        data = get_binance_data(start_date, end_date)
        if data:
            save_records(client, data)
    finally:
        client.close()

## Execution

Call the function below to start the main execution logic.

In [45]:
# Run the update: read the last stored candle time from MongoDB, download newer
# candles from Binance and save them to the database.
fetch_data()

The last date is 2024-11-09 06:00:00
There is no new data yet.


In [48]:
# Open a client just for this check and close it afterwards (MongoClient is a
# context manager); the value is repeated as the last line so the notebook shows it.
with connect_mongo() as check_client:
    latest_in_db = get_last_date(check_client)
latest_in_db

datetime.datetime(2024, 11, 9, 6, 0)

# Data Analytics

In [None]:
# Client for the analysis below. connect_mongo() returns an open connection,
# so call `mongo.close()` once the analysis is finished.
mongo = connect_mongo()

# Your analysis below

...