In [1]:
!pip install pandas_ta pygad

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import gym
import numpy as np
import pandas as pd
import pandas_ta as ta
import pygad
import pygad.kerasga

from gym import spaces
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

In [3]:
from urllib.request import urlretrieve

# Download data
print('Downloading OIH_adjusted.txt...')
urlretrieve('http://api.kibot.com/?action=history&symbol=OIH&interval=1&unadjusted=0&bp=1&user=guest', 'OIH_adjusted.txt')

# Read data and assign names to the columns.
# header=None keeps the first line of the file as data; with the default header
# handling that first bar would be consumed as column labels and silently lost.
# NOTE(review): assumes the downloaded file has no header row - confirm against the data source.
df = pd.read_csv('OIH_adjusted.txt', header=None,
                 names=['date','time','open','high','low','close','volume'])

# Combine date and time in the date column
df['date'] = pd.to_datetime(df['date'] + ' ' + df['time'], format='%m/%d/%Y %H:%M')
df = df[['date','open','high','low','close','volume']]

# Sort by date and assign the date as index
df = df.sort_values('date').set_index('date')

# Convert the data to different timeframes & save them for future uses
AGGREGATION = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
TIMEFRAMES = ['1H']

for timeframe in TIMEFRAMES:
    print(f'Converting & Saving {timeframe} Data...')
    # Always resample from the 1-minute bars: re-assigning `df` here would make every
    # timeframe after the first one be derived from already-aggregated data.
    resampled = df.resample(timeframe).agg(AGGREGATION).dropna()
    resampled.to_csv(f'OIH_{timeframe}.csv.gz', compression='gzip')


  and should_run_async(code)


Downloading OIH_adjusted.txt...
Converting & Saving 1H Data...


In [4]:
# Operations: integer ids of the three discrete actions (SELL=0, HOLD=1, BUY=2)
SELL, HOLD, BUY = range(3)

class SellHoldBuyEnv(gym.Env):
    """Long-only trading environment with three discrete actions (SELL, HOLD, BUY).

    An observation is the window of the last `observation_size` feature rows that
    precede the current tick. A position is opened when BUY is received while flat
    and closed when SELL is received while long; every other action is ignored.
    The reward of a step is the price difference realised by closing a position
    on that step, and 0 otherwise.

    Parameters
    ----------
    observation_size : int
        Number of feature rows in each observation; also the first tick of an episode.
    features : sequence
        Per-tick features; sliced along its first axis to build observations.
    closes : sequence
        Per-tick prices used to open and close positions (same length as `features`
        is expected - presumably aligned row by row; verify against caller).
    """

    def __init__(self, observation_size, features, closes):

        # Data
        self.__features = features
        self.__prices = closes

        # Spaces: the declared observation shape matches the windows returned by
        # __get_observation, i.e. (observation_size, *per-tick feature shape).
        # -np.inf / np.inf replace np.NINF / np.PINF, which were removed in NumPy 2.0.
        obs_shape = (observation_size,) + tuple(np.shape(features)[1:])
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=obs_shape, dtype=np.float32)
        self.action_space = spaces.Discrete(3)

        # Episode Management
        self.__start_tick = observation_size
        self.__end_tick = len(self.__prices)
        # Start in the "finished" state so that step() refuses to run before reset()
        self.__current_tick = self.__end_tick

        # Position Management
        self.__current_action = HOLD
        self.__open_price = None
        self.__current_profit = 0
        self.__wins = 0
        self.__losses = 0

    def reset(self):
        """Start a new episode and return its first observation.

        Returns None when there are not more ticks than `observation_size`.
        """

        # Reset the position state and the accumulated statistics
        self.__current_action = HOLD
        self.__open_price = None
        self.__current_profit = 0
        self.__wins = 0
        self.__losses = 0

        # Reset the current tick pointer and return a new observation
        self.__current_tick = self.__start_tick

        return self.__get_observation()

    def step(self, action):
        """Apply `action` at the current tick and advance one tick.

        Returns
        -------
        tuple
            (observation, step_reward, done, info). `observation` is None once the
            episode is done. `info` holds 'current_action' (HOLD when flat, BUY when
            long), 'current_profit', 'wins' and 'losses'.

        Raises
        ------
        Exception
            If the episode is already finished (or reset() was never called).
        """

        # The last valid price index is end_tick - 1, so reaching end_tick already
        # means the episode is over and the environment needs to be reset.
        if self.__current_tick >= self.__end_tick:
            raise Exception('The environment needs to be reset.')

        # Compute the step reward: non-zero only on the step that closes a position
        step_reward = 0
        if self.__current_action == HOLD and action == BUY:
            # Open a long position at the current price
            self.__open_price = self.__prices[self.__current_tick]
            self.__current_action = BUY
        elif self.__current_action == BUY and action == SELL:
            # Close the position and realise the price difference
            step_reward = self.__prices[self.__current_tick] - self.__open_price
            self.__current_profit += step_reward
            self.__current_action = HOLD

            # A break-even trade is counted as a loss
            if step_reward > 0:
                self.__wins += 1
            else:
                self.__losses += 1

        # Snapshot of the position state and running statistics after this action
        info = {
            'current_action': self.__current_action,
            'current_profit': self.__current_profit,
            'wins': self.__wins,
            'losses': self.__losses
        }

        # Increase the current tick pointer, check if the environment is fully processed, and get a new observation
        self.__current_tick += 1
        done = self.__current_tick >= self.__end_tick
        obs = self.__get_observation()

        # Returns the observation, the step reward, the status of the environment, and the custom information
        return obs, step_reward, done, info

    def __get_observation(self):
        """Return the feature window preceding the current tick, or None when finished."""

        # If current tick is over the last value in the feature array, there is nothing to observe
        if self.__current_tick >= self.__end_tick:
            return None

        # Plain slice of the stored features (no explicit copy is made here, so
        # callers should treat the returned window as read-only)
        obs = self.__features[(self.__current_tick - self.__start_tick):self.__current_tick]

        # Return the calculated observation
        return obs


  and should_run_async(code)


