# Install needed deps

#### Don't forget to run ```apt-get update --fix-missing && sudo apt-get install build-essential``` and ```apt-get install zlib1g-dev``` in case you are running on an Ubuntu image

In [None]:
%pip install pandas-ta==0.3.14b --pre
%pip install gym==0.21.0
%pip install ipywidgets
%pip install ray
%pip install -U "ray[tune]"
%pip install -U "ray[rllib]"
%pip install -U "ray[serve]"
%pip install ta
%pip install quantstats
%pip install sklearn
%pip install feature_engine
%pip install --upgrade mplfinance
%pip install optuna

# Prepare and fetch the data

In [1]:
from tensortrade.data.cdd import CryptoDataDownload

import numpy as np
import pandas as pd
pd.options.mode.use_inf_as_na = True

def prepare_data(df):
    df['volume'] = np.int64(df['volume'])
    df['date'] = pd.to_datetime(df['date'])
    df.sort_values(by='date', ascending=True, inplace=True)
    df.reset_index(drop=True, inplace=True)
    df['date'] = df['date'].dt.strftime('%Y-%m-%d %I:%M %p')
    return df

def fetch_data():
    cdd = CryptoDataDownload()
    bitfinex_data = cdd.fetch("Bitfinex", "USD", "BTC", "1h")
    bitfinex_data = bitfinex_data[['date', 'open', 'high', 'low', 'close', 'volume']]
    bitfinex_data = prepare_data(bitfinex_data)
    return bitfinex_data

def load_csv(filename):
    df = pd.read_csv('data/' + filename, skiprows=1)
    #df.drop(columns=['symbol', 'volume_btc'], inplace=True)

    # Fix timestamp from "2019-10-17 09-AM" to "2019-10-17 09-00-00 AM"
    df['date'] = df['date'].str[:14] + '00-00 ' + df['date'].str[-2:]

    return prepare_data(df)

In [2]:
data = fetch_data()
data

Unnamed: 0,date,open,high,low,close,volume
0,2018-05-15 06:00 AM,8723.800000,8793.000000,8714.9,8739.000000,8988053
1,2018-05-15 07:00 AM,8739.000000,8754.800000,8719.3,8743.000000,2288904
2,2018-05-15 08:00 AM,8743.000000,8743.100000,8653.2,8723.700000,8891773
3,2018-05-15 09:00 AM,8723.700000,8737.800000,8701.2,8708.100000,2054868
4,2018-05-15 10:00 AM,8708.100000,8855.700000,8695.8,8784.400000,17309722
...,...,...,...,...,...,...
34522,2022-04-24 09:00 PM,39526.977727,39670.000000,39505.0,39669.000000,6064813
34523,2022-04-24 10:00 PM,39669.000000,39798.000000,39566.0,39588.000000,2804385
34524,2022-04-24 11:00 PM,39588.000000,39593.585605,39461.0,39480.581239,2732269
34525,2022-04-25 12:00 AM,39473.000000,39526.000000,38706.0,38890.299273,15814016


## Create features for the feed module

In [31]:
import os
import numpy as np
import ta as ta1
import pandas_ta as ta

import quantstats as qs
qs.extend_pandas()

def fix_dataset_inconsistencies_without_backfilling(dataframe, fill_value=None):
    dataframe = dataframe.replace([-np.inf, np.inf], np.nan)

    return dataframe.fillna(axis='index', method='pad').dropna(axis='columns')

def fix_dataset_inconsistencies(dataframe, fill_value=None):
    dataframe = dataframe.replace([-np.inf, np.inf], np.nan)

    #This is done to avoid filling middle holes with backfilling.
    if fill_value is None:
        dataframe.iloc[0,:] = \
            dataframe.apply(lambda column: column.iloc[column.first_valid_index()], axis='index')
    else:
        dataframe.iloc[0,:] = \
            dataframe.iloc[0,:].fillna(fill_value)

    return dataframe.fillna(axis='index', method='pad').dropna(axis='columns')

def rsi(price: 'pd.Series[pd.Float64Dtype]', period: float) -> 'pd.Series[pd.Float64Dtype]':
    r = price.diff()
    upside = np.minimum(r, 0).abs()
    downside = np.maximum(r, 0).abs()
    rs = upside.ewm(alpha=1 / period).mean() / downside.ewm(alpha=1 / period).mean()
    return 100*(1 - (1 + rs) ** -1)

def macd(price: 'pd.Series[pd.Float64Dtype]', fast: float, slow: float, signal: float) -> 'pd.Series[pd.Float64Dtype]':
    fm = price.ewm(span=fast, adjust=False).mean()
    sm = price.ewm(span=slow, adjust=False).mean()
    md = fm - sm
    signal = md - md.ewm(span=signal, adjust=False).mean()
    return signal

def generate_all_default_quantstats_features(data):
    excluded_indicators = [
        'compare',
        'greeks',
        'information_ratio',
        'omega',
        'r2',
        'r_squared',
        'rolling_greeks',
        'warn',
    ]
    
    indicators_list = [f for f in dir(qs.stats) if f[0] != '_' and f not in excluded_indicators]
    
    df = data.copy()
    df = df.set_index('date')
    df.index = pd.DatetimeIndex(df.index)

    for indicator_name in indicators_list:
        try:
            #print(indicator_name)
            indicator = qs.stats.__dict__[indicator_name](df['close'])
            if isinstance(indicator, pd.Series):
                indicator = indicator.to_frame(name=indicator_name)
                df = pd.concat([df, indicator], axis='columns')
        except (pd.errors.InvalidIndexError, ValueError):
            pass

    df = df.reset_index()
    return df

