In [1]:
!pip install -q google-cloud==0.34.0 google-cloud-bigquery matplotlib pandas_ta scikit-learn emp-orderly-types emp-orderly setuptools ccxt pandas-gbq


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
!pip install -q numpy==1.26.4


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
# Importing necessary libraries and modules

# Standard library imports
import asyncio  # For asynchronous operations
import time  # For time-based operations and timestamps
import os  # For environment management and file operations
import warnings  # To filter out warnings

# Importing to fetch environment variables
from dotenv import load_dotenv

# Third-party library imports
import matplotlib.pyplot as plt  # For plotting and visualization
import numpy as np  # For numerical operations and array manipulations
import pandas as pd  # For data manipulation and analysis
import pandas_ta as ta  # For technical analysis indicators and tools
import joblib  # For model serialization and deserialization
import ccxt  # For cryptocurrency trading APIs and market data retrieval

# Machine Learning libraries
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier  # For ML model training
from sklearn.model_selection import train_test_split  # For splitting data into training and testing sets
from sklearn.preprocessing import StandardScaler  # For feature scaling
from sklearn.metrics import accuracy_score, classification_report  # For model evaluation

# Google Cloud imports
from google.cloud import bigquery  # For interacting with Google BigQuery
from google.cloud import storage  # For interacting with Google Cloud Storage
from google.oauth2 import service_account  # For Google Cloud authentication

# Empyreal SDK imports for strategy development and backtesting
from emp_orderly import Strategy, EmpOrderly  # For strategy implementation and management
from emp_orderly_types import PerpetualAssetType, Interval  # For defining asset types and intervals

warnings.filterwarnings('ignore')

In [4]:
load_dotenv()

# Retrieve GCP project ID and credentials path from environment variables
project_id = os.getenv("GCP_PROJECT_ID")
credentials_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

# Ensure that both environment variables are available
if not project_id or not credentials_path:
    raise ValueError("GCP_PROJECT_ID and GOOGLE_APPLICATION_CREDENTIALS must be set in the .env file.")

# Initialize Google Cloud service account credentials using the specified JSON file
credentials = service_account.Credentials.from_service_account_file(credentials_path)

# Create a BigQuery client using the specified credentials and project ID
client = bigquery.Client(credentials=credentials, project=project_id)

# Create a Storage client using the same credentials and project ID
storage_client = storage.Client(credentials=credentials, project=project_id)

print(f"Successfully initialized GCP clients for project: {project_id}")

Successfully initialized GCP clients for project: streamlit-apps-431010


In [5]:
# Function to fetch OHLCV (Open, High, Low, Close, Volume) data from a specified exchange
def fetch_ohlcv_data(exchange_name, symbol, timeframe, since):
    """
    Fetch OHLCV data from a specified exchange using the ccxt library.
    
    Parameters:
    - exchange_name (str): The name of the exchange (e.g., 'binance').
    - symbol (str): The trading pair symbol (e.g., 'BTC/USDT').
    - timeframe (str): The timeframe for the OHLCV data (e.g., '1h', '1d').
    - since (int): Timestamp (in milliseconds) from which to start fetching data.
    
    Returns:
    - pd.DataFrame: DataFrame containing OHLCV data with columns ['timestamp', 'open', 'high', 'low', 'close', 'volume'].
    - None: If an error occurs while fetching data.
    """
    try:
        # Initialize the exchange instance using the exchange name provided
        exchange = getattr(ccxt, exchange_name)()
        
        # Load exchange markets to ensure the symbol is recognized
        exchange.load_markets()

        # List to store all OHLCV data
        all_data = []

        # Fetch data iteratively until the current time
        while since < time.time() * 1000:
            # Fetch OHLCV data with a limit of 1000 records to avoid hitting rate limits
            ohlcv = exchange.fetch_ohlcv(symbol, timeframe=timeframe, since=since, limit=1000)
            
            # Break if no data is returned
            if len(ohlcv) == 0:
                break
            
            # Extend the list with the fetched data
            all_data.extend(ohlcv)

            # Update 'since' to the last fetched timestamp + 1ms to avoid overlap
            since = ohlcv[-1][0] + 1
            
            # Respect the exchange rate limit to avoid getting banned ;)
            time.sleep(exchange.rateLimit / 1000)

        # Convert the collected data to a pandas DataFrame
        df = pd.DataFrame(
            all_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']
        )
        
        # Convert the 'timestamp' from milliseconds to a datetime format
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')

        return df

    except Exception as e:
        print(f"Error fetching data: {e}")
        return None
    