In [5]:
# Constants
GENERATIONS = 50  # number of generations the genetic algorithm runs
SOLUTIONS = 20    # number of candidate solutions in the population
FEATURES = 2      # feature values per observed tick
OBS_SIZE = 32     # ticks per observation window

In [6]:
# Loading data, and split in train and test datasets.
# Dates are parsed on load so the split below compares timestamps, not strings.
df = pd.read_csv('OIH_1H.csv.gz', compression='gzip', parse_dates=['date'])
df.ta.bbands(close=df['close'], length=20, append=True)

# Drop the rows containing NaN and take an explicit copy, so the column assignments
# below act on an independent frame (no need to silence pandas' chained-assignment
# warning globally).
df = df.dropna().copy()

# Limits placed half a band-width outside the upper and lower Bollinger bands
band_width = df['BBU_20_2.0'] - df['BBL_20_2.0']
df['high_limit'] = df['BBU_20_2.0'] + band_width / 2
df['low_limit'] = df['BBL_20_2.0'] - band_width / 2

# Position of the close between the two limits, clipped to [0, 1]
df['close_percentage'] = np.clip((df['close'] - df['low_limit']) / (df['high_limit'] - df['low_limit']), 0, 1)

# Relative distance between the upper and the lower band
df['volatility'] = df['BBU_20_2.0'] / df['BBL_20_2.0'] - 1

# Everything before 2022 is used for training, the rest for testing
train = df[df['date'] < '2022-01-01']
test = df[df['date'] >= '2022-01-01']

In [7]:
def predict(X, W):
    """Forward pass of a 3-layer dense network (ReLU, ReLU, softmax) in plain NumPy.

    Parameters
    ----------
    X : np.ndarray
        Batch of inputs; every axis after the first one is flattened.
    W : sequence of arrays
        Six arrays used as [kernel_0, bias_0, kernel_1, bias_1, kernel_2, bias_2].

    Returns
    -------
    np.ndarray
        Array of shape (batch, n_outputs) whose rows are softmax probabilities.
    """
    hidden = X.reshape((X.shape[0], -1))             # Flatten
    hidden = np.maximum(hidden @ W[0] + W[1], 0)     # Dense + Relu
    hidden = np.maximum(hidden @ W[2] + W[3], 0)     # Dense + Relu
    logits = hidden @ W[4] + W[5]                    # Dense

    # Softmax. Subtracting the row maximum leaves the result unchanged but keeps
    # np.exp from overflowing to inf (which would turn the whole row into NaN).
    shifted = logits - logits.max(axis=1, keepdims=True)
    exp_scores = np.exp(shifted)
    return exp_scores / exp_scores.sum(axis=1, keepdims=True)

  and should_run_async(code)