def generate_features(data):

    # Add day of week as feature
    data['date'] = pd.to_datetime(data["date"])
    data['day_of_week'] = data['date'].dt.dayofweek

    # Add hour of day as feature
    data['hour_of_day'] = data['date'].dt.hour

    # Generate all default indicators from ta library
    ta1.add_all_ta_features(data, 
                            'open', 
                            'high', 
                            'low', 
                            'close', 
                            'volume', 
                            fillna=True)

    # Naming convention across most technical indicator libraries
    data = data.rename(columns={'open': 'Open', 
                                'high': 'High', 
                                'low': 'Low', 
                                'close': 'Close', 
                                'volume': 'Volume'})
    data = data.set_index('date')

    # Custom indicators
    features = pd.DataFrame.from_dict({
        'prev_open': data['Open'].shift(1),
        'prev_high': data['High'].shift(1),
        'prev_low': data['Low'].shift(1),
        'prev_close': data['Close'].shift(1),
        'prev_volume': data['Volume'].shift(1),
        'vol_5': data['Close'].rolling(window=5).std().abs(),
        'vol_10': data['Close'].rolling(window=10).std().abs(),
        'vol_20': data['Close'].rolling(window=20).std().abs(),
        'vol_30': data['Close'].rolling(window=30).std().abs(),
        'vol_50': data['Close'].rolling(window=50).std().abs(),
        'vol_60': data['Close'].rolling(window=60).std().abs(),
        'vol_100': data['Close'].rolling(window=100).std().abs(),
        'vol_200': data['Close'].rolling(window=200).std().abs(),
        'ma_5': data['Close'].rolling(window=5).mean(),
        'ma_10': data['Close'].rolling(window=10).mean(),
        'ma_20': data['Close'].rolling(window=20).mean(),
        'ma_30': data['Close'].rolling(window=30).mean(),
        'ma_50': data['Close'].rolling(window=50).mean(),
        'ma_60': data['Close'].rolling(window=60).mean(),
        'ma_100': data['Close'].rolling(window=100).mean(),
        'ma_200': data['Close'].rolling(window=200).mean(),
        'ema_5': ta1.trend.ema_indicator(data['Close'], window=5, fillna=True),
        'ema_9': ta1.trend.ema_indicator(data['Close'], window=9, fillna=True),
        'ema_21': ta1.trend.ema_indicator(data['Close'], window=21, fillna=True),
        'ema_60': ta1.trend.ema_indicator(data['Close'], window=60, fillna=True),
        'ema_64': ta1.trend.ema_indicator(data['Close'], window=64, fillna=True),
        'ema_120': ta1.trend.ema_indicator(data['Close'], window=120, fillna=True),
        'lr_open': np.log(data['Open']).diff().fillna(0),
        'lr_high': np.log(data['High']).diff().fillna(0),
        'lr_low': np.log(data['Low']).diff().fillna(0),
        'lr_close': np.log(data['Close']).diff().fillna(0),
        'r_volume': data['Close'].diff().fillna(0),
        'rsi_5': rsi(data['Close'], period=5),
        'rsi_10': rsi(data['Close'], period=10),
        'rsi_100': rsi(data['Close'], period=100),
        'rsi_7': rsi(data['Close'], period=7),
        'rsi_28': rsi(data['Close'], period=28),
        'rsi_6': rsi(data['Close'], period=6),
        'rsi_14': rsi(data['Close'], period=14),
        'rsi_26': rsi(data['Close'], period=24),
        'macd_normal': macd(data['Close'], fast=12, slow=26, signal=9),
        'macd_short': macd(data['Close'], fast=10, slow=50, signal=5),
        'macd_long': macd(data['Close'], fast=200, slow=100, signal=50),
        'macd_wolfpack': macd(data['Close'], fast=3, slow=8, signal=9),
    })

    # Concatenate both manually and automatically generated features
    data = pd.concat([data, features], axis='columns').fillna(method='pad')

    # Remove potential column duplicates
    data = data.loc[:,~data.columns.duplicated()]

    # Revert naming convention
    data = data.rename(columns={'Open': 'open', 
                                'High': 'high', 
                                'Low': 'low', 
                                'Close': 'close', 
                                'Volume': 'volume'})


    data = data.reset_index()

    # Generate all default quantstats features
    df_quantstats = generate_all_default_quantstats_features(data)

    # Concatenate both manually and automatically generated features
    data = pd.concat([data, df_quantstats], axis='columns').fillna(method='pad')


    # Remove potential column duplicates
    data = data.loc[:,~data.columns.duplicated()]

    # A lot of indicators generate NaNs at the beginning of DataFrames, so remove them
    data = data.iloc[200:]
    data = data.reset_index(drop=True)

    data = fix_dataset_inconsistencies_without_backfilling(data, fill_value=None)
    return data

In [32]:
data = generate_features(data)
# remove not needed features
to_drop = ['others_dlr', 'compsum']
data = data.drop(columns=to_drop)
data

  dip[idx] = 100 * (self._dip[idx] / value)
  din[idx] = 100 * (self._din[idx] / value)
  self._psar_up = pd.Series(index=self._psar.index)
  self._psar_down = pd.Series(index=self._psar.index)
  return bound(*args, **kwds)
  return _pd.concat(dfs, 1, sort=True)


Unnamed: 0,date,open,high,low,close,volume,volume_adi,volume_obv,volume_cmf,volume_fi,...,macd_short,macd_long,macd_wolfpack,pct_rank,rolling_sharpe,rolling_sortino,rolling_volatility,to_drawdown_series,day_of_week,hour_of_day
0,2018-06-17 14:00:00,6478.500000,6489.700000,6468.0,6471.800000,1022486,-9.250099e+07,-142747620,-0.113508,-1.450109e+07,...,-6.909801,-26.630426,-7.836208,33.333333,-1.011391,-1.409555,0.106466,-0.263262,6,14
1,2018-06-17 15:00:00,6471.800000,6522.700000,6425.1,6519.900000,5793783,-8.703964e+07,-136953837,0.013557,2.738206e+07,...,-1.971003,-27.054936,5.889742,61.666667,-0.848052,-1.188504,0.107041,-0.257787,6,15
2,2018-06-17 16:00:00,6519.900000,6523.000000,6500.0,6500.100000,754856,-8.778793e+07,-137708693,-0.016441,2.133517e+07,...,-1.124469,-27.265745,4.932850,43.333333,-0.888689,-1.244393,0.107106,-0.260041,6,16
3,2018-06-17 17:00:00,6500.100000,6510.600000,6481.0,6492.000000,2148478,-8.833957e+07,-139857171,0.007591,1.580120e+07,...,-1.371957,-27.387553,2.100355,40.000000,-0.886843,-1.241843,0.107104,-0.260963,6,17
4,2018-06-17 18:00:00,6492.000000,6506.000000,6487.6,6503.000000,983358,-8.767687e+07,-138873813,0.048717,1.508916e+07,...,-0.354892,-27.606788,3.480080,50.000000,-0.579998,-0.830296,0.104992,-0.259710,6,18
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
33722,2022-04-24 21:00:00,39526.977727,39670.000000,39505.0,39669.000000,6064813,1.036479e+10,-3688370633,0.263768,1.249967e+07,...,10.198366,58.527379,16.254117,51.666667,-1.339597,-1.636928,0.065078,-0.421743,6,21
33723,2022-04-24 22:00:00,39669.000000,39798.000000,39566.0,39588.000000,2804385,1.036252e+10,-3691175018,0.222457,-2.173674e+07,...,7.009476,57.143399,3.113579,26.666667,-1.322784,-1.617026,0.065042,-0.422924,6,22
33724,2022-04-24 23:00:00,39588.000000,39593.585605,39461.0,39480.581239,2732269,1.036059e+10,-3693907287,0.171248,-6.055963e+07,...,-4.541306,56.723628,-24.269541,10.000000,-1.417697,-1.730603,0.065119,-0.424490,6,23
33725,2022-04-25 00:00:00,39473.000000,39526.000000,38706.0,38890.299273,15814016,1.035189e+10,-3709721303,0.054592,-1.385441e+09,...,-65.489970,61.773491,-157.066417,1.666667,-1.818277,-2.163477,0.068277,-0.433094,0,0


## Remove features with low variance before splitting the dataset

In [33]:
from sklearn.feature_selection import VarianceThreshold
sel = VarianceThreshold(threshold=(.8 * (1 - .8)))
date = data[['date']].copy()
data = data.drop(columns=['date'])
sel.fit(data)
data[data.columns[sel.get_support(indices=True)]]
data = pd.concat([date, data], axis='columns')
data.shape

(33727, 142)

# Setup which data to use for training and which data to use for evaluation of RL Model

In [34]:
from sklearn.model_selection import train_test_split

def split_data(data):
    X = data.copy()
    y = X['close'].pct_change()

    X_train_test, X_valid, y_train_test, y_valid = \
        train_test_split(data, data['close'].pct_change(), train_size=0.67, test_size=0.33, shuffle=False)

    X_train, X_test, y_train, y_test = \
        train_test_split(X_train_test, y_train_test, train_size=0.50, test_size=0.50, shuffle=False)

    return X_train, X_test, X_valid, y_train, y_test, y_valid

In [35]:
X_train, X_test, X_valid, y_train, y_test, y_valid = \
    split_data(data)

In [42]:
def estimate_outliers(data):
    return iqr(data) * 1.5

def estimate_percent_gains(data, column='close'):
    returns = get_returns(data, column=column)
    gains = estimate_outliers(returns)
    return gains

