API REQUESTS AND JSON READ FUNCTIONS
-Inputs = Coins, Timeframe

In [1]:
# Imports
import matplotlib
import matplotlib.pyplot as plt
import dask.dataframe as dd
import My_API_Wraps
import My_FE_Wraps
import My_FS_Wraps
from dask.distributed import Client, LocalCluster
import xgboost
from sklearn.metrics import r2_score
import pandas as pd
import datetime as dt
import numpy as np
from xgboost import dask as dxgb
import os
import cupy as cp
import optuna


In [2]:
# CPU optimized cluster for my CPU 
cluster = LocalCluster(
            n_workers=4,
            threads_per_worker=5,
            processes=True,
            # memory_limit='12GB',
            dashboard_address=':8787',
            resources = {'GPU':1}
        )

In [None]:
# Work in progress-GPU optimized cluster with CUDA
# Notes:Windows doesn't support dask.distributed.LocalCudaCluster
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
cp.cuda.set_allocator(cp.cuda.MemoryPool().malloc)

cluster = LocalCluster(
    n_workers=1,              # Single worker for GPU
    processes=False,          # Thread-based
    threads_per_worker=20,    # Maximum threads for i7-13700H
    memory_limit='22GB',      # 75% of system RAM
    dashboard_address=':8788',
    resources={'GPU': 1}
)

In [None]:
# Key function parameters and setting up dask client
# Notes: Use client for real time analytics to optimise number of partitions, cluster, and dataflow
# If timeframe =1, periods are 5minute intervals, else hourly
client = Client(cluster)
periods= 1
timeframe= 7
top_coins= 5
api_key= "CG-r57ENE22zzPUiLmjnyFK7YHw"
headers = {
    "accept": "application/json",
    "x-cg-demo-api-key": api_key
    }
client

In [1]:
# Use to close client and cluster when done with daskcomputations
client.close()
cluster.close()

NameError: name 'client' is not defined

In [None]:
# Pandas Example 1- Multiprocessing Distributor
df_pandas= My_API_Wraps.CoinGecko_HSPD_Pandas(timeframe, top_coins, periods, api_key)
EF_pandas= My_FE_Wraps.EF_Pandas_MultiprocessingDistributor(df_pandas, ParameterComplexity=0)
SF_pandas= My_FS_Wraps.SF_Pandas_Simple(EF_pandas)


In [None]:
# Pandas Example 2- Pandas with Dask Distributor
df_pandas= My_API_Wraps.CoinGecko_HSPD_Pandas(timeframe, top_coins, periods, api_key)
EF_pandas2= My_FE_Wraps.EF_Pandas_DaskDistributor(df_pandas, cluster.scheduler_address, ParameterComplexity=0)
SF_pandas= My_FS_Wraps.SF_Pandas_Simple(EF_pandas2)

In [None]:
# Dask Example- Data Collection, Feature Extraction & Selection
# Notes: can also use .persist on functions rather than clients
# (Recommended) Keep ef splits to 1 as the next step feature selection occurs on per-partition basis. 
# (Not Recommended) Otherwise call X.columns.tolist() for features selected over multiple partitions 
raw_data= client.submit(My_API_Wraps.CoinGecko_HSPD_Dask, timeframe=timeframe, top_coins=top_coins, periods=periods, api_key=api_key, splits=timeframe*3)
ef= client.submit(My_FE_Wraps.EF_Dask, raw_data, ParameterComplexity=1, splits=1, LR=True)
y = client.persist(ef.result()['y_future'].dropna().repartition(npartitions=10))
sf= client.submit(My_FS_Wraps.SF_Dask_v1, ef, p_value=0.05)
X= client.persist(sf.result().repartition(npartitions=10))
X.compute()

In [None]:
# Dask Example- Test-Train Split by Partitions and DMatrix construction
X_train= X.partitions[0:7].persist()
y_train= y.partitions[0:7].persist()
X_test= X.partitions[7:9].persist()
y_test= y.partitions[7:9].persist()
dtrain = dxgb.DaskDMatrix(client, X_train, y_train)

In [None]:
# Dask Example- Hyperparmater optimization with Dask and Optuna
def objective(trial):
    params = {
        "verbosity": 0,
        "tree_method": "hist",
        "eval_metric": 'mae',
        "lambda": trial.suggest_float("lambda", 1e-8, 100.0, log=True),
        "alpha": trial.suggest_float("alpha", 1e-8, 100.0, log=True),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.2, 1.0),
        "max_depth": trial.suggest_int("max_depth", 2, 10),
        "min_child_weight": trial.suggest_float("min_child_weight", 1e-8, 100, log=True),
        "learning_rate": trial.suggest_float("learning_rate", 1e-8, 1.0, log=True),
        "gamma": trial.suggest_float("gamma", 1e-8, 1.0, log=True)
    }
    try:
        output = dxgb.train(
            client,
            params,
            dtrain,
            num_boost_round=20,
            evals=[(dtrain, "train")]
        )
        return output["history"]["train"]["mae"][-1]
    except Exception as e:
        print(f"Error in training: {str(e)}")
        return float('inf')
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=40, n_jobs=-1, show_progress_bar=True)