In [8]:
# Fitness function handed to the PyGAD instance
def fitness_func(self, solution, sol_idx):
    """Score one candidate by running a full episode on the module-level `env`.

    Parameters
    ----------
    self : object
        First argument supplied by the caller; not used here
        (presumably the GA instance - verify against the PyGAD version in use).
    solution : sequence
        Flat weight vector of the candidate.
    sol_idx : int
        Index of the candidate within the population (used for logging only).

    Returns
    -------
    Sum of the step rewards collected over the episode.
    """
    # Turn the flat vector into per-layer arrays and load them into the Keras model
    weights = pygad.kerasga.model_weights_as_matrix(model=model, weights_vector=solution)
    model.set_weights(weights=weights)

    # Play one episode; the NumPy forward pass is used instead of model.predict
    total_reward = 0
    observation = env.reset()
    while True:
        state = np.reshape(observation, [1, observation_space_size])
        action = np.argmax(predict(state, weights)[0])
        observation, reward, done, info = env.step(action)
        total_reward += reward
        if done:
            break

    # Report the outcome of this candidate
    print(f"Solution {sol_idx:3d} - Total Reward: {total_reward:10.2f} - Profit: {info['current_profit']:10.3f}")

    # Separator line after the last candidate of the population
    if sol_idx == (SOLUTIONS-1):
        print("".center(60, "*"))

    return total_reward

In [9]:
# Create a train environment
env = SellHoldBuyEnv(
    observation_size=OBS_SIZE,
    features=train[['close_percentage','volatility']].values,
    closes=train['close'].values,
)
observation_space_size = env.observation_space.shape[0] * FEATURES
action_space_size = env.action_space.n

# Create Model: two hidden layers of 16 units and one linear output per action
model = Sequential([
    Dense(16, input_shape=(observation_space_size,), activation='relu'),
    Dense(16, activation='relu'),
    Dense(action_space_size, activation='linear'),
])
model.summary()

# Create Genetic Algorithm (the initial population is derived from the model weights)
keras_ga = pygad.kerasga.KerasGA(model=model, num_solutions=SOLUTIONS)

ga_instance = pygad.GA(
    num_generations=GENERATIONS,
    num_parents_mating=5,
    initial_population=keras_ga.population_weights,
    fitness_func=fitness_func,
    parent_selection_type="sss",
    crossover_type="single_point",
    mutation_type="random",
    mutation_percent_genes=10,
    keep_parents=-1,
)

# Run the Genetic Algorithm
ga_instance.run()

# Show details of the best solution.
solution, solution_fitness, solution_idx = ga_instance.best_solution()
print(f"Fitness value of the best solution = {solution_fitness}")
print(f"Index of the best solution : {solution_idx}")

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 16)                1040      
                                                                 
 dense_1 (Dense)             (None, 16)                272       
                                                                 
 dense_2 (Dense)             (None, 3)                 51        
                                                                 
Total params: 1,363
Trainable params: 1,363
Non-trainable params: 0
_________________________________________________________________
Solution   0 - Total Reward:    -351.91 - Profit:   -351.910
Solution   1 - Total Reward:    -349.81 - Profit:   -349.810
Solution   2 - Total Reward:    -348.39 - Profit:   -348.390
Solution   3 - Total Reward:       0.00 - Profit:      0.000
Solution   4 - Total Reward:      16.41 - Profit:     16.406
Solution   5 

In [10]:
# Create a test environment
env = SellHoldBuyEnv(
    observation_size=OBS_SIZE,
    features=test[['close_percentage','volatility']].values,
    closes=test['close'].values,
)

# Load the weights of the best solution into the model
best_weights_matrix = pygad.kerasga.model_weights_as_matrix(model=model, weights_vector=solution)
model.set_weights(weights=best_weights_matrix)

# Play one full episode over the test data using the NumPy forward pass
total_reward = 0
observation = env.reset()

while True:
    state = np.reshape(observation, [1, observation_space_size])
    q_values = predict(state, best_weights_matrix)
    action = np.argmax(q_values[0])
    observation, reward, done, info = env.step(action)
    total_reward += reward
    if done:
        break

In [11]:
# Show the final result
closed_trades = info['wins'] + info['losses']

print(' RESULT '.center(60, '*'))
print(f"* Profit/Loss: {info['current_profit']:6.3f}")
print(f"* Wins: {info['wins']} - Losses: {info['losses']}")

# The win rate is undefined when no position was ever closed; avoid dividing by zero
if closed_trades:
    print(f"* Win Rate: {100 * (info['wins']/closed_trades):6.2f}%")
else:
    print("* Win Rate: n/a (no closed trades)")

************************** RESULT **************************
* Profit/Loss: 27.920
* Wins: 105 - Losses: 82
* Win Rate:  56.15%


  and should_run_async(code)
