# Using GANs to Estimate Value-at-Risk


## 1. Setup

In [None]:
%%capture
# Installing Yfinance package used to download data
!pip install yfinance --upgrade --no-cache-dir

In [None]:
%%capture
import tensorflow as tf
import yfinance as yf
import numpy as np
import matplotlib.pyplot as plt
from tensorflow import keras 
from tensorflow.keras import layers

## 2. Loading Data from Yahoo Finance and Data Preparation Steps

Loading stock price data used to form the portfolio we estimate VaR for.









In [None]:
# Download daily price data for the five portfolio constituents.
# auto_adjust=False keeps the separate 'Adj Close' column that is selected in
# the following cell; recent yfinance releases default to auto_adjust=True,
# which folds the adjustment into 'Close' and drops 'Adj Close'.
data_train = yf.download(
    "HM-B.ST EKTA-B.ST TEL2-B.ST SEB-A.ST INVE-B.ST",
    start="2000-02-27",
    end="2009-12-20",
    auto_adjust=False,
)

In [None]:
# Keep only the adjusted close prices, then show the number of missing values per column
data_train = data_train["Adj Close"]
data_train.isna().sum()

In [None]:
# Impute with previous valid value if there is missing data.
# .ffill() replaces fillna(method='ffill'), which is deprecated in pandas >= 2.1.
# Rows before a column's first valid price have nothing to carry forward, so
# they are dropped; otherwise their NaNs would propagate into the returns and
# into the normalisation statistics computed from them.
data_train = data_train.ffill().dropna()

In [None]:
# Make into numpy array (the helper functions below index it positionally)
data_train = data_train.to_numpy()

In [None]:
# Calculate returns
def get_returns(data):
  """Compute simple period-over-period returns for every column.

  Args:
    data: array-like of prices with one row per observation and one column
      per series.

  Returns:
    Float array with one row fewer than `data`; entry [i, a] is
    (data[i+1, a] - data[i, a]) / data[i, a].
  """
  data = np.asarray(data, dtype=float)
  previous, current = data[:-1], data[1:]
  # Vectorised over all rows and columns instead of a nested Python loop
  return (current - previous) / previous

# Returns of the training prices; get_returns yields one row fewer than its input
return_data = get_returns(data_train)

In [None]:
# Prepare data in correct shape, depends on the forward horizon f of the model
def to_stock_M(org_array, horizon):
  """Stack every window of `horizon` consecutive rows into one 3-D array.

  Args:
    org_array: 2-D array with one row per day and one column per asset.
    horizon: number of consecutive rows in each window.

  Returns:
    Float array of shape (days - horizon + 1, horizon, assets); matrix i
    holds rows i .. i + horizon - 1 of `org_array`.

  Raises:
    ValueError: if `horizon` exceeds the number of rows by more than one,
      which would require a negative number of windows.
  """
  org_array = np.asarray(org_array)
  days, nbr_assets = org_array.shape
  nbr_matrices = days - horizon + 1
  if nbr_matrices < 0:
    raise ValueError(
        f"horizon={horizon} is too large for an array with {days} rows"
    )
  # The asset dimension follows the input instead of being fixed to 5
  stock_M = np.empty((nbr_matrices, horizon, nbr_assets))
  for i in range(nbr_matrices):
    stock_M[i, :, :] = org_array[i:i + horizon, :]

  return stock_M

In [None]:
# Setting parameters for the model
# k: number of assets in the portfolio
# f: the forward window for the model (rows per training sample); the
#    generator and discriminator defined below are built for f = 1
# BATCH_SIZE: number of samples per training batch
# noise_dim: dimension of input noise for the generator
# epochs: number of passes over the training data
# SEED: seeds NumPy and TensorFlow so a Restart & Run All starts from the same random state
k = 5
f = 1
BATCH_SIZE = 36
noise_dim = 2*k
epochs = 10
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)
# Use f rather than a literal 1 so the window length follows the parameter above
training_data = to_stock_M(return_data, f).astype("float32")

In [None]:
# The normalize function is used during the training to normalize the training data

