<a href="https://colab.research.google.com/github/96jonesa/CSE-517-Project/blob/main/scaffolding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Imports

In [None]:
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

#GRU

##GRU
This is just a wrapper around nn.GRU for the sake of consistency. Used in the Price Encoder, day-level SMI Encoder, and temporal SMI Encoder.

In [None]:
class GRU(nn.Module):
    """Thin wrapper around ``nn.GRU`` kept for naming consistency.

    Args:
        input_size: number of features in each input step.
        hidden_size: number of features in the hidden state.
        batch_first: forwarded to ``nn.GRU``; when True the input and output
            tensors are laid out as (batch, seq, feature).
    """

    def __init__(self, input_size, hidden_size, batch_first=False):
        super(GRU, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.batch_first = batch_first

        self.gru = nn.GRU(input_size, hidden_size, batch_first=self.batch_first)

    def forward(self, input, h_0=None):
        """Run the wrapped GRU over ``input``.

        Args:
            input: input sequence tensor, passed to ``nn.GRU`` unchanged.
            h_0: optional initial hidden state. When omitted, ``nn.GRU``
                starts from an all-zero hidden state, so callers that have no
                initial state can simply call ``gru(input)``.

        Returns:
            The ``(output, h_n)`` pair produced by ``nn.GRU``.
        """
        output, hn = self.gru(input, h_0)
        return output, hn

#Self-Attention

##LinearAttention
The attention mechanism used in Feng et al. Used in the Price Encoder, day-level SMI Encoder, and temporal SMI Encoder. Given input $h$, returns
$q_t = \sum_{i=t-T}^{t} \beta_i h_i$ where $\beta_i = \dfrac{\exp\left( u^T \tanh \left( W h_i + b \right) \right)}{\sum_{k=t-T}^{t} \exp\left( u^T \tanh \left( W h_k + b \right) \right)}$.

In [None]:
# attention weights are softmax(u^T tanh(W input + b)) where W is learned parameter matrix, u is a learned parameter vector, and b is a learned offset

class LinearAttention(nn.Module):
    """Additive self-attention that pools a sequence into weighted sums.

    For every step ``h_i`` of the input sequence a score
    ``u^T tanh(W h_i + b)`` is computed; the scores are normalized with a
    softmax over the sequence dimension and used to form a weighted sum of
    the inputs.

    Args:
        input_size: feature size of each sequence element.
        intermediate_size: output size of the first linear layer (``W``, ``b``).
        weights_size: number of score channels produced by the second linear
            layer (``u``); each channel yields its own weighted sum.
    """

    def __init__(self, input_size, intermediate_size, weights_size):
        super(LinearAttention, self).__init__()
        self.input_size = input_size
        self.intermediate_size = intermediate_size
        self.weights_size = weights_size

        self.linear_1 = nn.Linear(self.input_size, self.intermediate_size, bias=True)
        self.linear_2 = nn.Linear(self.intermediate_size, self.weights_size, bias=False)
        self.tanh = nn.Tanh()
        # Normalize over the sequence dimension (dim=1) so that the weights of
        # each channel sum to one across time steps.  Normalizing over dim=2
        # would make every weight equal to 1 whenever weights_size == 1.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input):
        """Pool ``input`` with the learned attention weights.

        Args:
            input: tensor of shape (batch, seq, input_size).

        Returns:
            Tensor of shape (batch, weights_size, input_size).
        """
        intermediate = self.tanh(self.linear_1(input))
        # (batch, seq, weights_size), normalized across seq
        attention_weights = self.softmax(self.linear_2(intermediate))
        # (batch, weights_size, seq) so bmm sums over the sequence dimension
        attention_weights = attention_weights.permute(0, 2, 1)
        output_features = torch.bmm(attention_weights, input)

        return output_features

##BilinearAttention
The attention mechanism proposed in Sawhney et al., which does not work with inputs of the shapes proposed in the paper. Various choices of left and right vectors could be used here to provide an alternative to the LinearAttention module. Given inputs $L$ and $R$, returns
$q_t = \sum_{i=t-T}^{t} \beta_i L_i$ where $\beta_i = \dfrac{\exp\left( L_i^T W R  \right)}{\sum_{k=t-T}^{t} \exp\left( L_k^T W R \right)}$.

In [None]:
# attention weights are softmax(left^T W right) where W is learned parameter matrix

class BilinearAttention(nn.Module):
    """Bilinear attention that pools the ``left`` sequence.

    A score ``left_i^T W right_i`` is computed for every sequence position,
    the scores are normalized with a softmax over the sequence dimension, and
    the normalized weights are used to form a weighted sum of ``left``.

    Args:
        left_size: feature size of ``left``.
        right_size: feature size of ``right``.
        weights_size: number of score channels produced by the bilinear map;
            each channel yields its own weighted sum.
    """

    def __init__(self, left_size, right_size, weights_size):
        super(BilinearAttention, self).__init__()
        self.left_size = left_size
        self.right_size = right_size
        self.weights_size = weights_size

        self.bilinear = nn.Bilinear(self.left_size, self.right_size, weights_size, bias=False)
        # The sequence dimension is second to last for both (seq, size) and
        # (batch, seq, size) inputs; an explicit dim avoids the deprecated
        # implicit-dimension behaviour of nn.Softmax().
        self.softmax = nn.Softmax(dim=-2)

    def forward(self, left, right):
        """Pool ``left`` using bilinear attention scores.

        Args:
            left: tensor of shape (..., seq, left_size).
            right: tensor of shape (..., seq, right_size); all dimensions but
                the last must match ``left``, as required by ``nn.Bilinear``.

        Returns:
            Tensor of shape (..., weights_size, left_size).
        """
        # (..., seq, weights_size), normalized across seq
        attention_weights = self.softmax(self.bilinear(left, right))
        # transpose so the matrix product sums over the sequence dimension;
        # matmul (unlike mm) also accepts batched inputs
        output_features = torch.matmul(attention_weights.transpose(-2, -1), left)

        return output_features

#Blending

##Blend
Applies a learned bilinear transformation to the left and right vectors, then inputs the result to a ReLU non-linearity. Used to obtain Multi-Modal Encodings from Price Encodings and temporal SMI Encodings. Given Price Encodings $q_t$ and temporal SMI Encodings $c_t$, returns
$x_t = \mathcal{B} \left( c_t, q_t \right) = \text{ReLU} \left( q_t^T W c_t + b \right)$.

In [None]:
# output is ReLU(left^T W right + b) where W is a learned parameter matrix
# and b is a learned bias

class Blend(nn.Module):
    """Fuse two feature tensors with a learned bilinear map and a ReLU.

    Computes ``ReLU(left^T W right + b)``.

    Args:
        left_size: feature size of the ``left`` input.
        right_size: feature size of the ``right`` input.
        output_size: feature size of the blended output.
    """

    def __init__(self, left_size, right_size, output_size):
        super().__init__()
        self.left_size, self.right_size, self.output_size = (
            left_size,
            right_size,
            output_size,
        )

        self.bilinear = nn.Bilinear(
            in1_features=left_size,
            in2_features=right_size,
            out_features=output_size,
            bias=True,
        )
        self.relu = nn.ReLU()

    def forward(self, left, right):
        """Return the rectified bilinear combination of ``left`` and ``right``."""
        blended = self.bilinear(left, right)
        return self.relu(blended)

#Single-Headed Graph Attention Network (SGAT)

##SharedLinear
This is just a wrapper around nn.Linear for the sake of consistency. Used to apply a shared linear transformation to all inputs of an SGAT layer. Under current implementation, this should be applied before passing inputs to SGAT.

In [None]:
# need shared learned parameter matrix W to multiply against each input vector

class SharedLinear(nn.Module):
    """Bias-free linear map applied identically to every input vector.

    Args:
        input_size: feature size of each input vector.
        output_size: feature size of each transformed vector.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        self.input_size, self.output_size = input_size, output_size

        self.linear = nn.Linear(
            in_features=input_size,
            out_features=output_size,
            bias=False,
        )

    def forward(self, input):
        """Return ``input`` multiplied by the shared weight matrix."""
        return self.linear(input)

##SGAT
A single-headed GAT layer. A shared linear transform $W$ is applied to all the nodes *before* passing them as input to this module (by passing them as input to a SharedLinear layer), then a shared self-attention mechanism is applied to each node $i$ in its immediate neighborhood $\mathcal{N}_i$. For each node $j\in \mathcal{N}_i$, normalized attention coefficients $\alpha_{i,j}$ are computed to represent the importance of the relations between stocks $i$ and $j$. That is,
$\alpha_{i,j} = \dfrac{\exp ( \text{LeakyReLU} ( a_w^T [ W x_i 
\oplus W x_j ] ) )}{\sum_{k\in \mathcal{N}_i} \exp ( \text{LeakyReLU} ( a_w^T [ W x_i \oplus W x_k ] ) )}$
where $\oplus$ denotes concatenation and $a_w$ is a learned parameter matrix. An updated feature vector $z_i$ for the $i$-th stock is computed by applying these attention weights to the linearly transformed multi-modal feature vectors of all of the stocks in $\mathcal{N}_i$: $z_i = \sum_{j\in \mathcal{N}_i} \alpha_{i,j} W x_j$.

In [None]:
# merge code with MGAT code to form general case GAT code

class SGAT(nn.Module):
    """Single-headed graph attention layer.

    For every ordered pair of nodes (i, j) a score
    ``LeakyReLU(a^T [x_i (+) x_j])`` is computed, where ``(+)`` denotes
    concatenation.  Scores of nodes outside the neighborhood of ``i`` are
    masked out, the remaining scores are normalized with a softmax over ``j``
    and used to form a weighted sum of the neighbor features.

    The inputs are expected to have been passed through the shared linear
    transform already; this layer only learns the attention vector(s) ``a``.

    Args:
        input_size: feature size of each node vector.
        weights_size: number of attention score channels (rows of ``a``);
            use 1 for a single attention head.
        leakyrelu_slope: negative slope of the LeakyReLU applied to the scores.
    """

    def __init__(self, input_size, weights_size, leakyrelu_slope):
        super(SGAT, self).__init__()
        self.input_size = input_size
        self.weights_size = weights_size
        self.leakyrelu_slope = leakyrelu_slope

        self.linear = nn.Linear(2 * input_size, weights_size, bias=False)
        self.leakyrelu = nn.LeakyReLU(self.leakyrelu_slope)
        # scores are laid out as (node i, neighbor j, channel); normalize over j
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input, neighborhoods):
        """Aggregate neighbor features with learned attention weights.

        Args:
            input: node features of shape (num_nodes, input_size).
            neighborhoods: (num_nodes, num_nodes) adjacency mask; a non-zero
                (or True) entry at [i, j] means node j belongs to the
                neighborhood of node i.  Include the diagonal if a node
                should attend to itself.

        Returns:
            Tensor of shape (num_nodes, weights_size * input_size): the
            weighted neighbor sums of all score channels concatenated along
            the feature dimension, i.e. (num_nodes, input_size) when
            ``weights_size == 1``.  Nodes with an empty neighborhood get an
            all-zero row.

        Raises:
            ValueError: if ``input`` is not (num_nodes, input_size) or
                ``neighborhoods`` is not (num_nodes, num_nodes).
        """
        if input.dim() != 2 or input.shape[1] != self.input_size:
            raise ValueError(
                "input must have shape (num_nodes, %d), got %s"
                % (self.input_size, tuple(input.shape))
            )
        num_nodes = input.shape[0]
        if tuple(neighborhoods.shape) != (num_nodes, num_nodes):
            raise ValueError(
                "neighborhoods must have shape (%d, %d), got %s"
                % (num_nodes, num_nodes, tuple(neighborhoods.shape))
            )

        # a^T [x_i (+) x_j] = a_left^T x_i + a_right^T x_j, so the pairwise
        # scores can be formed without materializing every concatenated pair.
        a_left, a_right = self.linear.weight.split(self.input_size, dim=1)
        source_scores = input @ a_left.t()   # (num_nodes, weights_size)
        target_scores = input @ a_right.t()  # (num_nodes, weights_size)
        scores = self.leakyrelu(source_scores.unsqueeze(1) + target_scores.unsqueeze(0))

        # exclude non-neighbors from the softmax
        is_neighbor = (neighborhoods != 0).unsqueeze(-1)
        scores = scores.masked_fill(~is_neighbor, float('-inf'))
        attention_weights = self.softmax(scores)
        # a row with no neighbors is all -inf and softmax yields NaN there;
        # zeroing masked entries turns those rows into all-zero weights
        attention_weights = attention_weights.masked_fill(~is_neighbor, 0.0)

        # (num_nodes, weights_size, num_nodes) @ (num_nodes, input_size)
        output_features = torch.matmul(attention_weights.permute(0, 2, 1), input)

        return output_features.reshape(num_nodes, -1)

#Multi-Headed Graph Attention Network (MGAT)

In [None]:
# decide between implementing like this and implementing in main module

class MGAT(nn.Module):
    """Multi-headed wrapper that evaluates an SGAT layer once per head.

    NOTE(review): a single ``SGAT`` instance is created here and reused for
    every head, so all heads share the same parameters and produce the same
    output for the same input.  Independent heads need one ``SGAT`` per head,
    which requires knowing the head count at construction time.

    Args:
        input_size: forwarded to ``SGAT``.
        weights_size: forwarded to ``SGAT``.
        leakyrelu_slope: forwarded to ``SGAT``.
    """

    def __init__(self, input_size, weights_size, leakyrelu_slope):
        super(MGAT, self).__init__()
        self.input_size = input_size
        self.weights_size = weights_size
        self.leakyrelu_slope = leakyrelu_slope

        self.sgat = SGAT(self.input_size, self.weights_size, self.leakyrelu_slope)

    def forward(self, input, neighborhoods, num_heads):
        """Evaluate the attention layer ``num_heads`` times and stack the results.

        Args:
            input: node features, passed to the SGAT layer unchanged.
            neighborhoods: neighborhood description, passed to the SGAT layer
                unchanged.
            num_heads: number of heads to evaluate; must be at least 1.

        Returns:
            Tensor whose leading dimension has size ``num_heads`` and whose
            remaining dimensions are those of a single SGAT output.

        Raises:
            ValueError: if ``num_heads`` is smaller than 1.
        """
        if num_heads < 1:
            raise ValueError("num_heads must be at least 1, got %r" % (num_heads,))

        head_outputs = [self.sgat(input, neighborhoods) for _ in range(num_heads)]
        output_features = torch.stack(head_outputs, dim=0)

        return output_features

#Scaffolding

In [None]:
class MANSF(nn.Module):
    """Scaffold that wires the encoders, the blend layer and two SGAT layers together.

    NOTE(review): this class is an unfinished skeleton.  ``GRU``,
    ``LinearAttention``, ``SGAT``, ``Blend`` and ``nn.Linear`` are all
    constructed below without the size arguments their constructors require,
    so instantiating ``MANSF`` raises ``TypeError`` until they are filled in.
    """
    def __init__(self, input_size, output_size, leakyrelu_slope, elu_alpha):
        """Store the hyper-parameters and create the (placeholder) sub-modules.

        Args:
            input_size: stored on the instance; not yet used to size any layer.
            output_size: stored on the instance; not yet used to size any layer.
            leakyrelu_slope: stored on the instance; not yet passed to the
                SGAT layers.
            elu_alpha: alpha value passed to ``nn.ELU``.
        """
        super(MANSF, self).__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.leakyrelu_slope = leakyrelu_slope
        self.elu_alpha = elu_alpha

        # TODO: fill in constructor arguments for all of these; as written the
        # first call (GRU()) already fails because its sizes are missing
        self.gru_p = GRU()
        self.gru_m = GRU()
        self.gru_s = GRU()
        self.attn_p = LinearAttention()
        self.attn_m = LinearAttention()
        self.attn_s = LinearAttention()
        self.sgat_1 = SGAT()
        self.sgat_2 = SGAT()
        self.blend = Blend()
        self.elu = nn.ELU(elu_alpha)
        self.sigmoid = nn.Sigmoid()
        self.linear = nn.Linear()

    def forward(self, price_input, smi_input, neighborhoods, num_heads):
        """Run the price branch, the SMI branch, the blend and the graph layers.

        Args:
            price_input: input to the price GRU (``gru_p``).
            smi_input: input to the day-level SMI GRU (``gru_m``).
            neighborhoods: passed unchanged to both SGAT layers.
            num_heads: accepted but not used anywhere in this method.

        Returns:
            ``sigmoid(linear(z_2))`` where ``z_2`` is the output of the second
            SGAT layer after a sigmoid.
        """
        # NOTE(review): GRU.forward returns (output, hn); the unpacking below
        # discards the first element and feeds the second one to attention —
        # confirm that this is the intended attention input.
        _, price_gru_states = self.gru_p(price_input)
        price_encoding = self.attn_p(price_gru_states)

        # SMI branch: day-level GRU + attention, then temporal GRU + attention
        _, smi_day_gru_states = self.gru_m(smi_input)
        smi_day_encoding = self.attn_m(smi_day_gru_states)
        _, smi_gru_states = self.gru_s(smi_day_encoding)
        smi_encoding = self.attn_s(smi_gru_states)

        # fuse the two encodings with the bilinear Blend layer
        multi_modal_encoding = self.blend(price_encoding, smi_encoding)

        # two graph attention layers: ELU after the first, sigmoid after the second
        z_1 = self.elu(self.sgat_1(multi_modal_encoding, neighborhoods))
        z_2 = self.sigmoid(self.sgat_2(z_1, neighborhoods))
        y = self.sigmoid(self.linear(z_2))

        return y

#Sandbox

##Price Encoding

In [None]:
# Shape check for the price-encoder GRU on random data.
T = 5  # length of the lookback window, in days
batch_size = 4
gru_p_hidden_size = 64

# p_i = [p_i^c, p_i^h, p_i^l]; left unnormalized because only shapes matter here
p = torch.rand(batch_size, T, 3)
print(f'p.shape {p.shape}')

# random initial hidden state, drawn before the GRU is built
h_p_0 = torch.randn(1, batch_size, gru_p_hidden_size)
gru_p = GRU(3, gru_p_hidden_size, batch_first=True)
h_p, h_p_n = gru_p(p, h_p_0)

print(f'h_p.shape {h_p.shape}')
print(f'h_p_n.shape {h_p_n.shape}')

p.shape torch.Size([4, 5, 3])
h_p.shape torch.Size([4, 5, 64])
h_p_n.shape torch.Size([1, 4, 64])


In [None]:
# Shape check for the price-encoder attention applied to the GRU outputs h_p.
attn_p_intermediate_size = 10
attn_p = LinearAttention(
    input_size=gru_p_hidden_size,
    intermediate_size=attn_p_intermediate_size,
    weights_size=1,
)

q = attn_p(h_p)
print(f'q.shape {q.shape}')

q.shape torch.Size([4, 1, 64])


##SMI Encoding

In [None]:
K = [7, 9, 11, 13, 15]  # number of tweets for each day in lookback window
T = 5  # number of days in lookback window
batch_size = 4
gru_m_hidden_size = 64
use_embedding_size = 512
attn_m_intermediate_size = 10

# day-level encodings are appended along dim 1, so start from an empty sequence
r = torch.zeros(batch_size, 0, gru_m_hidden_size)

# the day-level encoder modules are created once and reused for every day,
# instead of re-initializing the attention weights on each iteration
gru_m = GRU(use_embedding_size, gru_m_hidden_size, batch_first=True)
attn_m = LinearAttention(gru_m_hidden_size, attn_m_intermediate_size, 1)

for t in range(T):

    # day t has K[t] tweets (previously K[0] was used for every day)
    m = torch.rand(batch_size, K[t], use_embedding_size)

    print('m.shape', m.shape)

    h_m_0 = torch.randn(1, batch_size, gru_m_hidden_size)  # randomly initialized initial hidden state

    h_m, h_m_n = gru_m(m, h_m_0)

    print('h_m.shape', h_m.shape)
    print('h_m_n.shape', h_m_n.shape)

    r_t = attn_m(h_m)

    print('r_t.shape', r_t.shape)

    # append this day's encoding to the sequence of day-level encodings
    r = torch.cat((r, r_t), 1)

    print('r.shape', r.shape)

    print()

m.shape torch.Size([4, 7, 512])
h_m.shape torch.Size([4, 7, 64])
h_m_n.shape torch.Size([1, 4, 64])
r_t.shape torch.Size([4, 1, 64])
r.shape torch.Size([4, 1, 64])

m.shape torch.Size([4, 7, 512])
h_m.shape torch.Size([4, 7, 64])
h_m_n.shape torch.Size([1, 4, 64])
r_t.shape torch.Size([4, 1, 64])
r.shape torch.Size([4, 2, 64])

m.shape torch.Size([4, 7, 512])
h_m.shape torch.Size([4, 7, 64])
h_m_n.shape torch.Size([1, 4, 64])
r_t.shape torch.Size([4, 1, 64])
r.shape torch.Size([4, 3, 64])

m.shape torch.Size([4, 7, 512])
h_m.shape torch.Size([4, 7, 64])
h_m_n.shape torch.Size([1, 4, 64])
r_t.shape torch.Size([4, 1, 64])
r.shape torch.Size([4, 4, 64])

m.shape torch.Size([4, 7, 512])
h_m.shape torch.Size([4, 7, 64])
h_m_n.shape torch.Size([1, 4, 64])
r_t.shape torch.Size([4, 1, 64])
r.shape torch.Size([4, 5, 64])



In [None]:
# Shape check for the temporal SMI GRU applied to the day-level encodings r.
gru_s_hidden_size = 64

print('r.shape', r.shape)

h_s_0 = torch.randn(1, batch_size, gru_s_hidden_size)  # randomly initialized initial hidden state
gru_s = GRU(gru_m_hidden_size, gru_s_hidden_size, batch_first=True)

h_s, h_s_n = gru_s(r, h_s_0)

# report the shape of the GRU outputs h_s (h_s_0, the initial state, was printed here by mistake)
print('h_s.shape', h_s.shape)
print('h_s_n.shape', h_s_n.shape)

r.shape torch.Size([4, 5, 64])
h_s.shape torch.Size([1, 4, 64])
h_s_n.shape torch.Size([1, 4, 64])


In [None]:
# Shape check for the temporal SMI attention applied to the GRU outputs h_s.
attn_s_intermediate_size = 10
attn_s = LinearAttention(
    input_size=gru_s_hidden_size,
    intermediate_size=attn_s_intermediate_size,
    weights_size=1,
)

c = attn_s(h_s)
print(f'c.shape {c.shape}')

c.shape torch.Size([4, 1, 64])


##GAT