def get_returns(data, column='close'):
    return fix_dataset_inconsistencies(data[[column]].pct_change(), fill_value=0)

def precalculate_ground_truths(data, column='close', threshold=None):
    returns = get_returns(data, column=column)
    gains = estimate_outliers(returns) if threshold is None else threshold
    print(gains)
    binary_gains = (returns[column] > gains).astype(int)
    return binary_gains

def is_null(data):
    return data.isnull().sum().sum() > 0

precalculate_ground_truths(data)

0.00844902464905714


0        0
1        0
2        0
3        0
4        0
        ..
33722    0
33723    0
33724    0
33725    0
33726    0
Name: close, Length: 33727, dtype: int64

## Implement basic feature engineering

In [36]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score

from feature_engine.selection import SelectBySingleFeaturePerformance

from scipy.stats import iqr


def estimate_outliers(data):
    return iqr(data) * 1.5

def estimate_percent_gains(data, column='close'):
    returns = get_returns(data, column=column)
    gains = estimate_outliers(returns)
    return gains

def get_returns(data, column='close'):
    return fix_dataset_inconsistencies(data[[column]].pct_change(), fill_value=0)

def precalculate_ground_truths(data, column='close', threshold=None):
    returns = get_returns(data, column=column)
    gains = estimate_outliers(returns) if threshold is None else threshold
    binary_gains = (returns[column] > gains).astype(int)
    return binary_gains

def is_null(data):
    return data.isnull().sum().sum() > 0



rf = RandomForestClassifier(n_estimators=100, 
                            random_state=1990, 
                            n_jobs=-1)

sel = SelectBySingleFeaturePerformance(variables=None, 
                                       estimator=rf, 
                                       scoring="roc_auc", 
                                       cv=5, 
                                       threshold=0.65)

sel.fit(X_train, precalculate_ground_truths(X_train, column='close'))

SelectBySingleFeaturePerformance(cv=5,
                                 estimator=RandomForestClassifier(n_jobs=-1,
                                                                  random_state=1990),
                                 threshold=0.65)

In [37]:
# import matplotlib.pyplot as plt

feature_performance = pd.Series(sel.feature_performance_).sort_values(ascending=False)
# feature_performance.plot.bar(figsize=(40, 10))
# plt.title('Performance of ML models trained with individual features')
# plt.ylabel('roc-auc')

In [44]:
features_to_drop = sel.features_to_drop_
# Keep some values no matter what
to_drop = list(set(features_to_drop) - set(['open', 'high', 'low', 'close', 'volume', 'day_of_week', 'hour_of_day']))
len(to_drop)
features_to_drop