# Column-wise summary statistics of the returns, used for (de)normalization
max_tot = return_data.max(axis=0)
min_tot = return_data.min(axis=0)
mean_tot = return_data.mean(axis=0)
std_tot = return_data.std(axis=0)

def normalize(data):
    """Standardise `data` with the module-level `mean_tot` and `std_tot`.

    Subtracts the mean and divides by the standard deviation; broadcasting
    against the statistics is left to the array/tensor type of `data`.
    """
    centered = data - mean_tot
    return centered / std_tot

## 3. The Generator








In [None]:
# Function defining generator 
def Generator_model():
  """Build the generator network.

  A latent vector of size `noise_dim` is projected by a dense layer to
  32*k units and reshaped to (32, k). Five stride-2 Conv1D layers with
  'same' padding then halve the length each time (32 -> 16 -> 8 -> 4 -> 2 -> 1),
  so the model outputs a tensor of shape (batch, 1, k). The final
  convolution has no activation.

  Returns:
    An uncompiled keras.Model named "Generator_model".
  """
  latent_input = keras.Input(shape=(noise_dim,), name="latent")
  hidden = layers.Dense(32*k, activation="relu")(latent_input)
  hidden = layers.BatchNormalization()(hidden)
  hidden = layers.Reshape((32,k))(hidden)
  # Four identical down-sampling stages: ReLU convolution followed by batch norm
  for _ in range(4):
    hidden = layers.Conv1D(filters=k*2, kernel_size=5, strides=2, activation="relu", padding="same")(hidden)
    hidden = layers.BatchNormalization()(hidden)
  outputs = layers.Conv1D(filters=k, kernel_size=5, strides=2, padding="same")(hidden)
  return keras.Model(inputs=[latent_input],
                     outputs=outputs, name="Generator_model")

In [None]:
# Instantiate the generator and print its layer-by-layer summary
generator = Generator_model()
generator.summary()

In [None]:
# Smoke test of the untrained model:
# feed one random latent vector (a batch of size 1) through the generator
noise = np.random.normal(size=(1, noise_dim))
generated_returns = generator.predict([noise])
print(f'Generated returns are: \n \n {generated_returns}')

## 4. The Discriminator 

In [None]:
# Function defining discriminator model
def discriminator_model():
  """Build the discriminator (critic) network.

  Takes an input of shape (1, k) and passes it through three dense layers
  of k*20, k*10 and k*5 units, each followed by LeakyReLU, and a final
  single-unit dense layer without activation that gives the critic value.

  Returns:
    An uncompiled keras.Model named "Discriminator_model".
  """
  M_input = keras.Input(shape=(1,k))
  hidden = M_input
  for units in (k*20, k*10, k*5):
    hidden = layers.Dense(units)(hidden)
    hidden = layers.LeakyReLU()(hidden)
  output = layers.Dense(1)(hidden)
  return keras.Model(inputs=[M_input],
                     outputs=output, name="Discriminator_model")

In [None]:
# Instantiate the discriminator and print its layer-by-layer summary
discriminator = discriminator_model()
discriminator.summary()

In [None]:
# Testing the untrained discriminator on the returns produced by the untrained generator
critic_val = discriminator.predict([generated_returns])
print(f'Critic value is: \n \n {critic_val}')

## 5. Customize the Training Step

In [None]:
# Defining a class WGAN that is a subclass of the superclass keras.Model, were we override the train_step

# The code used below are based on the Keras code example by A_K_Nain, see below link
# https://keras.io/examples/generative/wgan_gp/

