In [None]:
# Mount Google Drive at /content/drive (google.colab is only available in Colab).
# NOTE(review): nothing visible in this notebook reads or writes under
# /content/drive — DATABASE_FILE_PATH is a relative path — so this mount may be
# unused; confirm whether the database was meant to live on Drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import concurrent.futures
import sqlite3
import time
from datetime import datetime, timezone

import numpy as np
import pandas as pd
import requests

In [None]:
# Trading pairs passed verbatim as the `symbol` query parameter to KuCoin.
COINS_TO_FETCH = [
    'BTC-USDT',
    'ETH-USDT',
    'SOL-USDT',
    'ADA-USDT',
    'XRP-USDT',
    'DOGE-USDT',
]

# NOTE(review): relative to the current working directory, not under the
# /content/drive mount from the first cell — confirm whether persistence on
# Drive was intended.
DATABASE_FILE_PATH = 'crypto_data.db'
TABLE_NAME = 'crypto_prices'

# Data Fetching

In [None]:
def fetch_coin_data(symbol, days_to_fetch=2000, timeout=10):
    """Download daily KuCoin candles for ``symbol`` and convert them to DB rows.

    The candles endpoint returns at most 1500 rows per request, so the
    requested range is fetched backwards in windows of at most that many days
    until the start of the range is reached or a window comes back empty.

    Args:
        symbol: Trading pair as KuCoin expects it, e.g. ``'BTC-USDT'``.
        days_to_fetch: Days of history to request (default 2000).
        timeout: Per-request timeout in seconds passed to ``requests.get``.

    Returns:
        The row tuples produced by ``process_api_data``, or ``[]`` if any
        request fails or the response does not contain a ``data`` list.
    """
    url = "https://api.kucoin.com/api/v1/market/candles"
    max_candles_per_request = 1500  # per-request row cap of the candles endpoint
    seconds_per_day = 24 * 60 * 60
    end_timestamp = int(time.time())
    start_timestamp = end_timestamp - (days_to_fetch * seconds_per_day)

    print(f"[FETCHING] Starting download for {symbol}...")
    candles_by_time = {}
    window_end = end_timestamp
    try:
        # Page backwards so a single capped response cannot silently truncate
        # the requested history.
        while window_end > start_timestamp:
            window_start = max(start_timestamp, window_end - max_candles_per_request * seconds_per_day)
            params = {"type": "1day", "symbol": symbol, "startAt": window_start, "endAt": window_end}
            response = requests.get(url, params=params, timeout=timeout)
            response.raise_for_status()
            payload = response.json()
            batch = payload.get("data") if isinstance(payload, dict) else None
            if not isinstance(batch, list):
                # NOTE(review): presumably an API-level error body (e.g. unknown
                # symbol); previously this surfaced as an uncaught KeyError.
                print(f"  > ERROR: Failed to fetch {symbol}. Reason: unexpected response {str(payload)[:200]}")
                return []
            if not batch:
                break  # nothing this far back, so older windows would be empty too
            for row in batch:
                # Adjacent windows share an edge timestamp; key by time to de-duplicate.
                candles_by_time.setdefault(int(row[0]), row)
            window_end = window_start
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # error is not a RequestException, and non-numeric candle timestamps.
        print(f"  > ERROR: Failed to fetch {symbol}. Reason: {e}")
        return []

    raw_data = list(candles_by_time.values())
    print(f"[FETCHING] Completed download for {symbol}, found {len(raw_data)} records.")
    return process_api_data(raw_data, symbol)

In [None]:
def process_api_data(raw_data, symbol):
    """Convert raw candle rows into tuples matching the price-table column order.

    Each input row is read positionally as
    ``[time_seconds, open, close, high, low, volume_base, volume_quote]``;
    values may be numeric strings.

    Args:
        raw_data: Iterable of candle rows in the layout above.
        symbol: Pair name such as ``'BTC-USDT'``; dashes are removed before storing.

    Returns:
        A list of ``(unix_ms, 'YYYY-MM-DD', symbol_without_dashes, open, high,
        low, close, volume_base, volume_quote)`` tuples, in input order.
    """
    stored_symbol = symbol.replace('-', '')
    processed_data = []
    for row in raw_data:
        unix_seconds = int(row[0])
        processed_data.append((
            unix_seconds * 1000,
            # Format in UTC: fromtimestamp() without tz uses the machine's local
            # zone, which can move a (presumably UTC-aligned) daily candle onto
            # the previous/next calendar day.
            datetime.fromtimestamp(unix_seconds, tz=timezone.utc).strftime('%Y-%m-%d'),
            stored_symbol,
            float(row[1]), float(row[3]), float(row[4]), float(row[2]),
            float(row[5]), float(row[6]),
        ))
    return processed_data

# DATABASE STORAGE