# User-defined parameters for fetching OHLCV data
exchange_name = "kucoin"
symbol = "BTC/USDT"
timeframe = "30m"

# Define the starting date for fetching historical data and convert it to timestamp (ms)
since = int(time.mktime(time.strptime('2020-01-01', '%Y-%m-%d'))) * 1000

In [6]:
# Function to save a DataFrame to a specified Google BigQuery table
def save_to_bigquery(dataframe, table_id):
    """
    Save a pandas DataFrame to a Google BigQuery table.
    
    Parameters:
    - dataframe (pd.DataFrame): The DataFrame to be saved.
    - table_id (str): The BigQuery table identifier in the format 'dataset.table'.
    
    Returns:
    - None: Prints a success message if the operation is successful, 
            otherwise prints an error message.
    """
    try:
        # Save the DataFrame to the specified BigQuery table
        dataframe.to_gbq(
            destination_table=table_id,
            project_id=project_id,
            if_exists='replace',
            credentials=credentials
        )
        
        print(f"Data successfully saved to BigQuery table: {table_id}")
    except Exception as e:
        print(f"Error saving data to BigQuery: {e}")


In [7]:
# Function to save a trained model and its scaler to Google Cloud Storage (GCS)
def save_model_to_gcs(model, scaler, bucket_name, model_filename, scaler_filename):
    """
    Save a machine learning model and its corresponding scaler to a Google Cloud Storage bucket.
    
    Parameters:
    - model: The machine learning model object to be saved.
    - scaler: The scaler object used for data preprocessing.
    - bucket_name (str): Name of the GCS bucket where the model and scaler will be saved.
    - model_filename (str): Filename to use for saving the model.
    - scaler_filename (str): Filename to use for saving the scaler.
    
    Returns:
    - None: Prints success or error messages based on the outcome of the operation.
    """
    try:
        # Get or create the GCS bucket
        bucket = storage_client.bucket(bucket_name)
        
        # Check if the bucket exists; if not, create it
        if not bucket.exists():
            bucket = storage_client.create_bucket(bucket_name)
            print(f"Bucket '{bucket_name}' created.")

        # Create blobs (object placeholders) in the bucket for the model and scaler files
        model_blob = bucket.blob(model_filename)
        scaler_blob = bucket.blob(scaler_filename)

        # Save the model and scaler to local files
        joblib.dump(model, model_filename)
        joblib.dump(scaler, scaler_filename)

        # Upload the local model and scaler files to the corresponding GCS blobs
        model_blob.upload_from_filename(model_filename)
        scaler_blob.upload_from_filename(scaler_filename)

        print(f"Model and scaler successfully saved to GCS bucket: '{bucket_name}'")

    except Exception as e:
        print(f"Error saving model to GCS: {e}")


In [8]:
# Function to add technical indicators to a given DataFrame
def add_technical_indicators(df):
    """
    Add common technical indicators to a given price DataFrame.
    
    Parameters:
    - df (pd.DataFrame): DataFrame containing OHLCV data with columns ['open', 'high', 'low', 'close', 'volume'].
    
    Returns:
    - pd.DataFrame: The original DataFrame with additional columns for each technical indicator.
    """
    # Add Simple Moving Averages (SMA) for 10 and 20 periods
    df['SMA_10'] = ta.sma(df['close'], length=10)
    df['SMA_20'] = ta.sma(df['close'], length=20)

    # Add Relative Strength Index (RSI) with a length of 14 periods
    df['RSI'] = ta.rsi(df['close'], length=14)

    # Add Moving Average Convergence Divergence (MACD) with fast=12, slow=26, signal=9
    macd = ta.macd(df['close'], fast=12, slow=26, signal=9)
    df['MACD'] = macd['MACD_12_26_9']
    df['MACD_signal'] = macd['MACDs_12_26_9']

    # Add Bollinger Bands (BB) with length of 20 periods and a standard deviation of 2.0
    bbands = ta.bbands(df['close'], length=20)
    df['BB_upper'] = bbands['BBU_20_2.0']
    df['BB_middle'] = bbands['BBM_20_2.0']
    df['BB_lower'] = bbands['BBL_20_2.0']

    # Add Average True Range (ATR) with a length of 14 periods
    df['ATR'] = ta.atr(df['high'], df['low'], df['close'], length=14)

    # Add Momentum (MOM) indicator with a length of 10 periods
    df['MOM'] = ta.mom(df['close'], length=10)

    # Add Rate of Change (ROC) indicator with a length of 10 periods
    df['ROC'] = ta.roc(df['close'], length=10)

    # Drop rows with NaN values that may result from indicator calculations
    df.dropna(inplace=True)

    return df