class WGAN(keras.Model):
  """Wasserstein GAN with gradient penalty, trained through a custom train_step.

  Each call to train_step updates the discriminator (critic) `d_steps`
  times and then updates the generator once.

  Args:
    discriminator: model mapping a data batch to critic values.
    generator: model mapping latent vectors to generated data.
    latent_dim: size of the latent vectors fed to the generator.
    disc_extra_steps: number of discriminator updates per generator update.
    gp_weight: multiplier applied to the gradient penalty in the
      discriminator loss.
  """

  def __init__(
      self,
      discriminator,
      generator,
      latent_dim,
      disc_extra_steps=3,
      gp_weight=10.0,
  ):
    # Initialize superclass
    super().__init__()
    # Define attributes
    self.discriminator = discriminator
    self.generator = generator
    self.latent_dim = latent_dim
    self.d_steps = disc_extra_steps
    self.gp_weight = gp_weight

  def compile(self, d_optimizer, g_optimizer, d_loss_fn, g_loss_fn):
    """Override compile so it can take separate optimizers and loss functions.

    Args:
      d_optimizer: optimizer applied to the discriminator variables.
      g_optimizer: optimizer applied to the generator variables.
      d_loss_fn: callable invoked as d_loss_fn(real_dat=..., fake_dat=...)
        with the critic values for real and generated data.
      g_loss_fn: callable invoked with the critic values for generated data.
    """
    super().compile()
    self.d_optimizer = d_optimizer
    self.g_optimizer = g_optimizer
    self.d_loss_fn = d_loss_fn
    self.g_loss_fn = g_loss_fn

  def gradient_penalty(self, batch_size, real_data, fake_data):
    """Compute the gradient penalty that is added to the discriminator loss.

    The penalty is the mean of (||grad|| - 1)^2, where grad is the gradient
    of the critic output with respect to samples interpolated between real
    and generated data. The norm is taken over axes 1 and 2, so the inputs
    are expected to be rank-3 with the batch on axis 0.

    Args:
      batch_size: number of samples in the batch.
      real_data: batch of real data.
      fake_data: batch of generated data with the same shape as real_data.

    Returns:
      Scalar tensor with the gradient penalty.
    """
    # The interpolation weight is drawn from U[0, 1] so that every
    # interpolated sample lies on the segment between a real and a generated
    # sample; a normal draw would also extrapolate beyond both end points.
    alpha = tf.random.uniform([batch_size, 1, 1], 0.0, 1.0)
    diff = fake_data - real_data
    interpolated = real_data + alpha * diff  # gives weighted sample
    with tf.GradientTape() as gp_tape:
      gp_tape.watch(interpolated)
      # Discriminator output for interpolated matrices
      pred = self.discriminator([interpolated], training=True)
    # Gradients with respect to interpolated data, taken after the forward
    # pass has been recorded and the tape context has been left
    grads = gp_tape.gradient(pred, [interpolated])[0]
    # Per-sample norm of the gradients
    norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2]))
    gp = tf.reduce_mean((norm - 1.0) ** 2)
    return gp

  def train_step(self, real_data):
    """Run one training step: `d_steps` critic updates, then one generator update.

    Args:
      real_data: batch of real data, or a tuple whose first element is that
        batch. It is passed through the module-level normalize function
        before use.

    Returns:
      Dict with "d_loss" (discriminator loss of the last critic update,
      including the weighted gradient penalty) and "g_loss".
    """
    if isinstance(real_data, tuple):
      real_data = real_data[0]
    batch_size = tf.shape(real_data)[0]
    real_data = normalize(real_data)

    # First the discriminator is trained for a number of steps before the
    # generator is trained for one step
    for i in range(self.d_steps):
      # Get latent vectors for each batch
      random_latent_vectors = tf.random.normal(
          shape=(batch_size, self.latent_dim)
      )
      with tf.GradientTape() as tape:
        # Generate fake_data from generator
        fake_data = self.generator([random_latent_vectors], training=True)
        # Get critic value for fake
        fake_logits = self.discriminator([fake_data], training=True)
        # Get critic value for real data
        real_logits = self.discriminator([real_data], training=True)

        # Discriminator loss (wasserstein)
        d_cost = self.d_loss_fn(real_dat=real_logits, fake_dat=fake_logits)
        # Calculate GP
        gp = self.gradient_penalty(batch_size, real_data, fake_data)
        # Add GP to get the total discriminator loss
        d_loss = d_cost + gp * self.gp_weight

      # Get gradients
      d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)
      # Updating weights
      self.d_optimizer.apply_gradients(
          zip(d_gradient, self.discriminator.trainable_variables)
      )

    # Now we train the generator
    # Get latent vectors
    random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
    with tf.GradientTape() as tape:
      # Generate fake data
      generated_data = self.generator([random_latent_vectors], training=True)
      # Critic value
      gen_dat_logits = self.discriminator([generated_data], training=True)
      # Calculate loss (wasserstein)
      g_loss = self.g_loss_fn(gen_dat_logits)

    # Get gradients
    gen_gradients = tape.gradient(g_loss, self.generator.trainable_variables)
    # Update weights of generator
    self.g_optimizer.apply_gradients(
        zip(gen_gradients, self.generator.trainable_variables)
    )
    return {"d_loss": d_loss, "g_loss": g_loss}