In [None]:
# Dask Example- Dask optuna final model results
final_model= dxgb.train(client, study.best_params, dtrain, num_boost_round=300, evals=[(dtrain, "train")])
dtest = dd.from_pandas(X_test.compute())
predictions = dxgb.predict(client, final_model, dtest)
r2 = r2_score(y_test.compute(), predictions)
std = y_test.std().compute()
score = study.best_value
Thresh_var = score/std 
print('Standard_Dev: '+ f'{std}')
print(f"Best parameters: {study.best_params}")
print(f"Best RMSE: {study.best_value}")
print(f"R2 Score: {r2}")
print(f'score/std: {Thresh_var}')
viz= pd.DataFrame(columns=["Predicted", "Actual"], index=y_test.index.compute())
viz['Predicted'] = predictions
viz['Actual'] = y_test.compute().values
viz.plot(figsize=(20,10))

In [None]:
# Pandas- GPU Accelerated XGBoost Using Random Gridsearch for Hyperparameter Tuning 
from sklearn.model_selection import TimeSeriesSplit
tscv = TimeSeriesSplit(n_splits=5)
model_parameters={
'num_parallel_tree': [1,3,5,10,20],
'learning_rate': [0.01,0.01,0.05,0.1,0.5,1],
'max_depth': [3,6,12,24,48],
'gamma':[0,0.01,0.05,0.1,0.5,1,5,25],
'min_child_weight':[0.5,1,3,5],
'subsample':[1,0.05, 0.1,0.25,0.5],
'sampling_method':['uniform', 'gradient_based'],
'colsample_bytree':[1, 0.1, 0.5, 1],
'grow_policy':['depthwise', 'lossguide']
}
basemodel= xgb.XGBRegressor(eval_metric='mae', early_stopping_rounds=25, 
                        device= 'cuda',tree_method='hist', n_jobs=1, verbose=0)
rgs= RandomizedSearchCV(estimator=basemodel, verbose= 25, n_jobs=1,
                        cv=tscv,param_distributions=model_parameters, n_iter=20, refit=True).fit(X_train, y_train, eval_set=[X_test, y_test])

In [None]:
# Pandas- Final Model Results
chosen_model= xgb.XGBRegressor(eval_metric='mae', early_stopping_rounds= 25, device= 'cuda',tree_method='hist',**rgs.best_params_, n_jobs=1)
chosen_model.fit(X_train_cp, y_train_cp, eval_set= [(X_test_cp, y_test_cp)])
preds=chosen_model.predict(X_test)
estimated_performance = r2_score(y_test, preds)
predictions = pd.DataFrame(preds, columns= ['predicted_testtrain'], index= X_test.index.shift(freq=f'{periods}min'))
predictions['realised']= y_test.values
predictions= predictions* 100
score = chosen_model.best_score*100
std = features['y_future'].std()*100
print('Mean_absolute error: '+ f'{score}')
print('Standard_Dev: '+ f'{std}')
print(f'R2: {estimated_performance}')
feature_imp = pd.DataFrame(data = chosen_model.feature_importances_, index= chosen_model.feature_names_in_, columns=['f_imp']).sort_values(ascending=False, by='f_imp')
Thresh_var = score/std 
print(f'score/std: {Thresh_var}')
display(feature_imp)

In [None]:
# # Pandas- Final model for deployment and plot of predicted, realised, and untested future predictions
final_model = xgb.XGBRegressor(device= 'cuda',tree_method='hist',**rgs.best_params_)
final_model.fit(X, y)
deployment_preds= features.drop('y_future', axis=1)[X.columns].tail(shift)
dep_preds=final_model.predict(deployment_preds)
dep_preds= pd.Series(dep_preds, index=deployment_preds.index.shift(freq=freq), name='untested_predictions')
predictions.join(dep_preds, how='outer').plot()

In [None]:
# Work in Progress-  Using PancakeSwap API to set up trading bot 
# import requests
# import json

# # Replace with your PancakeSwap API endpoint and API key
API_ENDPOINT = 'https://api.pancakeswap.info/api/v2/orders'
API_KEY = 'your_api_key_here'

def place_order(order_type, amount, leverage, price=None):
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {API_KEY}'
    }
    order_data = {
        'type': order_type,  # 'buy' or 'sell'
        'amount': amount,    # Amount of ETH to trade
        'leverage': leverage # Leverage to use
    }
    if price:
        order_data['price'] = price  # Set price for limit orders
    else:
        order_data['price'] = 'market'  # Set price to market for market orders

    response = requests.post(API_ENDPOINT, headers=headers, data=json.dumps(order_data))
    if response.status_code == 200:
        print(f"Order placed successfully: {response.json()}")
    else:
        print(f"Failed to place order: {response.status_code}, {response.text}")

# Example usage
place_order('buy', 0.1, 10)  # Place a market buy order for 0.1 ETH with 10x leverage
place_order('sell', 0.1, 10, 3600.10)  # Place a limit sell order for 0.1 ETH at $3600.10 with 10x leverage
# In this example, if you don't provide a price, the order will be placed at the market price. If you provide a price, it will be treated as a limit order. This way, you can easily switch between market and limit orders based on your trading strategy. Happy trading!


def execute_trade(prediction): 
    if prediction >= 0.02: 
        place_order('buy', 0.1, 10)  # Place a buy order for 0.1 ETH at $3600.10 with 10x leverage
    elif prediction <= -0.02:
        place_order('sell', 0.1, 10)

# # Main trading loop
# while True:
#     market_data = fetch_market_data()
#     prediction = make_prediction(market_data)
#     execute_trade(prediction)
#     # Sleep for 30 minutes before making the next prediction
#     time.sleep(1800)

