In [1]:
import time
import os
import warnings
import pandas as pd
import pandas_ta as ta
import joblib
import ccxt
from google.cloud import bigquery, pubsub_v1
from google.oauth2 import service_account
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, classification_report
import json

warnings.filterwarnings('ignore')

In [2]:
# Load GCP credentials and initialize BigQuery and Pub/Sub clients
credentials_path = 'gentle-studio-430913-r4-e2c69efea4c8.json'
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
credentials = service_account.Credentials.from_service_account_file(credentials_path)

project_id = 'gentle-studio-430913-r4'
bq_client = bigquery.Client(credentials=credentials, project=project_id)
publisher = pubsub_v1.PublisherClient()
pubsub_topic = f'projects/{project_id}/topics/trading_notifications'

In [3]:
def get_market_data(exchange_name, pair, time_interval, start_time):
    try:
        market = getattr(ccxt, exchange_name)()
        market.load_markets()
        market_data = []
        while start_time < time.time() * 1000:
            ohlcv = market.fetch_ohlcv(pair, timeframe=time_interval, since=start_time, limit=1000)
            if len(ohlcv) == 0:
                break
            market_data.extend(ohlcv)
            start_time = ohlcv[-1][0] + 2
            time.sleep(market.rateLimit / 1000)
        df = pd.DataFrame(market_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        return df
    except Exception as err:
        print(f"Error getting data: {err}")
        return None

In [4]:
def apply_indicators(data):
    data['SMA_10'] = ta.sma(data['close'], length=10)
    data['SMA_20'] = ta.sma(data['close'], length=20)
    data['EMA_10'] = ta.ema(data['close'], length=10)
    macd_values = ta.macd(data['close'], fast=12, slow=26, signal=9)
    data['MACD'] = macd_values['MACD_12_26_9']
    data['MACD_signal'] = macd_values['MACDs_12_26_9']
    data['RSI'] = ta.rsi(data['close'], length=14)
    data['ATR'] = ta.atr(data['high'], data['low'], data['close'], length=14)
    data['volatility'] = data['close'].rolling(window=10).std()
    data.dropna(inplace=True)
    return data

In [5]:
def prep_model_data(df):
    df['target'] = (df['close'].shift(-1) > df['close']).astype(int)
    features = ['SMA_10', 'SMA_20', 'EMA_10', 'MACD', 'MACD_signal', 'RSI', 'ATR', 'volatility']
    X = df[features]
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    return X_train, X_test, y_train, y_test, scaler

def save_model(model, scaler, model_file='gb_model.pkl', scaler_file='scaler.pkl'):
    joblib.dump(model, model_file)
    joblib.dump(scaler, scaler_file)
    print(f"Model and scaler saved as '{model_file}' and '{scaler_file}'")

In [6]:
def store_data_to_bigquery(data, table_id):
    try:
        data.to_gbq(destination_table=table_id, project_id=project_id, if_exists='replace', credentials=credentials)
        print(f"Data successfully saved to BigQuery table: {table_id}")
    except Exception as err:
        print(f"Error saving data to BigQuery: {err}")

In [7]:
def publish_notification(message):
    try:
        message_json = json.dumps(message)
        message_bytes = message_json.encode('utf-8')
        publisher.publish(pubsub_topic, data=message_bytes)
        print(f"Notification sent: {message}")
    except Exception as err:
        print(f"Error publishing notification: {err}")

In [8]:
market_name = "kucoin"
asset_pair = "BTC/USDT"
interval = "15m"
start_date = int(time.mktime(time.strptime('2020-01-01', '%Y-%m-%d'))) * 1000

data = get_market_data(market_name, asset_pair, interval, start_date)

In [10]:
if data is not None:
    # Store market data in BigQuery
    store_data_to_bigquery(data, 'bitcoin_prices.prices')

    data_with_features = apply_indicators(data)
    X_train, X_test, y_train, y_test, scaler = prep_model_data(data_with_features)

    classifier = GradientBoostingClassifier(n_estimators=300, learning_rate=0.1, max_depth=5, random_state=42)
    classifier.fit(X_train, y_train)

    y_pred = classifier.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Model Accuracy: {accuracy}")
    print(classification_report(y_test, y_pred))

    save_model(classifier, scaler)

    # Publish notification to Pub/Sub
    message = {
        'model_accuracy': accuracy,
        'message': 'Model training completed and saved',
        'timestamp': time.time()
    }
    publish_notification(message)
else:
    print("Failed to fetch market data. Please verify your parameters.")

Data successfully saved to BigQuery table: bitcoin_prices.prices
Model Accuracy: 0.5323618809225819
              precision    recall  f1-score   support

           0       0.53      0.51      0.52     16736
           1       0.53      0.56      0.54     16822

    accuracy                           0.53     33558
   macro avg       0.53      0.53      0.53     33558
weighted avg       0.53      0.53      0.53     33558

Model and scaler saved as 'gb_model.pkl' and 'scaler.pkl'
Notification sent: {'model_accuracy': 0.5323618809225819, 'message': 'Model training completed and saved', 'timestamp': 1728876631.7697833}
