# Stock investing Example

In [None]:
#%load_ext autoreload
#%autoreload 2
import numpy as np
import copy 
import random
import pandas as pd
import datetime as dt
from tqdm import tqdm
from scipy.stats import binom
import matplotlib.dates as mdates
from scipy.optimize import minimize
import matplotlib.pyplot as plt
import yfinance as yf
import itertools
from finite_q_learning import *
from q_learning import *

rng = np.random.default_rng()

Importation of market data from yahoo finance

In [None]:
data = yf.download("GOOGL", start="2010-01-02", end="2024-01-01")
data = data.reset_index()
data["Date"] =  data['Date'].dt.normalize()
data.set_index('Date', inplace=True)
data = data.reset_index()

Computation of the return of the stock selected

In [None]:
ind_pos   = ((np.diff(data["Close"])/(data["Close"])[:-1]) >= 0)
ind_neg   = ((np.diff(data["Close"])/(data["Close"])[:-1]) <0)

returns = np.zeros(len(data)-1)
returns[ind_pos]   =  1
returns[ind_neg]   = -1

Definition of training and testing periods

In [None]:
training_start1 = 800
training_end1   = 1_100
returns_train_1  = returns[training_start1:training_end1]

training_start2 = 2_100
training_end2   = 2_300
returns_train_2  = returns[training_start2:training_end2]

training_start3 = 2_700
training_end3   = 3_000
returns_train_3  = returns[training_start3:training_end3]

returns_train_all = returns[:3000]

test_period_start1 = 3_001
test_period_end1 = 3_300
returns_test_1 = returns[test_period_start1:test_period_end1]

test_period_start2 = 3_301
test_period_end2 = 3_500
returns_test_2= returns[test_period_start2:test_period_end2]

Verification that the training periods correspond to really different periods

## Illustration of Training and Test Periods

Overall

In [None]:
plt.plot(range(test_period_end2),returns[:test_period_end2].cumsum())
plt.plot(range(training_start1,training_end1),returns.cumsum()[training_start1:training_end1],color = "red",label = "Training Period 1")
plt.plot(range(training_start2,training_end2),returns.cumsum()[training_start2:training_end2],color = "green",label = "Training Period 2")
plt.plot(range(training_start3,training_end3),returns.cumsum()[training_start3:training_end3],color = "orange",label = "Training Period 3")
plt.plot(range(test_period_start1,test_period_end1),returns.cumsum()[test_period_start1:test_period_end1],color = "gray",label = "Test Period 1")
plt.plot(range(test_period_start2,test_period_end2),returns.cumsum()[test_period_start2:test_period_end2],color = "black",label = "Test Period 2")

plt.ylabel("Cumulative Sum of Signs of the Returns")
# xtick_numbers = [0,test_period_start1]
# plt.xticks(xtick_numbers, data["Date"].iloc[xtick_numbers])

# Customize x-axis to show only the day and year
date_format = mdates.DateFormatter('%d-%Y')
plt.gca().xaxis.set_major_formatter(date_format)

# Set the locations of the ticks on x-axis
xtick_numbers = [0,training_start1,training_end1,
                 training_start2,training_end2,
                 training_start3,
                 test_period_start1,test_period_end1,test_period_end2]
plt.xticks(xtick_numbers,data["Date"].dt.date[xtick_numbers], rotation=45,ha = "right")

plt.legend()
plt.grid(True)

plt.tight_layout()
plt.savefig('portfolio_train_test.pdf', format='pdf')
plt.show()

### Training Period 1

In [None]:
plt.plot(returns_train_1.cumsum())

### Training Period 2

In [None]:
plt.plot(returns_train_2.cumsum())

### Training Period 3

In [None]:
plt.plot(returns_train_3.cumsum())

In [None]:
h    = 5 # Length of history
ret1 = returns_train_1
ret2 = returns_train_2
ret3 = returns_train_3
ret_all = returns_train_all
#h2   = 4
#returns

