In [None]:
import numpy as np
import pandas as pd
import datetime
from matplotlib import pyplot as plt
import yfinance as yf

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import tensorflow as tf
import tensorflow_probability as tfp

In [None]:
name = 'AAPL'

train_end_date = '2023-07-01'

name_ohlc_df = yf.download(name, start='2010-01-01', end=train_end_date)
name_ohlc_df = name_ohlc_df.reset_index()

data = {'date': name_ohlc_df.Date, 'close': name_ohlc_df.Close, 'volume': name_ohlc_df.Volume}
df = pd.DataFrame(data)
df.head()

In [None]:
plt.figure(figsize=(8, 4))
plt.plot(df['date'], df['close'], linestyle='-', color='b')
plt.xlabel('Date')
plt.ylabel('Temperature')
plt.title(name+' Price')
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:
# # x1 = np.array(df.iloc[:,1]+np.random.randn(df.shape[0])).astype(np.float32).reshape(-1,1)
# train_size = int(0.75 * len(df))

# # x1 = np.array(df.iloc[:train_size,1]).astype(np.float32).reshape(-1,1)
# sequence_length = 10
# train_input = np.lib.stride_tricks.sliding_window_view(df.iloc[:train_size,1:].values, window_shape=(sequence_length, 2)).squeeze()
# val_input = np.lib.stride_tricks.sliding_window_view(df.iloc[train_size:-1,1:].values, window_shape=(sequence_length, 2)).squeeze()

# y1 = np.array(df.iloc[sequence_length:train_size+1,1]).T.astype(np.float32).reshape(-1,1)
# y2 = np.array(df.iloc[sequence_length+train_size:,1]).T.astype(np.float32).reshape(-1,1)

# tfd = tfp.distributions

In [None]:
train_size = int(0.75 * len(df))

# x1 = np.array(df.iloc[:train_size,1]).astype(np.float32).reshape(-1,1)
sequence_length = 10
train_input = np.lib.stride_tricks.sliding_window_view(df.iloc[:train_size,1].values, sequence_length)
val_input = np.lib.stride_tricks.sliding_window_view(df.iloc[train_size:-1,1].values, sequence_length)

y1 = np.array(df.iloc[sequence_length:train_size+1,1]).T.astype(np.float32).reshape(-1,1)
y2 = np.array(df.iloc[sequence_length+train_size:,1]).T.astype(np.float32).reshape(-1,1)

tfd = tfp.distributions

In [None]:
# print(train_input[:2])
# print()
# print(y1[:2])
# print(train_input.shape,y1.shape)
print(val_input[:2])
print()
print(y2[:2])
print(val_input.shape,y2.shape)

In [None]:
# # Specify the surrogate posterior over `keras.layers.Dense` `kernel` and `bias`.
# def posterior_mean_field(kernel_size, bias_size=0, dtype=None):
#   n = kernel_size + bias_size
#   c = np.log(np.expm1(1.))
#   return tf.keras.Sequential([
#       tfp.layers.VariableLayer(2 * n, dtype=dtype),
#       tfp.layers.DistributionLambda(lambda t: tfd.Independent(
#           tfd.Normal(loc=t[..., :n],
#                      scale=1e-5 + tf.nn.softplus(c + t[..., n:])),
#           reinterpreted_batch_ndims=1)),
#   ])

# # Specify the prior over `keras.layers.Dense` `kernel` and `bias`.
# def prior_trainable(kernel_size, bias_size=0, dtype=None):
#   n = kernel_size + bias_size
#   return tf.keras.Sequential([
#       tfp.layers.VariableLayer(n, dtype=dtype),
#       tfp.layers.DistributionLambda(lambda t: tfd.Independent(
#           tfd.Normal(loc=t, scale=1),
#           reinterpreted_batch_ndims=1)),
#   ])

In [None]:
deep_model = tf.keras.Sequential([
  tf.keras.layers.LSTM(32,activation='relu',input_shape=(10,1)),
  tf.keras.layers.Dense(64,activation='relu'),
  tf.keras.layers.Dense(24,activation='relu'),
  tf.keras.layers.Dense(8,activation='relu'),
  tf.keras.layers.Dense(1+1),
  tfp.layers.DistributionLambda(
      lambda t: tfd.Normal(loc=t[..., :1],scale=1e-3 + tf.math.softplus(0.1 * t[...,1:]))),
])