## 6. Training of Model

In [None]:
# Adam optimizers, one per network, configured with the same hyperparameters
generator_optimizer = keras.optimizers.Adam(learning_rate=2e-5, beta_1=0.5, beta_2=0.9)

discriminator_optimizer = keras.optimizers.Adam(learning_rate=2e-5, beta_1=0.5, beta_2=0.9)

In [None]:
# Loss functions

def discriminator_loss(real_dat, fake_dat):
  """Wasserstein critic loss: mean critic value on fake data minus mean on real data."""
  return tf.reduce_mean(fake_dat) - tf.reduce_mean(real_dat)

def generator_loss(fake_dat):
  """Wasserstein generator loss: negated mean critic value on generated data."""
  mean_critic = tf.reduce_mean(fake_dat)
  return -mean_critic


In [None]:
# Build the WGAN from the two networks (five critic updates per generator update)
# and attach the optimizers and loss functions
wgan = WGAN(discriminator, generator, latent_dim=noise_dim, disc_extra_steps=5)

wgan.compile(
    d_optimizer=discriminator_optimizer,
    g_optimizer=generator_optimizer,
    d_loss_fn=discriminator_loss,
    g_loss_fn=generator_loss,
)

In [None]:
# Training model; verbose=0 silences the progress output and the returned
# History object is kept so the losses can be plotted afterwards
varGan = wgan.fit(training_data, batch_size=BATCH_SIZE, epochs=epochs, verbose=0)

In [None]:
# Plotting the loss functions during training for discriminator and generator
fig, ax = plt.subplots()
ax.plot(varGan.history['d_loss'], label="Discriminator loss")
ax.plot(varGan.history['g_loss'], label="Generator loss")
# Title and axis labels so the figure can be read on its own
ax.set(title="WGAN training losses", xlabel="Epoch", ylabel="Loss")
ax.legend()
plt.show()

In [None]:
# Saving generator and discriminator models to HDF5 files in the working directory
generator.save('generator_pretrained.h5')
discriminator.save('discriminator_pretrained.h5')

## 7. Estimating VaR with the Trained Generator

In [None]:
# Simulation to estimate 95% VaR
def varGan_sim(nbr_sim = 1000, k=5, generator = generator, percentile = 5):
    """Estimate VaR of an equally weighted portfolio by simulating from the generator.

    Draws `nbr_sim` latent vectors of size `noise_dim`, maps them through the
    generator in inference mode, rescales the output with the module-level
    `std_tot` and `mean_tot`, and forms the return of a portfolio holding
    each of the `k` assets with weight 1/k.

    Args:
      nbr_sim: number of simulated portfolio returns.
      k: number of assets; must match the width of the generator output.
      generator: model used for the simulation.
      percentile: percentile of the simulated return distribution to report;
        the default 5 corresponds to the 95% VaR.

    Returns:
      The requested percentile of the simulated portfolio returns. It is a
      return, not a loss, so a loss shows up as a negative number.
    """
    portfolio_weights = np.full(k, 1/k)
    # One batched forward pass instead of nbr_sim single-sample model calls
    noise = np.random.normal(size=(nbr_sim, noise_dim))
    sim_vals = generator([noise], training=False)
    # Transform to original scaling
    sim_org = (sim_vals*std_tot + mean_tot).numpy().reshape(nbr_sim, k)
    # Simulated returns for the portfolio, one per latent draw
    p_returns = sim_org @ portfolio_weights
    # Estimate VaR from the simulated returns
    return np.percentile(p_returns, percentile)

In [None]:
# Run the simulation with its default arguments and display the estimate
valueatrisk = varGan_sim()
valueatrisk