Definition of the parameters to use the algorithm

In [None]:
# State Space
T = [-1, 1]
X = np.array(list(itertools.product(T, repeat=h)))
#X
#X2 = np.array(list(itertools.product(T, repeat=h2)))

In [None]:
A = np.array([-1, 0,1]) # Actions

def r(x,a,y):
    return a * y[-1]

eps_greedy = 0.1   # Epsilon greedy policy
alpha      = 0.95  # Discount Factor

#x_0        = 0     # Initial Value
rng  = np.random.default_rng()
x1_0 = rng.choice(np.array([ret1[i:(h+i)]  for i in range(len(ret1)-h-1)]),axis = 0)
x2_0 = rng.choice(np.array([ret2[i:(h+i)]  for i in range(len(ret2)-h-1)]),axis = 0)
x3_0 = rng.choice(np.array([ret3[i:(h+i)]  for i in range(len(ret3)-h-1)]),axis = 0)
x_all_0 = rng.choice(np.array([ret_all[i:(h+i)]  for i in range(len(ret_all)-h-1)]),axis = 0)

k_0        = 0     # Initial index of the corresponding MDP, starting with the central proba of 1/2

In [None]:
# Build the functions that allow us to get the index of an element a (reps. x) in A (resp. X)
if np.ndim(A) > 1:
    A_list = A
else:
    A_list = np.array([[a] for a in A])
if np.ndim(X) > 1:
    X_list = X
else:
    X_list = np.array([[x] for x in X])

def a_index(a):
    return np.flatnonzero((a==A_list).all(1))[0]
def x_index(x):
    return np.flatnonzero((x==X_list).all(1))[0]

For each x, computation of the probabilities of occurence of the next return

In [None]:
p1_list = []
for x in X:
    eps = 1e-8
    x_u  = np.concatenate([x[1:], [1]])
    x_d  = np.concatenate([x[1:], [-1]])
    p_u_raw  = np.sum([np.all(ret1[i:(h+i)]==x_u) for i in range(len(ret1)-h-1)])
    p_d_raw  = np.sum([np.all(ret1[i:(h+i)]==x_d) for i in range(len(ret1)-h-1)])
    p_u  = (eps/5 + p_u_raw) / (p_u_raw + p_d_raw   + eps)
    p_d  = (eps/5 + p_d_raw) / (p_u_raw + p_d_raw  + eps)
    p1_list.append([p_d, p_u])

p2_list = []
for x in X:
    eps = 1e-8
    x_u  = np.concatenate([x[1:], [1]])
    x_d  = np.concatenate([x[1:], [-1]])
    p_u_raw  = np.sum([np.all(ret2[i:(h+i)]==x_u) for i in range(len(ret2)-h-1)])
    p_d_raw  = np.sum([np.all(ret2[i:(h+i)]==x_d) for i in range(len(ret2)-h-1)])
    p_u  = (eps/5 + p_u_raw) / (p_u_raw + p_d_raw   + eps)
    p_d  = (eps/5 + p_d_raw) / (p_u_raw + p_d_raw  + eps)
    p2_list.append([p_d, p_u])

p3_list = []
for x in X:
    eps = 1e-8
    x_u  = np.concatenate([x[1:], [1]])
    x_d  = np.concatenate([x[1:], [-1]])
    p_u_raw  = np.sum([np.all(ret3[i:(h+i)]==x_u) for i in range(len(ret3)-h-1)])
    p_d_raw  = np.sum([np.all(ret3[i:(h+i)]==x_d) for i in range(len(ret3)-h-1)])
    p_u  = (eps/5 + p_u_raw) / (p_u_raw + p_d_raw   + eps)
    p_d  = (eps/5 + p_d_raw) / (p_u_raw + p_d_raw  + eps)
    p3_list.append([p_d, p_u])