In [None]:
# def build_lstm_model_with_distribution(input_shape):
#     inputs = tf.keras.layers.Input(shape=input_shape)
#     # lstm_output1 = tf.keras.layers.LSTM(128, return_sequences=True)(inputs)
#     # lstm_output2 = tf.keras.layers.LSTM(128, return_sequences=True)(dropout_output1)
#     # dropout_output2 = tf.keras.layers.Dropout(0.3)(lstm_output2)
#     # lstm_output3 = tf.keras.layers.LSTM(128, return_sequences=False)(inputs)
#     flatten_inputs = tf.keras.layers.Flatten()(inputs)

#     full_connect1 = tf.keras.layers.Dense(32, activation='relu')(flatten_inputs)
#     flat_mean_output = tf.keras.layers.Dense(2, activation='relu')(full_connect1)

#     # Create a distribution layer using TensorFlow Probability
#     distribution_layer =   tfp.layers.DistributionLambda(
#       lambda t: tfd.Normal(loc=t[..., :1],scale=1e-3 + tf.math.softplus(0.001 * t[...,1:]))
#       )
    
#     distribution_output = distribution_layer(flat_mean_output)
#     model = tf.keras.models.Model(inputs=inputs, outputs=distribution_output)
    
#     return model

In [None]:
# deep_model = build_lstm_model_with_distribution(input_shape=(sequence_length,2))

negloglik = lambda y, rv_y: -rv_y.log_prob(y)
deep_model.compile(optimizer=tf.keras.optimizers.legacy.Adam(), loss=negloglik)

In [None]:
deep_model.fit(train_input, y1, epochs=750, verbose=1)

In [None]:
# Profit.
yhat = deep_model(val_input)
# [print(np.squeeze(w.numpy())) for w in model.weights];
assert isinstance(yhat, tfd.Distribution)

In [None]:
yhats = [deep_model(val_input) for _ in range(100)]
for i, yhat in enumerate(yhats):
  m = np.squeeze(yhat.mean())
  s = np.squeeze(yhat.stddev())

In [None]:
val_dates = np.array(df.iloc[sequence_length+train_size:,0]).T.reshape(-1,1)
print(val_dates.shape,y2.shape,m.shape)
upper_bound = m + s
lower_bound = m - s

plt.figure(figsize=(10, 6))
plt.plot(val_dates,y2, label='True Values', color='black')
plt.plot(val_dates,m, label='Predicted Mean', color='red')
plt.fill_between(val_dates.reshape(-1), lower_bound, upper_bound, color='orange', alpha=0.3, label='Uncertainty Interval')
plt.xlabel('Time Step')
plt.ylabel('Value')
plt.title(f'{name} Predictions with Uncertainties')
plt.legend()
plt.grid(True)
plt.tight_layout()
# plt.xlim((val_dates[50][0],val_dates[350][0]))
plt.show()

In [None]:
test_size = -80
test_input = np.lib.stride_tricks.sliding_window_view(df.iloc[test_size:-1,1].values, sequence_length)
y3 = np.array(df.iloc[sequence_length+test_size:,1]).T.astype(np.float32).reshape(-1,1)
y3shift = np.roll(y3, shift=1)
y3shift[0] = np.nan
test_dates = np.array(df.iloc[sequence_length+test_size:,0]).T.reshape(-1,1)

yhats_test = [deep_model(test_input) for _ in range(100)]

for i, yhat in enumerate(yhats_test):
  m_test = np.squeeze(yhat.mean())
  s_test = np.squeeze(yhat.stddev())

print(test_input.shape,y3.shape,test_dates.shape,m_test.shape)
upper_bound = m_test + s_test
lower_bound = m_test - s_test

plt.figure(figsize=(10,5))
plt.plot(test_dates, y3, label='True Values', color='black',marker='o',markersize=3)
plt.plot(test_dates, y3shift, label='Shift Values', color='grey',marker='o',ms=3)
plt.plot(test_dates, m_test, label='Predicted Mean', color='red',marker='x',markersize=3)
plt.fill_between(test_dates.reshape(-1), lower_bound, upper_bound, color='orange', alpha=0.3, label='Uncertainty Interval')
plt.xlabel('Time Step')
plt.ylabel('Value')
plt.title(f'{name} Predictions with Uncertainties')
plt.legend(loc='best')
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