In [None]:
def update_database(db_path, table_name, data_to_insert):
    """Ensure ``table_name`` exists in the SQLite database at ``db_path`` and insert rows.

    Rows that collide with an existing ``(Symbol, Unix)`` pair are skipped by
    ``INSERT OR IGNORE`` through the table's UNIQUE constraint.

    Args:
        db_path: Path to the SQLite file; ``sqlite3.connect`` creates it if missing.
        table_name: Target table. It is formatted directly into the SQL
            (identifiers cannot be bound as parameters), so it must be trusted.
        data_to_insert: Iterable of 9-tuples ordered as
            (Unix, Date, Symbol, Open, High, Low, Close, Volume_Base, Volume_Quote).

    Returns:
        The number of rows actually inserted (ignored duplicates excluded).
    """
    create_query = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        Unix INTEGER, Date TEXT, Symbol TEXT, Open REAL, High REAL,
        Low REAL, Close REAL, Volume_Base REAL, Volume_Quote REAL,
        UNIQUE(Symbol, Unix)
    )
    """
    insert_query = f"INSERT OR IGNORE INTO {table_name} (Unix, Date, Symbol, Open, High, Low, Close, Volume_Base, Volume_Quote) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"

    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute(create_query)
        cur.executemany(insert_query, data_to_insert)
        conn.commit()
        # For executemany, rowcount sums the changes of every statement, so
        # ignored duplicates contribute 0.
        inserted = cur.rowcount
    finally:
        # Always release the file handle; if commit() was not reached, closing
        # discards the uncommitted inserts.
        conn.close()

    print(f"\n[DATABASE] Successfully connected to '{db_path}'.")
    print(f"[DATABASE] Inserted {inserted} new records. Skipped duplicates.")
    return inserted

# METRICS CALCULATION

In [None]:
def percent_change(prices):
    """Return the step-to-step percentage change of ``prices``.

    Element ``k`` is the change from ``prices[k]`` to ``prices[k + 1]`` in
    percent, rounded to 2 decimals. The result is one element shorter than the
    input, so it is empty for fewer than two prices.
    """
    return [
        round(((current - previous) / previous) * 100, 2)
        for previous, current in zip(prices, prices[1:])
    ]

In [None]:
def moving_average(prices, window=30):
    """Simple moving average of ``prices`` over a sliding ``window``.

    Value ``k`` is ``np.mean(prices[k:k + window])`` rounded to 2 decimals, so
    the result has ``len(prices) - window + 1`` entries. It is empty when the
    window is longer than the series.
    """
    last_start = len(prices) - window
    return [
        round(np.mean(prices[start:start + window]), 2)
        for start in range(last_start + 1)
    ]

In [None]:
def calculate_volatility(prices):
    """Standard deviation (numpy default, ddof=0) of the percent changes, rounded to 2 decimals.

    Returns ``0.0`` when there are no changes to measure (fewer than two prices).
    """
    daily_changes = percent_change(prices)
    return round(np.std(daily_changes), 2) if daily_changes else 0.0

In [None]:
def trading_signal(prices, buy_threshold=2, sell_threshold=-2):
    """Label each step-to-step move in ``prices`` as "BUY", "SELL" or "HOLD".

    Args:
        prices: Ordered sequence of prices.
        buy_threshold: Percent change strictly above which a move is "BUY" (default 2).
        sell_threshold: Percent change strictly below which a move is "SELL" (default -2).

    Returns:
        One label per consecutive pair of prices (``len(prices) - 1`` labels).
        Comparisons use the values from ``percent_change``, which are already
        rounded to 2 decimals.
    """
    def _label(change):
        # Map a single percent change to its signal using the thresholds above.
        if change > buy_threshold:
            return "BUY"
        if change < sell_threshold:
            return "SELL"
        return "HOLD"

    return [_label(change) for change in percent_change(prices)]

In [None]:
def calculate_metrics_for_symbol(symbol, window=30):
    """Load one symbol's closing prices from SQLite and attach derived metrics.

    Reads ``Date`` and ``Close`` rows for ``symbol`` from ``TABLE_NAME`` in
    ``DATABASE_FILE_PATH`` and adds these columns:

    * ``Percent_Change``: step-to-step % change (NaN on the first row).
    * ``{window}_Day_MA``: trailing moving average (NaN on the first ``window - 1`` rows).
    * ``Signal``: label from ``trading_signal`` (NaN on the first row).
    * ``Volatility``: one value for the whole series, repeated on every row.
    * ``Symbol``: the queried symbol.

    Args:
        symbol: Symbol as stored in the table, e.g. ``'BTCUSDT'``.
        window: Moving-average window (default 30). It is also the minimum
            number of rows required.

    Returns:
        A DataFrame with ``Date`` as a regular column. Returns ``None`` if the
        table does not exist or holds fewer than ``window`` rows for ``symbol``.

    Raises:
        ValueError: If ``window`` is less than 1.
    """
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")

    print(f"[ANALYSIS] Starting calculation for {symbol}...")
    conn = sqlite3.connect(DATABASE_FILE_PATH)
    try:
        # The table exists only after update_database() has run. Without this
        # check, read_sql_query raises "no such table" inside the worker thread.
        table_exists = conn.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (TABLE_NAME,)
        ).fetchone()
        if table_exists is None:
            print(f"[ANALYSIS] Table '{TABLE_NAME}' not found; skipping {symbol}.")
            return None
        # Bind the symbol instead of formatting it into the SQL text.
        query = f"SELECT Date, Close FROM {TABLE_NAME} WHERE Symbol = ? ORDER BY Date ASC"
        df = pd.read_sql_query(query, conn, params=(symbol,), parse_dates=['Date'], index_col='Date')
    finally:
        conn.close()

    prices = df['Close'].tolist()
    if len(prices) < window:
        print(f"[ANALYSIS] Insufficient data for {symbol}. Skipping.")
        return None

    daily_changes = percent_change(prices)
    moving_avg = moving_average(prices, window=window)
    signals = trading_signal(prices)
    volatility = calculate_volatility(prices)

    # The helpers return shorter lists: the first change has no previous price,
    # and the first average needs a full window. Left-pad with NaN so they
    # align with the DataFrame rows.
    df['Percent_Change'] = [np.nan] + daily_changes
    df[f'{window}_Day_MA'] = [np.nan] * (window - 1) + moving_avg
    df['Signal'] = [np.nan] + signals
    df['Volatility'] = volatility
    df['Symbol'] = symbol

    print(f"[ANALYSIS] Completed calculation for {symbol}.")
    return df.reset_index()

# MAIN EXECUTION SCRIPT

In [None]:
# In a Jupyter/Colab kernel __name__ is "__main__", so this runs when the cell
# executes; the guard keeps the file importable if exported as a script.
if __name__ == "__main__":

    print("--- STEP 1: FETCHING DATA IN PARALLEL ---")
    all_fetched_data = []
    # ThreadPoolExecutor rejects max_workers=0, so keep at least one worker.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, len(COINS_TO_FETCH))) as executor:
        future_to_coin = {executor.submit(fetch_coin_data, coin): coin for coin in COINS_TO_FETCH}
        for future in concurrent.futures.as_completed(future_to_coin):
            coin = future_to_coin[future]
            try:
                result = future.result()
            except Exception as e:
                # Report and continue: one bad coin should not abort the others.
                print(f"  > ERROR: Unexpected failure while fetching {coin}. Reason: {e!r}")
                continue
            if result:
                all_fetched_data.extend(result)

    print("\n--- STEP 2: STORING DATA IN DATABASE ---")
    if all_fetched_data:
        update_database(DATABASE_FILE_PATH, TABLE_NAME, all_fetched_data)
    else:
        print("[DATABASE] No data fetched, skipping database update.")

    print("\n--- STEP 3: CALCULATING METRICS IN PARALLEL ---")
    all_metrics_dfs = []
    # Must match the storage normalisation in process_api_data(): every '-' is
    # removed. The old '-USDT' -> 'USDT' form missed any non-USDT pair.
    symbols_in_db = [coin.replace('-', '') for coin in COINS_TO_FETCH]
    with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, len(symbols_in_db))) as executor:
        future_to_symbol = {executor.submit(calculate_metrics_for_symbol, symbol): symbol for symbol in symbols_in_db}
        for future in concurrent.futures.as_completed(future_to_symbol):
            symbol = future_to_symbol[future]
            try:
                result_df = future.result()
            except Exception as e:
                print(f"[ANALYSIS] Unexpected failure for {symbol}. Reason: {e!r}")
                continue
            if result_df is not None:
                all_metrics_dfs.append(result_df)

    if all_metrics_dfs:
        final_metrics_df = pd.concat(all_metrics_dfs, ignore_index=True).sort_values(by=['Symbol', 'Date'])

        print("\n--- FINAL METRICS DATAFRAME (LAST 10 ROWS) ---")
        print(final_metrics_df.tail(10))
    else:
        print("\nNo metrics were calculated.")

    print("\n--- PIPELINE COMPLETE ---")

--- STEP 1: FETCHING DATA IN PARALLEL ---
[FETCHING] Starting download for BTC-USDT...
[FETCHING] Starting download for ETH-USDT...
[FETCHING] Starting download for SOL-USDT...
[FETCHING] Starting download for ADA-USDT...
[FETCHING] Starting download for XRP-USDT...
[FETCHING] Starting download for DOGE-USDT...
[FETCHING] Completed download for BTC-USDT, found 1500 records.
[FETCHING] Completed download for DOGE-USDT, found 1500 records.
[FETCHING] Completed download for ADA-USDT, found 1500 records.
[FETCHING] Completed download for ETH-USDT, found 1500 records.
[FETCHING] Completed download for XRP-USDT, found 1500 records.
[FETCHING] Completed download for SOL-USDT, found 1488 records.

--- STEP 2: STORING DATA IN DATABASE ---

[DATABASE] Successfully connected to 'crypto_data.db'.
[DATABASE] Inserted 8988 new records. Skipped duplicates.

--- STEP 3: CALCULATING METRICS IN PARALLEL ---
[ANALYSIS] Starting calculation for BTCUSDT...
[ANALYSIS] Starting calculation for ETHUSDT...
[AN