p_all_list = []
for x in X:
    eps = 1e-8
    x_u  = np.concatenate([x[1:], [1]])
    x_d  = np.concatenate([x[1:], [-1]])
    p_u_raw  = np.sum([np.all(ret_all[i:(h+i)]==x_u) for i in range(len(ret_all)-h-1)])
    p_d_raw  = np.sum([np.all(ret_all[i:(h+i)]==x_d) for i in range(len(ret_all)-h-1)])
    p_u  = (eps/5 + p_u_raw) / (p_u_raw + p_d_raw   + eps)
    p_d  = (eps/5 + p_d_raw) / (p_u_raw + p_d_raw  + eps)
    p_all_list.append([p_d, p_u])

Construction of the worst case probability

In [None]:
def p1(x, a, y):
    ind = np.flatnonzero((x==X).all(1))[0]
    p_d, p_u = p1_list[ind]
    return p_d * (y== -1 ) + p_u* (y== 1 ) 

def P1(x,a):
    ind = np.flatnonzero((x==X).all(1))[0]
    p_d, p_u = p1_list[ind]
    rand_unif = (np.random.random_sample(size=1))
    rand = - 1*int(rand_unif < p_d) +1*int(rand_unif >= (p_d))
    return np.concatenate([x[1:],[rand]])

def p2(x, a, y):
    ind = np.flatnonzero((x==X).all(1))[0]
    p_d, p_u = p2_list[ind]
    return p_d * (y== -1 ) + p_u* (y== 1 ) 
def P2(x,a):
    ind = np.flatnonzero((x==X).all(1))[0]
    p_d, p_u = p2_list[ind]
    rand_unif = (np.random.random_sample(size=1))
    rand = - 1*int(rand_unif < p_d) +1*int(rand_unif >= (p_d))
    return np.concatenate([x[1:],[rand]])


def p3(x, a, y):
    ind = np.flatnonzero((x==X).all(1))[0]
    p_d, p_u = p3_list[ind]
    return p_d * (y== -1 ) + p_u* (y== 1 ) 
def P3(x,a):
    ind = np.flatnonzero((x==X).all(1))[0]
    p_d, p_u = p3_list[ind]
    rand_unif = (np.random.random_sample(size=1))
    rand = - 1*int(rand_unif < p_d) +1*int(rand_unif >= (p_d))
    return np.concatenate([x[1:],[rand]])

def p_all(x, a, y):
    ind = np.flatnonzero((x==X).all(1))[0]
    p_d, p_u = p_all_list[ind]
    return p_d * (y== -1 ) + p_u* (y== 1 ) 
    
def P_all(x,a):
    ind = np.flatnonzero((x==X).all(1))[0]
    p_d, p_u = p_all_list[ind]
    rand_unif = (np.random.random_sample(size=1))
    rand = - 1*int(rand_unif < p_d) +1*int(rand_unif >= (p_d))
    return np.concatenate([x[1:],[rand]])

Define a function to evaluate the optimal action

In [None]:
def a_opt(t, Q_opt):
    return A[np.argmax(Q_opt[x_index(t),:])]

Compute the optimal Q-Value Functions

In [None]:
Nr_iter = 100000

Q_opt_robust_1, V_robust = finite_q_learning(X, A, r, np.array([P_all,P1, P2, P3]), np.array([p_all,p1, p2, p3]), alpha, x1_0, k_0, eps_greedy, Nr_iter, gamma_t_tilde = lambda t: 1/(t+1), Q_0 = np.ones([len(X),len(A)]))
Q_opt_1, V_1= q_learning(X, A, r, P1, alpha, x1_0, eps_greedy, Nr_iter, gamma_t_tilde = lambda t: 1/(t+1), Q_0 = np.ones([len(X),len(A)]))
Q_opt_2, V_2 = q_learning(X, A, r, P2, alpha, x2_0, eps_greedy, Nr_iter, gamma_t_tilde = lambda t: 1/(t+1), Q_0 = np.ones([len(X),len(A)]))
Q_opt_3, V_3 = q_learning(X, A, r, P3, alpha, x3_0, eps_greedy, Nr_iter, gamma_t_tilde = lambda t: 1/(t+1), Q_0 = np.ones([len(X),len(A)]))
Q_opt_all, V_all = q_learning(X, A, r, P_all, alpha, x_all_0, eps_greedy, Nr_iter, gamma_t_tilde = lambda t: 1/(t+1), Q_0 = np.ones([len(X),len(A)]))