def calculate_mape(true_data, forecast_data):
    absolute_percentage_errors = np.abs((true_data - forecast_data) / true_data)
    mape = np.mean(absolute_percentage_errors) * 100
    return mape

def calculate_rmse(true_data, forecast_data):
    rmse = np.sqrt(mean_squared_error(true_data, forecast_data))
    return rmse

def calculate_mae(true_data, forecast_data):
    mae = mean_absolute_error(true_data, forecast_data)
    return mae

def calculate_r2(true_data, forecast_data):
    r2 = r2_score(true_data, forecast_data)
    return r2

def calculate_forecast_bias(true_data, forecast_data):
    forecast_bias = np.mean(forecast_data - true_data)
    return forecast_bias


mape = calculate_mape(y3, m_test)
rmse = calculate_rmse(y3, m_test)
mae = calculate_mae(y3, m_test)
fb = calculate_forecast_bias(y3, m_test)
r2 = calculate_r2(y3, m_test)
print("MAPE:", mape,'\nRMSE:',rmse,'\nMAE:',mae,'\nFB:',fb,'\nR2:',r2)
print()
mape = calculate_mape(y3, np.nan_to_num(y3shift,nan=0.0))
rmse = calculate_rmse(y3, np.nan_to_num(y3shift,nan=0.0))
mae = calculate_mae(y3, np.nan_to_num(y3shift,nan=0.0))
fb = calculate_forecast_bias(y3, np.nan_to_num(y3shift,nan=0.0))
r2 = calculate_r2(y3, np.nan_to_num(y3shift,nan=0.0))
print("MAPE:", mape,'\nRMSE:',rmse,'\nMAE:',mae,'\nFB:',fb,'\nR2:',r2)

In [None]:
#strategy
#find the days were the price today is below -2sigma or above +2sigma
print(val_dates.shape,y2.shape,m.shape,s.shape)
prediction_df = pd.DataFrame({'Date':val_dates.squeeze(),'Close':y2.squeeze(), 
                              'Model_mean':m, 'Model_sigma':s})
prediction_df['m+3s'] = prediction_df['Model_mean'] + 3*prediction_df['Model_sigma']
prediction_df['m-3s'] = prediction_df['Model_mean'] - 3*prediction_df['Model_sigma']
prediction_df.head()
print()

In [None]:
condition1 = prediction_df['Close'] > prediction_df['m+3s']
condition2 = prediction_df['Close'] < prediction_df['m-3s']
# combined_condition = condition1 | condition2
above_result = prediction_df[condition1]
above_result_dates = above_result['Date'].tolist()
below_result = prediction_df[condition2]
below_result_dates = below_result['Date'].tolist()
print(len(above_result))
print(len(below_result))

plt.figure(figsize=(10, 6))
plt.plot(prediction_df['Date'],prediction_df['Close'], label='True Values', color='black')
plt.plot(prediction_df['Date'],prediction_df['Model_mean'], label='Predicted Mean', color='red')
plt.fill_between(prediction_df['Date'], prediction_df['m-3s'], prediction_df['m+3s'], color='indigo', alpha=0.3, label='$3\sigma$ Uncertainty')
plt.vlines(above_result_dates,ymin=0,ymax=max(prediction_df['m+3s']), color='green', linestyle='--', label='Above Cond. Met',alpha=0.5)
plt.vlines(below_result_dates,ymin=0,ymax=max(prediction_df['m+3s']), color='r', linestyle='--', label='Below Cond. Met',alpha=0.5)

plt.xlabel('Time Step')
plt.ylabel('Value')
plt.title(f'{name} Predictions with Uncertainties')
plt.legend()
plt.grid(True)
plt.tight_layout()
# plt.xlim((prediction_df[50][0],prediction_df[350][0]))
plt.show()

In [None]:
y2 = y2.reshape(-1)
print(y2.shape,m.shape)
print(m.shape)
true_pct_changes = (np.diff(y2) / y2[:-1]) * 100
pred_pct_changes = ((m - y2) / y2) * 100 
pred_pct_changes2 = (np.diff(m) / m[:-1]) * 100