['open',
 'high',
 'low',
 'close',
 'volume_adi',
 'volume_obv',
 'volume_cmf',
 'volume_fi',
 'volume_sma_em',
 'volume_vwap',
 'volume_mfi',
 'volume_nvi',
 'volatility_bbm',
 'volatility_bbh',
 'volatility_bbl',
 'volatility_bbw',
 'volatility_bbp',
 'volatility_bbhi',
 'volatility_bbli',
 'volatility_kcc',
 'volatility_kch',
 'volatility_kcl',
 'volatility_kcw',
 'volatility_kcp',
 'volatility_kcli',
 'volatility_dcl',
 'volatility_dch',
 'volatility_dcm',
 'volatility_dcw',
 'volatility_dcp',
 'volatility_atr',
 'volatility_ui',
 'trend_macd',
 'trend_macd_signal',
 'trend_macd_diff',
 'trend_sma_fast',
 'trend_sma_slow',
 'trend_ema_fast',
 'trend_ema_slow',
 'trend_vortex_ind_pos',
 'trend_vortex_ind_neg',
 'trend_vortex_ind_diff',
 'trend_trix',
 'trend_mass_index',
 'trend_dpo',
 'trend_kst',
 'trend_kst_sig',
 'trend_kst_diff',
 'trend_ichimoku_conv',
 'trend_ichimoku_base',
 'trend_ichimoku_a',
 'trend_ichimoku_b',
 'trend_stc',
 'trend_adx',
 'trend_adx_pos',
 'trend_adx_n

In [45]:
X_train = X_train.drop(columns=to_drop)
X_test = X_test.drop(columns=to_drop)
X_valid = X_valid.drop(columns=to_drop)

X_train.shape, X_test.shape, X_valid.shape

((11298, 18), (11299, 18), (11130, 18))

In [46]:
X_train.columns.tolist()

['date',
 'open',
 'high',
 'low',
 'close',
 'volume',
 'volume_em',
 'volume_vpt',
 'volatility_kchi',
 'trend_aroon_up',
 'momentum_stoch_rsi',
 'others_dr',
 'lr_high',
 'lr_close',
 'r_volume',
 'macd_wolfpack',
 'day_of_week',
 'hour_of_day']

## Normalize the dataset subsets to make the model converge faster

In [47]:
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler

scaler_type = MinMaxScaler

def get_feature_scalers(X, scaler_type=scaler_type):
    scalers = []
    for name in list(X.columns[X.columns != 'date']):
        scalers.append(scaler_type().fit(X[name].values.reshape(-1, 1)))
    return scalers

def get_scaler_transforms(X, scalers):
    X_scaled = []
    for name, scaler in zip(list(X.columns[X.columns != 'date']), scalers):
        X_scaled.append(scaler.transform(X[name].values.reshape(-1, 1)))
    X_scaled = pd.concat([pd.DataFrame(column, columns=[name]) for name, column in \
                          zip(list(X.columns[X.columns != 'date']), X_scaled)], axis='columns')
    return X_scaled

def scale_numpy_array(np_arr, scaler_type = scaler_type):
    return scaler_type().fit_transform(np_arr, (-1,1))

def normalize_data(X_train, X_test, X_valid):
    X_train_test = pd.concat([X_train, X_test], axis='index')
    X_train_test_valid = pd.concat([X_train_test, X_valid], axis='index')

    X_train_test_dates = X_train_test[['date']]
    X_train_test_valid_dates = X_train_test_valid[['date']]

    X_train_test = X_train_test.drop(columns=['date'])
    X_train_test_valid = X_train_test_valid.drop(columns=['date'])

    train_test_scalers = \
        get_feature_scalers(X_train_test, 
                            scaler_type=scaler_type)
    train_test_valid_scalers = \
        get_feature_scalers(X_train_test_valid, 
                            scaler_type=scaler_type)

    X_train_test_scaled = \
        get_scaler_transforms(X_train_test, 
                              train_test_scalers)
    X_train_test_valid_scaled = \
        get_scaler_transforms(X_train_test_valid, 
                              train_test_scalers)
    X_train_test_valid_scaled_leaking = \
        get_scaler_transforms(X_train_test_valid, 
                              train_test_valid_scalers)

    X_train_test_scaled = \
        pd.concat([X_train_test_dates, 
                   X_train_test_scaled], 
                  axis='columns')
    X_train_test_valid_scaled = \
        pd.concat([X_train_test_valid_dates, 
                   X_train_test_valid_scaled], 
                  axis='columns')
    X_train_test_valid_scaled_leaking = \
        pd.concat([X_train_test_valid_dates, 
                   X_train_test_valid_scaled_leaking], 
                  axis='columns')

    X_train_scaled = X_train_test_scaled.iloc[:X_train.shape[0]]
    X_test_scaled = X_train_test_scaled.iloc[X_train.shape[0]:]
    X_valid_scaled = X_train_test_valid_scaled.iloc[X_train_test.shape[0]:]
    X_valid_scaled_leaking = X_train_test_valid_scaled_leaking.iloc[X_train_test.shape[0]:]

    return (train_test_scalers, 
            train_test_valid_scalers, 
            X_train_scaled, 
            X_test_scaled, 
            X_valid_scaled, 
            X_valid_scaled_leaking)

train_test_scalers, train_test_valid_scalers, X_train_scaled, X_test_scaled, X_valid_scaled, X_valid_scaled_leaking = \
    normalize_data(X_train, X_test, X_valid)

In [1]:
import os
cwd = os.getcwd()

train_csv = os.path.join(cwd, 'train.csv')
test_csv = os.path.join(cwd, 'test.csv')
valid_csv = os.path.join(cwd, 'valid.csv')
train_scaled_csv = os.path.join(cwd, 'train_scaled.csv')
test_scaled_csv = os.path.join(cwd, 'test_scaled.csv')
valid_scaled_csv = os.path.join(cwd, 'valid_scaled.csv')
valid_scaled_leaking_csv = os.path.join(cwd, 'valid_scaled_leaking.csv')




### Save to CSV

In [None]:
X_train.to_csv(train_csv, index=False)
X_test.to_csv(test_csv, index=False)
X_valid.to_csv(valid_csv, index=False)
X_train_scaled.to_csv(train_scaled_csv, index=False)
X_test_scaled.to_csv(test_scaled_csv, index=False)
X_valid_scaled.to_csv(valid_scaled_csv, index=False)
X_valid_scaled_leaking.to_csv(valid_scaled_leaking_csv, index=False)

### Load from CSV if data previously saved

In [5]:
import pandas as pd

X_train = pd.read_csv(train_csv)
X_test = pd.read_csv(test_csv)
X_valid = pd.read_csv(valid_csv)
X_train_scaled = pd.read_csv(train_scaled_csv)
X_test_scaled = pd.read_csv(test_scaled_csv)
X_valid_scaled = pd.read_csv(valid_scaled_csv)
X_valid_scaled_leaking = pd.read_csv(valid_scaled_leaking_csv)

In [13]:
X_test_scaled.tail()

Unnamed: 0,date,open,high,low,close,volume,volume_em,volume_vpt,volatility_kchi,trend_aroon_up,momentum_stoch_rsi,others_dr,lr_high,lr_close,r_volume,macd_wolfpack,day_of_week,hour_of_day
11294,2021-01-13 22:00:00,0.890709,0.891715,0.892672,0.888593,0.145863,0.003415,0.563438,1.0,0.958333,0.954959,0.459552,0.514215,0.505819,0.571624,0.674789,0.333333,0.956522
11295,2021-01-13 23:00:00,0.888826,0.886627,0.895879,0.892938,0.080102,0.002807,0.542956,1.0,1.0,1.0,0.477204,0.457169,0.523476,0.638253,0.604959,0.333333,1.0
11296,2021-01-14 00:00:00,0.893221,0.901193,0.905074,0.903533,0.111046,0.003362,0.550737,1.0,1.0,1.0,0.494358,0.530483,0.540526,0.703452,0.583452,0.5,0.0
11297,2021-01-14 01:00:00,0.903426,0.89187,0.905313,0.89516,0.074468,0.002695,0.545638,1.0,0.958333,0.816829,0.442376,0.441574,0.488527,0.505562,0.465085,0.5,0.043478
11298,2021-01-14 02:00:00,0.895105,0.889203,0.897228,0.894087,0.082067,0.002606,0.538968,0.0,0.916667,0.793112,0.462237,0.466245,0.508513,0.581724,0.385389,0.5,0.086957


# Defining the environment

In [14]:
import random
import gym
from gym import spaces
from sklearn import preprocessing
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# infinite number in python
MAX_NET_WORTH = 2147483647
MAX_NUM_QUOTE_OR_BASE_ASSET = 2147483647

INITIAL_QUOTE_ASSET = 10000
INITIAL_BASE_ASSET = 0
OBSERVATION_WINDOW_SIZE = 24 # Probably we should put it as param ?

class SimpleTradingEnv(gym.Env):
    
    metadata = {'render.modes': ['live', 'human', 'none']}
    visualization = None

    def __init__(self, config=None):
        
        self.df_scaled = config.get("df_scaled").reset_index(drop=True)
        self.df_normal = config.get("df_normal").reset_index(drop=True)
        self.window_size = OBSERVATION_WINDOW_SIZE
        self.prices, self.features = self._process_data(self.df_scaled)
        # The shape of the observation is (window_size * features + environment_features) the environment_features are: quote_asset, base_asset, net_worth. The entire observation is flattened in a 1D np array. 
        # NOT USED ANYMORE, KEPT FOR REFERENCE
        # self.obs_shape = ((OBSERVATION_WINDOW_SIZE * self.features.shape[1] + 3),) 

        # The shape of the observation is number of candles to look back, and the number of features (candle_features) + 3 (quote_asset, base_asset, net_worth)
        self.obs_shape = (OBSERVATION_WINDOW_SIZE, self.features.shape[1] + 3)

        # Action space
        #self.action_space = spaces.Box(low=np.array([0, 0]), high=np.array([3.0, 1.0]), dtype=np.float32)
        self.action_space = spaces.MultiDiscrete([3, 100])
        # Observation space
        self.observation_space = spaces.Box(low=-1, high=1, shape=self.obs_shape, dtype=np.float32)

        # Initialize the episode environment

        self._start_candle = OBSERVATION_WINDOW_SIZE # We assume that the first observation is not the first row of the dataframe, in order to avoid the case where there are no calculated indicators.
        self._end_candle = len(self.features) - 1
        self._trading_fee = config.get("trading_fee")

        self._quote_asset = None
        self._base_asset = None
        self._done = None
        self._current_candle = None
        self._net_worth = None
        self._previous_net_worth = None

        # Array that will contain observation history needed for appending it to the observation space
        # It will contain observations consisting of the net_worth, base_asset and quote_asset as list of floats
        # Other features (OHLC + Indicators) will be appended to the current observation in the _get_observation method that takes the data directly from the available dataframe
        self._obs_env_history = None

        # Render and analysis data
        self._total_reward_accumulated = None
        self.trade_history = None
        self._first_rendering = None
        

    def reset(self):
        self._done = False
        self._current_candle = self._start_candle
        self._quote_asset = INITIAL_QUOTE_ASSET
        self._base_asset = INITIAL_BASE_ASSET 
        self._net_worth = INITIAL_QUOTE_ASSET # at the begining our net worth is the initial quote asset
        self._previous_net_worth = INITIAL_QUOTE_ASSET # at the begining our previous net worth is the initial quote asset
        self._total_reward_accumulated = 0.
        self._first_rendering = True
        self.trade_history = []
        self._obs_env_history = []
        
        self._initial_obs_data()

        return self._get_observation()

    def _take_action(self, action):
        self._done = False
        current_price = random.uniform(
            self.df_normal.loc[self._current_candle, "low"], self.df_normal.loc[self._current_candle, "high"])


        action_type = action[0]
        amount = action[1] / 100
        
        if action_type == 0: # Buy
            # Buy % assets
            # Determine the maximum amount of quote asset that can be bought
            available_amount_to_buy_with = self._quote_asset / current_price
            # Buy only the amount that agent chose
            assets_bought = available_amount_to_buy_with * amount
            # Update the quote asset balance
            self._quote_asset -= assets_bought * current_price
            # Update the base asset
            self._base_asset += assets_bought
            # substract trading fee from base asset based on the amount bought
            self._base_asset -= self._trading_fee * assets_bought

            # Add to trade history the amount bought if greater than 0
            if assets_bought > 0:
                self.trade_history.append({'step': self._current_candle, 'type': 'Buy', 'amount': assets_bought, 'price': current_price, 'total' : assets_bought * current_price, 'percent_amount': action[1]})
        

        elif action_type == 1: # Sell
            # Sell % assets
            # Determine the amount of base asset that can be sold
            amount_to_sell = self._base_asset * amount
            received_quote_asset = amount_to_sell * current_price
            # Update the quote asset
            self._quote_asset += received_quote_asset
            # Update the base asset
            self._base_asset -= amount_to_sell
            
            # substract trading fee from quote asset based on the amount sold
            self._quote_asset -= self._trading_fee * received_quote_asset

            # Add to trade history the amount sold if greater than 0
            if amount_to_sell > 0:
                self.trade_history.append({'step': self._current_candle, 'type': 'Sell', 'amount': amount_to_sell, 'price': current_price, 'total' : received_quote_asset, 'percent_amount': action[1]})

        else:
            # Hold
            self.trade_history.append({'step': self._current_candle, 'type': 'Hold', 'amount': '0', 'price': current_price, 'total' : 0, 'percent_amount': action[1]})


        # Update the current net worth
        self._net_worth = self._base_asset * current_price + self._quote_asset


    def step(self, action):
        """
        Returns the next observation, reward, done and info.
        """
        
        self._take_action(action)

        # Calculate reward comparing the current net worth with the previous net worth
        reward = self._net_worth - self._previous_net_worth

        self._total_reward_accumulated += reward

        # Update the previous net worth to be the current net worth after the reward has been applied
        self._previous_net_worth = self._net_worth

        obs = self._get_observation()
        # Update the info and add it to history data
        info = dict (
            total_reward_accumulated = self._total_reward_accumulated,
            net_worth = self._net_worth,
            last_action_type = self.trade_history[-1]['type'] if len(self.trade_history) > 0 else None,
            last_action_amount = self.trade_history[-1]['amount'] if len(self.trade_history) > 0 else None,
            current_step = self._current_candle
        )

        self._current_candle += 1

        # Update observation history
        self._obs_env_history.append([self._net_worth, self._base_asset, self._quote_asset])

        self._done = self._net_worth <= 0 or self._current_candle >= (len(
            self.df_normal.loc[:, 'open'].values) - 30)# We assume that the last observation is not the last row of the dataframe, in order to avoid the case where there are no calculated indicators.

        if self._done:
            print('I have finished the episode')
        
        return obs, reward, self._done, info


    def _get_observation(self):
        """
        Returns the current observation.
        """
        data_frame = self.features[(self._current_candle - self.window_size):self._current_candle]

        obs_env_history = np.array(self._obs_env_history).astype(np.float32)

        #TODO We definetely need to scale the observation history in a better way, this might influence training results
        # Doing it ad-hoc might change the scale of the min and max, thus changing the results
        obs_env_history = preprocessing.minmax_scale(obs_env_history, (-0.9,0.9)) 

        obs = np.hstack((data_frame, obs_env_history[(self._current_candle - self.window_size):self._current_candle]))

        return obs


    def render(self, mode='human', **kwargs):
        """
        Renders a plot with trades made by the agent.
        """
        
        if mode == 'human':
            print(f'Accumulated Reward: {self._total_reward_accumulated} ---- Current Net Worth: {self._net_worth}')
            print(f'Current Quote asset: {self._quote_asset} ---- Current Base asset: {self._base_asset}')
            print(f'Number of trades: {len(self.trade_history)}')
        
            if(len(self.trade_history) > 0):
                print(f'Last Action: {self.trade_history[-1]["type"]} {self.trade_history[-1]["amount"]} assets ({self.trade_history[-1]["percent_amount"]} %) at price {self.trade_history[-1]["price"]}, total: {self.trade_history[-1]["total"]}')
            print(f'--------------------------------------------------------------------------------------')
        elif mode == 'live':
            pass
            # if self.visualization == None:
            #     self.visualization = LiveTradingGraph(self.df_normal, kwargs.get('title', None))

            # if self._current_candle > OBSERVATION_WINDOW_SIZE:
            #     self.visualization.render(self._current_candle, self._net_worth, self.trade_history, window_size=OBSERVATION_WINDOW_SIZE)

    def close(self):
        if self.visualization != None:
            self.visualization.close()
            self.visualization = None
         

    def _process_data(self, df_scaled):
        """
        Processes the dataframe into features.
        """
        
        prices = self.df_scaled.loc[:, 'close'].to_numpy(dtype=np.float32)

        data_frame = df_scaled.iloc[:, 1:] # drop first column which is date TODO: Should be probably fixed outside of this class
        # Convert df to numpy array
        return prices, data_frame.to_numpy(dtype=np.float32)

    def _initial_obs_data(self):
        for i in range(self.window_size - len(self._obs_env_history)):
            self._obs_env_history.append([self._net_worth, self._base_asset, self._quote_asset])


### BTC Accumulation env

In [28]:
import random
import gym
from gym import spaces
from sklearn import preprocessing
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# infinite number in python
MAX_NET_WORTH = 2147483647
MAX_NUM_QUOTE_OR_BASE_ASSET = 2147483647

INITIAL_QUOTE_ASSET = 0
INITIAL_BASE_ASSET = 1
OBSERVATION_WINDOW_SIZE = 24 # Probably we should put it as param ?

class BTCAccumulationEnv(gym.Env):
    
    metadata = {'render.modes': ['live', 'human', 'none']}
    visualization = None

    def __init__(self, config=None):
        
        self.df_scaled = config.get("df_scaled").reset_index(drop=True)
        self.df_normal = config.get("df_normal").reset_index(drop=True)
        self.window_size = OBSERVATION_WINDOW_SIZE
        self.prices, self.features = self._process_data(self.df_scaled)
        # The shape of the observation is (window_size * features + environment_features) the environment_features are: quote_asset, base_asset, net_worth. The entire observation is flattened in a 1D np array. 
        # NOT USED ANYMORE, KEPT FOR REFERENCE
        # self.obs_shape = ((OBSERVATION_WINDOW_SIZE * self.features.shape[1] + 3),) 

        # The shape of the observation is number of candles to look back, and the number of features (candle_features) + 3 (quote_asset, base_asset, net_worth)
        self.obs_shape = (OBSERVATION_WINDOW_SIZE, self.features.shape[1] + 3)

        # Action space
        #self.action_space = spaces.Box(low=np.array([0, 0]), high=np.array([3.0, 1.0]), dtype=np.float32)
        self.action_space = spaces.MultiDiscrete([3, 100])
        # Observation space
        self.observation_space = spaces.Box(low=-1, high=1, shape=self.obs_shape, dtype=np.float32)

        # Initialize the episode environment

        self._start_candle = OBSERVATION_WINDOW_SIZE # We assume that the first observation is not the first row of the dataframe, in order to avoid the case where there are no calculated indicators.
        self._end_candle = len(self.features) - 1
        self._trading_fee = config.get("trading_fee")

        self._quote_asset = None
        self._base_asset = None
        self._done = None
        self._current_candle = None
        self._net_worth = None
        self._previous_net_worth = None
        self._previous_base_asset = None
        self._previous_quote_asset = None

        # Array that will contain observation history needed for appending it to the observation space
        # It will contain observations consisting of the net_worth, base_asset and quote_asset as list of floats
        # Other features (OHLC + Indicators) will be appended to the current observation in the _get_observation method that takes the data directly from the available dataframe
        self._obs_env_history = None

        # Render and analysis data
        self._total_reward_accumulated = None
        self.trade_history = None
        self._first_rendering = None
        

    def reset(self):
        self._done = False
        self._current_candle = self._start_candle
        self._quote_asset = INITIAL_QUOTE_ASSET
        self._base_asset = INITIAL_BASE_ASSET 
        self._net_worth = INITIAL_QUOTE_ASSET # at the begining our net worth is the initial quote asset
        self._previous_net_worth = INITIAL_QUOTE_ASSET # at the begining our previous net worth is the initial quote asset
        self._previous_base_asset = INITIAL_BASE_ASSET
        self._previous_quote_asset = INITIAL_QUOTE_ASSET
        self._total_reward_accumulated = 0
        self._first_rendering = True
        self.trade_history = []
        self._obs_env_history = []
        
        self._initial_obs_data()

        return self._get_observation()

    def _take_action(self, action):
        self._done = False
        current_price = random.uniform(
            self.df_normal.loc[self._current_candle, "low"], self.df_normal.loc[self._current_candle, "high"])


        action_type = action[0]
        amount = action[1] / 100
        
        if action_type == 0: # Buy
            # Buy % assets
            # Determine the maximum amount of quote asset that can be bought
            available_amount_to_buy_with = self._quote_asset / current_price
            # Buy only the amount that agent chose
            assets_bought = available_amount_to_buy_with * amount
            # Update the quote asset balance
            self._quote_asset -= assets_bought * current_price
            # Update the base asset
            self._base_asset += assets_bought
            # substract trading fee from base asset based on the amount bought
            self._base_asset -= self._trading_fee * assets_bought

            # Add to trade history the amount bought if greater than 0
            if assets_bought > 0:
                self.trade_history.append({'step': self._current_candle, 'type': 'Buy', 'amount': assets_bought, 'price': current_price, 'total' : assets_bought * current_price, 'percent_amount': action[1]})
        

        elif action_type == 1: # Sell
            # Sell % assets
            # Determine the amount of base asset that can be sold
            amount_to_sell = self._base_asset * amount
            received_quote_asset = amount_to_sell * current_price
            # Update the quote asset
            self._quote_asset += received_quote_asset
            # Update the base asset
            self._base_asset -= amount_to_sell
            
            # substract trading fee from quote asset based on the amount sold
            self._quote_asset -= self._trading_fee * received_quote_asset

            # Add to trade history the amount sold if greater than 0
            if amount_to_sell > 0:
                self.trade_history.append({'step': self._current_candle, 'type': 'Sell', 'amount': amount_to_sell, 'price': current_price, 'total' : received_quote_asset, 'percent_amount': action[1]})

        else:
            # Hold
            self.trade_history.append({'step': self._current_candle, 'type': 'Hold', 'amount': '0', 'price': current_price, 'total' : 0, 'percent_amount': action[1]})


        # Update the current net worth
        self._net_worth = self._base_asset * current_price + self._quote_asset


    def step(self, action):
        """
        Returns the next observation, reward, done and info.
        """
        
        self._take_action(action)

        # Calculate reward comparing the current base asset with the previous base asset
        reward = self._base_asset - self._previous_base_asset

        self._total_reward_accumulated += reward

        # Update the previous net worth to be the current net worth after the reward has been applied
        self._previous_net_worth = self._net_worth
        self._previous_base_asset = self._base_asset
        self._previous_quote_asset = self._quote_asset

        obs = self._get_observation()
        # Update the info and add it to history data
        info = dict (
            total_reward_accumulated = self._total_reward_accumulated,
            net_worth = self._net_worth,
            last_action_type = self.trade_history[-1]['type'] if len(self.trade_history) > 0 else None,
            last_action_amount = self.trade_history[-1]['amount'] if len(self.trade_history) > 0 else None,
            current_step = self._current_candle
        )

        self._current_candle += 1

        # Update observation history
        self._obs_env_history.append([self._net_worth, self._base_asset, self._quote_asset])

        self._done = self._net_worth <= 0 or self._current_candle >= (len(
            self.df_normal.loc[:, 'open'].values) - 30)# We assume that the last observation is not the last row of the dataframe, in order to avoid the case where there are no calculated indicators.

        if self._done:
            print('The episode has finished')
        
        return obs, reward, self._done, info


    def _get_observation(self):
        """
        Returns the current observation.
        """
        data_frame = self.features[(self._current_candle - self.window_size):self._current_candle]

        obs_env_history = np.array(self._obs_env_history).astype(np.float32)

        #TODO We definetely need to scale the observation history in a better way, this might influence training results
        # Doing it ad-hoc might change the scale of the min and max, thus changing the results
        obs_env_history = preprocessing.minmax_scale(obs_env_history, (-0.9,0.9)) 

        obs = np.hstack((data_frame, obs_env_history[(self._current_candle - self.window_size):self._current_candle]))

        return obs


    def render(self, mode='human', **kwargs):
        """
        Renders a plot with trades made by the agent.
        """
        
        if mode == 'human':
            print(f'Accumulated Reward: {self._total_reward_accumulated} ---- Current Net Worth: {self._net_worth}')
            print(f'Current Quote asset: {self._quote_asset} ---- Current Base asset: {self._base_asset}')
            print(f'Number of trades: {len(self.trade_history)}')
        
            if(len(self.trade_history) > 0):
                print(f'Last Action: {self.trade_history[-1]["type"]} {self.trade_history[-1]["amount"]} assets ({self.trade_history[-1]["percent_amount"]} %) at price {self.trade_history[-1]["price"]}, total: {self.trade_history[-1]["total"]}')
            print(f'--------------------------------------------------------------------------------------')
        elif mode == 'live':
            pass
            # if self.visualization == None:
            #     self.visualization = LiveTradingGraph(self.df_normal, kwargs.get('title', None))

            # if self._current_candle > OBSERVATION_WINDOW_SIZE:
            #     self.visualization.render(self._current_candle, self._net_worth, self.trade_history, window_size=OBSERVATION_WINDOW_SIZE)

    def close(self):
        if self.visualization != None:
            self.visualization.close()
            self.visualization = None
         

    def _process_data(self, df_scaled):
        """
        Processes the dataframe into features.
        """
        
        prices = self.df_scaled.loc[:, 'close'].to_numpy(dtype=np.float32)

        data_frame = df_scaled.iloc[:, 1:] # drop first column which is date TODO: Should be probably fixed outside of this class
        # Convert df to numpy array
        return prices, data_frame.to_numpy(dtype=np.float32)

    def _initial_obs_data(self):
        for i in range(self.window_size - len(self._obs_env_history)):
            self._obs_env_history.append([self._net_worth, self._base_asset, self._quote_asset])


### Create a vectorized ENV as wrapper for parallelization

### Init ray and trainer config

### How to setup a cluster fast:

1. You need to decide which PC/server will be the head node and just run ``` ray start --head --port 6379 ```, this will be the master node

2. On the worker nodes you need to make sure that
a. You have the same ray and python version
b. You have installed all needed pip deps:
```pip install ray torch torchvision tabulate tensorboard tensorflow sklearn```
c. You have network conectivity to the head node (```telnet head-node-ip 6379```)
3. On all the machines that you want them to act as worker nodes you run :
```ray start --address="192.168.0.206:6379" --node-ip-address="192.168.0.150" --num-cpus=4```
a. address is the head node ip
b. node ip address is the ip of the worker node, pay attention to use an external one not localhost or 127.0.0.1
c. num cpus - how much cpu power you want to alocate
4. Also you need to make sure that your worker nodes are reacheable from the head node, because the communication is from both sides

That's all :) Enjoy


Very important notes about parallelism :

1. ```num_worker``` is per trial ! so if you increase this you will eat up resources and won't be able to scale to multiple parallel trials when doing hyperparam tuning, see below
2. a trial uses 2 CPU resources to run (1 for the driver ```num_cpus_for_driver``` -> and 1 for the worker (```num_cpus_per_worker```)). 
3. a trial has only 1 driver, while it can have multiple workers

In [15]:
import math

def find_optimal_resource_allocation(available_cpu, available_gpu):
    """
    Finds the optimal resource allocation for the agent based on the available resources in the cluster
    """
    # If we have GPU available, we allocate it all for the training, while creating as much workers as CPU cores we have minus one for the driver which holds the trainer
    if available_gpu > 0:
        return {
            'num_workers': available_cpu - 1,
            'num_cpus_per_worker': 1,
            'num_envs_per_worker': 1,
            'num_gpus_per_worker': 0,
            'num_cpus_for_driver': 1,
            'num_gpus' : available_gpu
        }
    # If we don't have GPU available, we allocate as much CPU cores for the training as possible, while creating only one worker for stepping the environment
    else:
        # according to the benchmark, we should allocate more workers, each with 1 cpu, letting the rest for the driver
        num_workers = int(math.floor((available_cpu  * 75) / 100))
        num_cpu_for_driver = available_cpu - num_workers
        return {
            'num_workers': num_workers,
            'num_cpus_per_worker': 1, # this should be enough for stepping an env at once
            'num_envs_per_worker': 1, # it doesn't seem to add any benefits to have more than one env per worker
            'num_gpus_per_worker': 0, # the inference is done pretty fast, so there is no need to use GPU, at least not when we run one trial at once
            'num_cpus_for_driver': num_cpu_for_driver,
            'num_gpus' : 0
        }

In [29]:
import os
import time
import ray
import os
from ray import tune
from ray.rllib.env.vector_env import VectorEnv
from ray.tune.registry import register_env



# Get the current working directory and create a folder to store the results
# cwd = os.getcwd()
# local_dir = "~/ray_results/"
# if not os.path.exists(local_dir):
#     os.makedirs(local_dir)



# Let's define some tuning parameters
FC_SIZE = tune.grid_search([[256, 256], [1024], [128, 64, 32]])  
LEARNING_RATE = tune.grid_search([0.001, 0.0005, 0.00001])
MINIBATCH_SIZE = tune.grid_search([5, 10, 20])  
GAMMA = tune.grid_search([0.9, 0.95, 0.98, 0.99, 0.995, 0.999, 0.9999])


# Initialize Ray
ray.shutdown() # let's shutdown first any running instances of ray (don't confuse it with the cluster)
os.environ['RAY_record_ref_creation_sites'] = '1' # Needed for debugging when things go wrong
ray.init() 

available_gpu_in_cluster = ray.available_resources()['GPU'] if ray.available_resources()['GPU']  else 0
available_cpu_in_cluster = ray.available_resources()['CPU'] if ray.available_resources()['CPU']  else 0

# In the first version we assume that we have only one node cluster, so the allocation logic is based on that
# So the resources are maximized for one ray tune trial at a time
parallel_config = find_optimal_resource_allocation(available_cpu_in_cluster, 0) # Currently we are going to disable GPU ussage due to it's poor performance on a single instance cluster

trading_fee = 0.0075
training_config = {
            "trading_fee": trading_fee,
            "df_normal": X_train,
            "df_scaled": X_train_scaled,
}

eval_config = {
            "trading_fee": trading_fee,
            "df_normal": X_test,
            "df_scaled": X_test_scaled,
}

training_env = SimpleTradingEnv(training_config)
eval_env = SimpleTradingEnv(eval_config)

training_env_key = "SimpleTradingEnv-training-V01"
eval_env_key = "SimpleTradingEnv-evaluating-V01"

# tune.register_env("SimpleTradingEnv-training-V01", lambda _: training_env)
# tune.register_env("SimpleTradingEnv-evaluating-V01", lambda _: eval_env)

# training_env = BTCAccumulationEnv(training_config)
# eval_env = BTCAccumulationEnv(eval_config)

# training_env_key = "BTCAccumulationEnv-training-V01"
# eval_env_key = "BTCAccumulationEnv-evaluating-V01"

tune.register_env(training_env_key, lambda _: training_env)
tune.register_env(eval_env_key, lambda _: eval_env)


# Create the ppo trainer configuration
ppo_trainer_config = {
        "env": training_env_key, # Ray will automatically create multiple environments and vectorize them if needed
        "horizon": len(X_train_scaled) - 30,
        "log_level": "WARN",
        "framework": "tf",
        #"eager_tracing": True,
        "ignore_worker_failures": True, 
        "num_workers": parallel_config.get("num_workers"), # Number of workers is per trial run, so the more we put the less parallelism we have
        "num_envs_per_worker": parallel_config.get("num_envs_per_worker"), # This influences also the length of the episode. the environment length will be split by the number of environments per worker
        "num_gpus": parallel_config.get("num_gpus"), # Number of GPUs to use in training (0 means CPU only). After a few experiments, it seems that using GPU is not helping
        "num_cpus_per_worker": parallel_config.get("num_cpus_per_worker"), # After some testing, seems the fastest way for this kind of enviroment. It's better to run more trials in parallel than to finish a trial with a couple of minutes faster. Because we can end trial earlier if we see that our model eventuall converge
        "num_cpus_for_driver": parallel_config.get("num_cpus_for_driver"), # Number of CPUs to use for the driver. This is the number of CPUs used for the training process.
        "num_gpus_per_worker": parallel_config.get("num_gpus_per_worker"), 
        "rollout_fragment_length": 200, # Size of batches collected from each worker. If num_envs_per_worker is > 1 the rollout value will be multiplied by num_envs_per_worker
        "train_batch_size": 2048, # Number of timesteps collected for each SGD round. This defines the size of each SGD epoch. the batch size is composed of fragments defined above
        "sgd_minibatch_size": 64,
        "batch_mode": "complete_episodes",
        "vf_clip_param": 100, # Default is 10, but we increase it to 100 to adapt it to our rewards scale. It helps our value function to converge faster
        "lr": 0.00001,  # Hyperparameter grid search defined above
        "gamma": 0.95,  # This can have a big impact on the result and needs to be properly tuned
        #"observation_filter": "MeanStdFilter",
        "model": {
        #    "fcnet_hiddens": FC_SIZE,  # Hyperparameter grid search defined above
            # "use_lstm": True,
            # "lstm_cell_size": 256,
            # "lstm_use_prev_action_reward": True,
            # "lstm_use_prev_action": True,
            
        },
        #"sgd_minibatch_size": MINIBATCH_SIZE,  # Hyperparameter grid search defined above
        "evaluation_interval": 5,  # Run one evaluation step on every x `Trainer.train()` call.
        "evaluation_duration": 1,  # How many episodes to run evaluations for each time we evaluate.
        "evaluation_config": {
            "explore": True,  # We usually don't want to explore during evaluation. All actions have to be repeatable. Similar to deterministic = True, but on-policy algorithms can get better results with exploration.
            "env": eval_env_key, # We need to define a new environment for evaluation with different parameters
        },
        "logger_config": {
            "logdir": "/tmp/ray_logging/",
            "type": "ray.tune.logger.UnifiedLogger",
        }
    }




2022-05-04 23:29:34,345	INFO services.py:1456 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


### Print some info about all nodes in cluster, just to make sure we really are in a cluster

In [14]:
import time


print ('Total CPU in cluster: ', ray.cluster_resources()['CPU'])
print ('Available CPU in cluster: ', ray.available_resources()['CPU'])
print('Total GPU in cluster: ', ray.cluster_resources()['GPU'])
print ('Available GPU in cluster: ', ray.available_resources()['GPU'])
print('Existing nodes in cluster: ', ray.nodes())
print('RAY_record_ref_creation_sites: ', os.environ.get('RAY_record_ref_creation_sites'))

# This ensures that remote cluster is working
@ray.remote
def f():
    time.sleep(0.01)
    return ray._private.services.get_node_ip_address()

# Get a list of the IP addresses of the nodes that have joined the cluster.
set(ray.get([f.remote() for _ in range(1000)]))

Total CPU in cluster:  8.0
Available CPU in cluster:  8.0
Total GPU in cluster:  1.0
Available GPU in cluster:  1.0
Existing nodes in cluster:  [{'NodeID': 'd873d3d5a4907305e9e598a1b6c818486c68cbcb2f17cdc3fee55563', 'Alive': True, 'NodeManagerAddress': '172.20.231.182', 'NodeManagerHostname': 'DESKTOP-TRKQ95T', 'NodeManagerPort': 37435, 'ObjectManagerPort': 34377, 'ObjectStoreSocketName': '/tmp/ray/session_2022-05-03_22-22-17_246996_329/sockets/plasma_store', 'RayletSocketName': '/tmp/ray/session_2022-05-03_22-22-17_246996_329/sockets/raylet', 'MetricsExportPort': 56014, 'alive': True, 'Resources': {'CPU': 8.0, 'GPU': 1.0, 'object_store_memory': 3733778841.0, 'memory': 7467557684.0, 'node:172.20.231.182': 1.0, 'accelerator_type:G': 1.0}}]
RAY_record_ref_creation_sites:  1


{'172.20.231.182'}

### Run ray tune 

In [31]:
from ray.tune import JupyterNotebookReporter

jupyter_reporter = JupyterNotebookReporter(True, max_report_frequency=30)

analysis = tune.run(
    run_or_experiment="PPO",  
    name="Bitcoin_Acc_Local_PPO_Trial_V01",
    metric='episode_reward_mean',
    mode='max',
    stop={
        # An iteration is equal with one SGD round which in our case is equal to (number of workers multiplied by the environment length).
        "training_iteration": 1000, 
        # "timesteps_total": 0, # kept as reference
        "episode_reward_mean": 25000 # buy and hold value for X_valid dataset, if get here we should stop and train only the generation that beats it, moreover we can hypertune that one particulary.
        
    },
    config=ppo_trainer_config,
    num_samples=1,  # Have one sample for each hyperparameter combination. You can have more to average out randomness.
    keep_checkpoints_num=50,  # Keep the last X checkpoints
    checkpoint_freq=5,  # Checkpoint every X iterations (save the model)
    local_dir="/tmp/ray_results/",  # Local directory to store checkpoints and results, we are using tmp folder until we move the notebook to a docker instance and we can use the same directory across all instances, no matter the underlying OS
    progress_reporter=jupyter_reporter,
    #restore="/tmp/ray_results/SimpleTrading_Env_Distributed_PPO_Trial_V08/PPO_SimpleTradingEnv-training-V01_687da_00000_0_2022-04-29_17-26-50/checkpoint_000711",
    fail_fast="raise",
    resume=False
)

Trial name,status,loc,iter,total time (s),ts,reward,episode_reward_max,episode_reward_min,episode_len_mean
PPO_BTCAccumulationEnv-training-V01_fce7a_00000,RUNNING,192.168.209.127:17014,41,6684.85,2766024,-0.0914311,0.0023284,-0.525214,11244


In [25]:
best_trial = analysis.get_best_trial(metric="episode_reward_mean", mode="max", scope="all") 
best_checkpoint = analysis.get_best_checkpoint(best_trial, metric="episode_reward_mean")

# manual_checkpoint = analysis.get_last_checkpoint()
# manual_trial = analysis.trials[manual_checkpoint.trial_id]

print("Best checkpoint:", best_checkpoint)
print("Best trial:", best_trial)
print("Trial result", best_trial.last_result["episode_reward_mean"])
# print("Manual checkpoint:", manual_checkpoint)
# print("Manual trial:", manual_trial)


Best checkpoint: /tmp/ray_results/SimpleTrading_Env_Distributed_PPO_Trial_V12/PPO_SimpleTradingEnv-training-V01_b1a77_00000_0_2022-05-03_17-31-17/checkpoint_000450/checkpoint-450
Best trial: PPO_SimpleTradingEnv-training-V01_b1a77_00000
Trial result 9579.796897700096


### Evaluate trained model restoring it from checkpoint

In [None]:
from ray.rllib.agents import ppo
from tabulate import tabulate
import json


agent = ppo.PPOTrainer(config=best_trial.config)
agent.restore(best_checkpoint)

results_table = []
json_dict_list = []


exploration_types = [True] # True for stochastic, False for deterministic
envs = [eval_env, training_env]

for iter, env in enumerate(envs):
    for exploration_type in exploration_types:
        episodes_to_run = 1 if exploration_type else 1 # 5 episodes for stochastic exploration to average out randomness, 1 for deterministic
        
        for i in range(episodes_to_run):
            episode_reward = 0
            done = False
            obs = eval_env.reset() # we are using the evaluation environment for evaluation
            net_worths = []
            while not done:
                action = agent.compute_single_action(obs, explore=exploration_type) # stochastic evaluation, it's not deterministic and seems to be the best for PPO training
                obs, reward, done, info = eval_env.step(action)

                net_worths.append(info['net_worth']) # Add all historical net worths to a list to print some statistics at the end of the episode
                episode_reward += reward

            results_table.append([i, "evaluation" if iter == 0 else "training" ,exploration_type, episode_reward, (episode_reward/len(net_worths)),net_worths[-1], np.mean(net_worths), 
            np.std(net_worths), np.max(net_worths), np.min(net_worths)])

            tba = {"environments" : 
                {
                    "name": "evaluation" if iter == 0 else "training",
                    "explorationType": "stochastic" if exploration_type else "deterministic",
                    "episodes": [
                        {
                            "episode": i,
                            "reward": episode_reward,
                            "accumulatedEpisodeReward": episode_reward,
                            "episodeRewardMean": episode_reward/len(net_worths),
                            "netWorthEndOfEp": net_worths[-1],
                            "netWorthMean": np.mean(net_worths),
                            "netWorthStd": np.std(net_worths),
                            "netWorthMax": np.max(net_worths),
                            "netWorthMin": np.min(net_worths)
                        }
                    ]
            }}

            json_dict_list.append(tba)



headers=[
    "Episode",
    "Env",
    "Stochastic",
    "Acc. ep. reward end of the eval ep",
    "Avg. ep. reward", 
    "Net worth end of the eval ep.",
    "Avg. net worth over the eval ep.",
    "Std of net worth over the eval. ep.",
    "Max net worth",
    "Min net worth",

]


print(tabulate(results_table, headers))

# Write the results to JSON file
with open("results.json", "w") as f:
    json.dump(json_dict_list, f)

In [None]:
import ray
ray.shutdown()

In [1]:
import tensorflow as tf
tf.config.list_physical_devices()

2022-04-28 18:36:56.561690: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:922] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2022-04-28 18:36:56.627003: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:922] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2022-04-28 18:36:56.627569: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:922] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.


[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'),
 PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]