In [9]:
# Function to prepare training data for machine learning models
def prepare_training_data(df):
    """
    Prepare feature matrix and target vector for training a machine learning model.
    
    Parameters:
    - df (pd.DataFrame): DataFrame containing technical indicators and price data.
    
    Returns:
    - X_train (np.array): Scaled training feature matrix.
    - X_test (np.array): Scaled testing feature matrix.
    - y_train (np.array): Training target vector.
    - y_test (np.array): Testing target vector.
    - scaler (StandardScaler): Fitted scaler object for future scaling transformations.
    """
    # Define target variable: 1 if next close price is 0.5% higher, otherwise 0
    df['target'] = np.where(df['close'].shift(-1) > df['close'] * 1.005, 1, 0)

    # Define the feature set used for model training
    features = [
        'SMA_10', 'SMA_20', 'RSI', 'MACD', 'MACD_signal',
        'BB_upper', 'BB_middle', 'BB_lower', 'ATR', 'MOM', 'ROC'
    ]

    # Create feature matrix X and target vector y
    X = df[features]
    y = df['target']

    # Split the data into training and testing sets (80% train, 20% test)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Initialize a StandardScaler to normalize the feature values
    scaler = StandardScaler()
    
    # Fit the scaler on the training data and transform both training and testing data
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Return the processed training and testing data along with the fitted scaler
    return X_train, X_test, y_train, y_test, scaler


In [10]:
# Main function for fetching data, feature engineering, training, and saving the model
if __name__ == "__main__":
    """
    Main function for executing the end-to-end process of data fetching, feature engineering,
    training the model, evaluating the model, and saving the model and scaler to Google Cloud Storage.
    """
    # Fetch the historical data using the defined parameters
    data = fetch_ohlcv_data(exchange_name, symbol, timeframe, since)

    # Check if data was successfully fetched
    if data is not None:
        # Save the raw OHLCV data to Google BigQuery
        save_to_bigquery(data, "crypto_dataset.raw_prices")

        # Add technical indicators to the fetched data
        data = add_technical_indicators(data)

        # Save the processed data (with technical indicators) to Google BigQuery
        save_to_bigquery(data, "crypto_dataset.processed_prices")

        # Prepare the data for model training by splitting into train/test sets and scaling
        X_train, X_test, y_train, y_test, scaler = prepare_training_data(data)

        # Initialize and train a RandomForestClassifier
        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)

        # Evaluate the trained model using the testing data
        y_pred = model.predict(X_test)
        print(f"Model Accuracy: {accuracy_score(y_test, y_pred)}")
        print(classification_report(y_test, y_pred))

        # Save the trained model and scaler to Google Cloud Storage
        save_model_to_gcs(
            model, 
            scaler, 
            bucket_name=f'{project_id}-crypto_trading_bucket', 
            model_filename='trading_model.pkl', 
            scaler_filename='scaler.pkl'
        )

    else:
        print("Data fetching failed. Please check your parameters and try again.")


Data successfully saved to BigQuery table: crypto_dataset.raw_prices
Data successfully saved to BigQuery table: crypto_dataset.processed_prices
Model Accuracy: 0.9162128194386259
              precision    recall  f1-score   support

           0       0.92      0.99      0.96     15358
           1       0.31      0.03      0.05      1351

    accuracy                           0.92     16709
   macro avg       0.61      0.51      0.50     16709
weighted avg       0.87      0.92      0.88     16709

Model and scaler successfully saved to GCS bucket: 'streamlit-apps-431010-crypto_trading_bucket'