# y3 = y3.reshape(-1)
# true_pct_changes = (np.diff(y3) / y3[:-1]) * 100
# pred_pct_changes = ((m_test - y3) / y3) * 100 


print(true_pct_changes.shape)
print(pred_pct_changes.shape)
plt.figure()
plt.scatter(true_pct_changes, pred_pct_changes[:-1],color='orange',alpha=.4,label='Pred - Prev. Truth')
plt.scatter(true_pct_changes, pred_pct_changes2,color='indigo',alpha=.4,label='Pred - Prev. Pred')
plt.plot(true_pct_changes, true_pct_changes,lw=4,color='cadetblue')
plt.axhline(y=0,ls='--',color='red')
plt.axvline(x=0,ls='--',color='red')
plt.xlabel('True Values')
plt.ylabel('Predictions')
plt.legend()
plt.show()

In [None]:
plt.figure(figsize=(6, 4.5))
plt.hist2d(true_pct_changes, pred_pct_changes[:-1], bins=25, cmap='jet',cmin=1)#,range=[[-2, 2], [-2, 2]]
# plt.hist2d(true_pct_changes, pred_pct_changes2, bins=25, cmap='jet',range=[[-2, 2], [-2, 2]],cmin=1)
plt.colorbar(label='Counts')
plt.xlabel('True values')
plt.ylabel('Predictions')
plt.title('2D Histogram')
plt.axhline(y=0, color='red', linestyle='-',lw=2.5)   # Horizontal line at y=0
plt.axvline(x=0, color='red', linestyle='-',lw=2.5)  # Vertical line at x=0
plt.plot(true_pct_changes, true_pct_changes, color='blue')   
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
from sklearn.metrics import confusion_matrix
conf_matrix = confusion_matrix(np.sign(true_pct_changes), np.sign(pred_pct_changes[:-1]), labels=[1, -1])
# conf_matrix = confusion_matrix(np.sign(true_pct_changes), np.sign(pred_pct_changes2), labels=[1, -1])
print("Confusion Matrix:")
print(conf_matrix)

In [None]:

same_sign_indices = np.where(np.sign(true_pct_changes) == np.sign(pred_pct_changes[:-1]))
diff_sign_indices = np.where(np.sign(true_pct_changes) != np.sign(pred_pct_changes[:-1]))

plt.figure(figsize=(6,4))
_, bins, _ = plt.hist(true_pct_changes,bins=50,histtype='step',label='Actual Daily Returns')
plt.hist(true_pct_changes[same_sign_indices],bins=bins,histtype='step',color='green',label='Correct')
plt.hist(true_pct_changes[diff_sign_indices],bins=bins,histtype='step',color='red',label='Incorrect')
plt.grid(color='0.95')
plt.xlabel('Returns [%]')
plt.title('Test set')
x_min, x_max = plt.xlim()
y_min, y_max = plt.ylim()
text_x = x_min + 0.025 * (x_max - x_min)
text_y = y_max - 0.1 * (y_max - y_min)
plt.text(text_x, text_y, '% correct sign {:.3f}\n% incorrect sign {:.3f}'.format(len(true_pct_changes[same_sign_indices])/len(true_pct_changes),
                                                                                 len(true_pct_changes[diff_sign_indices])/len(true_pct_changes)), ha='left', va='top')
plt.legend()
plt.show()

In [None]:
fig, ax = plt.subplots()
im = ax.imshow(conf_matrix, interpolation='nearest', cmap=plt.cm.cool)

cbar = ax.figure.colorbar(im, ax=ax)
cbar.ax.set_ylabel('Counts', rotation=-90, va="bottom")
ax.set(xlabel='Predicted labels',ylabel='True Labels')
ax.set_xticks([0,1])
ax.set_xticklabels(['1', '-1'])
ax.set_yticks([0,1])
ax.set_yticklabels(['1', '-1'])
for i in range(conf_matrix.shape[0]):
    for j in range(conf_matrix.shape[1]):
        ax.text(j, i, str(conf_matrix[i, j]), ha="center", va="center", color="white" if conf_matrix[i, j] > conf_matrix.max() / 2 else "black")