# Save the results in csv-files
df_rob = pd.DataFrame(np.array([[a_opt(x, Q_opt_robust_1) for x in X]]))
df_1 = pd.DataFrame(np.array([[a_opt(x, Q_opt_1) for x in X]]))
df_2 = pd.DataFrame(np.array([[a_opt(x, Q_opt_2) for x in X]]))
df_3 = pd.DataFrame(np.array([[a_opt(x, Q_opt_3) for x in X]]))
df_all = pd.DataFrame(np.array([[a_opt(x, Q_opt_all) for x in X]])) 

df_rob.to_csv('csv/Q_rob.csv')
df_1.to_csv('csv/Q_1.csv')
df_2.to_csv('csv/Q_2.csv')
df_3.to_csv('csv/Q_3.csv')
df_all.to_csv('csv/Q_all.csv')

# Evaluation

In [None]:
def evaluate_strategies(returns):
    X_t = np.array([returns[i:(h+i)]  for i in range(len(returns)-h-1)])
    print("Days:             {}\nNegative Returns:   {}\nPositive Returns:   {}\n".format(len(returns),
                                                                                np.sum(returns==-1),                                            
                                                                                np.sum(returns==1)))
    robust_rewards       = np.array([r(X_t[i],a_opt(X_t[i], Q_opt_robust_1),X_t[i+1]) for i in range(len(X_t)-1)])
    rewards_1       = np.array([r(X_t[i],a_opt(X_t[i], Q_opt_1),X_t[i+1]) for i in range(len(X_t)-1)])
    rewards_2       = np.array([r(X_t[i],a_opt(X_t[i], Q_opt_2),X_t[i+1]) for i in range(len(X_t)-1)])
    rewards_3       = np.array([r(X_t[i],a_opt(X_t[i], Q_opt_3),X_t[i+1]) for i in range(len(X_t)-1)])
    rewards_all      = np.array([r(X_t[i],a_opt(X_t[i], Q_opt_all),X_t[i+1]) for i in range(len(X_t)-1)])
    
    trend_following        = np.array([((X_t[i][-1]-X_t[i][0] > 0))  * X_t[i+1][-1] for i in range(len(X_t)-1)])
    buy_and_hold           = np.array([ X_t[i+1][-1] for i in range(len(X_t)-1)])
    
    print(
        "- Average Profit per Trade -\n"
        "Robust:          {:0.4f}\n"
        "Non-Robust 1:    {:0.4f}\n"
        "Non-Robust 2:    {:0.4f}\n"
        "Non-Robust 3:    {:0.4f}\n"
        "All Data:        {:0.4f}\n"
        "Trend following: {:0.4f}\n"
        "Buy and hold:    {:0.4f}".format(
            np.mean(robust_rewards),
            np.mean(rewards_1),
            np.mean(rewards_2),
            np.mean(rewards_3),
            np.mean(rewards_all),
            np.mean(trend_following),
            np.mean(buy_and_hold)
        )
    )                                                                                                                                                                                  

### First Training Period

In [None]:
evaluate_strategies(returns_train_1)

### Second Training Period

In [None]:
evaluate_strategies(returns_train_2)

### Third Training Period

In [None]:
evaluate_strategies(returns_train_3)

# Test Period

### Test Period 1

In [None]:
evaluate_strategies(returns_test_1)

### Test Period 2

In [None]:
evaluate_strategies(returns_test_2)