plt.show()

In [None]:
#one day test
one_day_input = np.lib.stride_tricks.sliding_window_view(df.iloc[-12:,1].values, sequence_length)[:-1]
y4 = np.array(df.iloc[sequence_length-12:,1]).astype(np.float32)


print(one_day_input)
print(y4)

In [None]:
#aleatoric uncertainty +- sigma
#epistemic uncertainty 100 predict calls
yhats_test = [model(one_day_input) for _ in range(100)]

means = []
stds = []
avgm = np.zeros_like(one_day_input[...,0])
for i, yhat in enumerate(yhats_test):
  means.append(yhat.mean())
  stds.append(yhat.stddev())
  m_test = np.squeeze(yhat.mean())
  s_test = np.squeeze(yhat.stddev())
  avgm += m_test/len(yhats_test)

mean_array = np.mean(np.array(means),axis=0)
std_array = np.mean(np.array(stds),axis=0)
print(mean_array)
print(std_array)
print(avgm)
# print(means)
print('Pred: ',m_test,'+-',s_test)
upper_bound = m_test + s_test
lower_bound = m_test - s_test
# print(m_test, '+-', s_test)
print('Truth: ',y4)

In [None]:
plt.figure(figsize=(5,4))
plt.hist(s,bins=50)
plt.xlabel('Model Uncertainty $\sigma$')
plt.title('Test Set')
plt.grid(color='0.95')
plt.show()

In [None]:
true_pct_changes = (np.diff(y2) / y2[:-1]) * 100
pred_pct_changes = (((m - y2) / y2) * 100 )[:-1]
pred_pct_changes2 = (np.diff(m) / m[:-1]) * 100
print(pred_pct_changes.shape,pred_pct_changes2.shape,true_pct_changes.shape)

sig_cut = 4
true_pct_changes_sig = true_pct_changes[s[:-1]<sig_cut]
pred_pct_changes_sig = pred_pct_changes[s[:-1]<sig_cut]
pred_pct_changes2_sig = pred_pct_changes2[s[:-1]<sig_cut]
print(pred_pct_changes_sig.shape,pred_pct_changes2_sig.shape,true_pct_changes_sig.shape)
plt.figure()
plt.scatter(true_pct_changes_sig, pred_pct_changes_sig,color='orange',alpha=.4,label='Pred - Prev. Truth')
# plt.scatter(true_pct_changes_sig, pred_pct_changes2_sig,color='indigo',alpha=.4,label='Pred - Prev. Pred')
plt.plot(true_pct_changes_sig, true_pct_changes_sig,lw=4,color='cadetblue')
plt.axhline(y=0,ls='--',color='red')
plt.axvline(x=0,ls='--',color='red')
plt.xlabel('True Values')
plt.ylabel('Predictions')
plt.legend()
plt.show()

print(true_pct_changes[:10])
print(pred_pct_changes[:10])
same_sign_indices = np.where(np.sign(true_pct_changes_sig) == np.sign(pred_pct_changes_sig))
diff_sign_indices = np.where(np.sign(true_pct_changes_sig) != np.sign(pred_pct_changes_sig))

plt.figure(figsize=(6,4))
_, bins, _ = plt.hist(true_pct_changes_sig,bins=50,histtype='step',label='Actual Daily Returns')
plt.hist(true_pct_changes[same_sign_indices],bins=bins,histtype='step',color='green',label='Correct')
plt.hist(true_pct_changes[diff_sign_indices],bins=bins,histtype='step',color='red',label='Incorrect')
plt.grid(color='0.95')
plt.xlabel('Returns [%]')
plt.title(f'Test set (Post Cut) {sig_cut}')
x_min, x_max = plt.xlim()
y_min, y_max = plt.ylim()
text_x = x_min + 0.025 * (x_max - x_min)
text_y = y_max - 0.1 * (y_max - y_min)
plt.text(text_x, text_y, '% correct sign {:.3f}\n% incorrect sign {:.3f}'.format(len(true_pct_changes_sig[same_sign_indices])/len(true_pct_changes_sig),
                                                                                 len(true_pct_changes_sig[diff_sign_indices])/len(true_pct_changes_sig)), ha='left', va='top')
plt.legend()
plt.show()