In [1]:
import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import matplotlib.pyplot as plt # for making figures
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

%matplotlib inline
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

torch.manual_seed(1)

In [2]:
inp = [torch.randn(1, 3) for _ in range(5)]  # make a sequence of length 5
for i in inp:
    print(i.view(1, 1, -1))

In [3]:
lstm = nn.LSTM(3, 3)  # Input dim is 3, output dim is 3
inputs = [torch.randn(1, 3) for _ in range(5)]  # make a sequence of length 5

# initialize the hidden state.
hidden = (torch.randn(1, 1, 3),
          torch.randn(1, 1, 3))
# not correct this is both hidden and cell state
for i in inputs:
    # Step through the sequence one element at a time.
    # after each step, hidden contains the hidden state.
    out, hidden = lstm(i.view(1, 1, -1), hidden)

# print(out[-1])
print(hidden)
# print(torch.all(torch.eq(out, hidden[0])))
# alternatively, we can do the entire sequence all at once.
# the first value returned by LSTM is all of the hidden states throughout
# the sequence. the second is just the most recent hidden state
# (compare the last slice of "out" with "hidden" below, they are the same)
# The reason for this is that:
# "out" will give you access to all hidden states in the sequence
# "hidden" will allow you to continue the sequence and backpropagate,
# by passing it as an argument  to the lstm at a later time
# Add the extra 2nd dimension
print(torch.cat(inputs))
inputs = torch.cat(inputs).view(len(inputs), 1, -1)
hidden = (torch.randn(1, 1, 3), torch.randn(1, 1, 3))  # clean out hidden state
out, hidden = lstm(inputs, hidden)
print(f'{inputs=}')
# print(out)
print(hidden)

In [4]:
# -----------------------------------------------------------------------------------------------
class Linear:
  def __init__(self, fan_in, fan_out, bias=True):
    self.weight = torch.randn((fan_in, fan_out)) / fan_in**0.5 # note: kaiming init
    self.bias = torch.randn(fan_out) / fan_out**0.5 if bias else None # note: kaiming init
    # self.bias = torch.zeros(fan_out) if bias else None
  
  def __call__(self, x):
    self.out = x @ self.weight
    if self.bias is not None:
      self.out += self.bias
    # print(f'{self.out.shape=}')
    return self.out
  
  def parameters(self):
    return [self.weight] + ([] if self.bias is None else [self.bias])

# -----------------------------------------------------------------------------------------------
class BatchNorm1d:
  # 1/m x.sum(dim)                  mean
  # torch.sum((x - mean)**2)/(m-1)  variance
  # (x-mean)/torch.sqrt(var+eps)    normalize
  # gamma*x + beta                  scale and shift
  def __init__(self, dim, eps=1e-5, momentum=0.1):
    self.eps = eps
    self.momentum = momentum
    self.training = True
    # parameters (trained with backprop)
    self.gamma = torch.ones(dim)
    self.beta = torch.zeros(dim)
    # buffers (trained with a running 'momentum update')
    self.running_mean = torch.zeros(dim)
    self.running_var = torch.ones(dim)
  
  def __call__(self, x):
    # calculate the forward pass
    if self.training:
      if x.ndim == 2:
        dim = 0
      elif x.ndim == 3:
        dim = (0,1)
      xmean = x.mean(dim, keepdim=True) # batch mean
      xvar = x.var(dim, keepdim=True) # batch variance
    else:
      xmean = self.running_mean
      xvar = self.running_var
    xhat = (x - xmean) / torch.sqrt(xvar + self.eps) # normalize to unit variance
    self.out = self.gamma * xhat + self.beta
    # update the buffers
    if self.training:
      with torch.no_grad():
        self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * xmean
        self.running_var = (1 - self.momentum) * self.running_var + self.momentum * xvar
    return self.out
  
  def parameters(self):
    return [self.gamma, self.beta]

# -----------------------------------------------------------------------------------------------
class Tanh:
  def __call__(self, x):
    self.out = torch.tanh(x)
    return self.out
  
  def parameters(self):
    return []

# -----------------------------------------------------------------------------------------------
class Embedding:
  def __init__(self, num_embeddings, embedding_dim):
    self.weight = torch.randn((num_embeddings, embedding_dim))
    
  def __call__(self, IX):
    self.out = self.weight[IX]
    return self.out
  
  def parameters(self):
    return [self.weight]

# -----------------------------------------------------------------------------------------------
class FlattenConsecutive:
  def __init__(self, n):
    self.n = n
    
  def __call__(self, x):
    B, T, C = x.shape
    x = x.view(B, T//self.n, C*self.n)
    if x.shape[1] == 1:
      x = x.squeeze(1)
    self.out = x
    return self.out
  
  def parameters(self):
    return []

# -----------------------------------------------------------------------------------------------
class Sequential:
  def __init__(self, layers):
    self.layers = layers
  
  def __call__(self, x):
    for layer in self.layers:
      if type(layer) is LSTM:
        x, _ = layer(x)
      else:
        x = layer(x)
    self.out = x
    return self.out
  
  def parameters(self):
    # get parameters of all layers and stretch them out into one list
    return [p for layer in self.layers for p in layer.parameters()]

# ------------------------------------------------------------------------------------------------
class Sigmoid:
  def __call__(self, x):
    self.out = torch.sigmoid(x)
    return self.out
  
  def parameters(self):
    return []
  
# ------------------------------------------------------------------------------------------------
class LSTM:
  def __init__(self, input_size, hidden_size, bias=True):
    self.f_W = torch.randn((input_size, hidden_size)) / input_size**0.5   # note: kaiming init
    self.f_U = torch.randn((hidden_size, hidden_size)) / hidden_size**0.5 # note: kaiming init
    # self.f_bias = torch.zeros(hidden_size) if bias else None
    self.f_bias = torch.randn(hidden_size) / hidden_size**0.5 if bias else None # note: kaiming init

    self.m_W = torch.randn((input_size, hidden_size)) / input_size**0.5   # note: kaiming init
    self.m_U = torch.randn((hidden_size, hidden_size)) / hidden_size**0.5 # note: kaiming init
    # self.m_bias = torch.zeros(hidden_size) if bias else None
    self.m_bias = torch.randn(hidden_size) / hidden_size**0.5 if bias else None # note: kaiming init

    self.i_W = torch.randn((input_size, hidden_size)) / input_size**0.5   # note: kaiming init
    self.i_U = torch.randn((hidden_size, hidden_size)) / hidden_size**0.5 # note: kaiming init
    # self.i_bias = torch.zeros(hidden_size) if bias else None
    self.i_bias = torch.randn(hidden_size) / hidden_size**0.5 if bias else None # note: kaiming init

    self.o_W = torch.randn((input_size, hidden_size)) / input_size**0.5   # note: kaiming init
    self.o_U = torch.randn((hidden_size, hidden_size)) / hidden_size**0.5 # note: kaiming init
    # self.o_bias = torch.zeros(hidden_size) if bias else None
    self.o_bias = torch.randn(hidden_size) / hidden_size**0.5 if bias else None # note: kaiming init

    self.input_size = input_size
    self.hidden_size = hidden_size

    self.bias = bias
    self.training = True
  
  def __call__(self, x):
    B, T, _ = x.shape                               # T is the sequence size 
    h_n = torch.tensor([])
    h = torch.zeros(B, self.hidden_size)
    c = torch.zeros(B, self.hidden_size)
    for t in range(T):
      x_t = x[:, t, :]
      forget = x_t @ self.f_W + h @ self.f_U   # forget gate
      add = x_t @ self.i_W + h @ self.i_U      # input/add gate
      mask = x_t @ self.m_W + h @ self.m_U     # input gate
      output = x_t @ self.o_W + h @ self.o_U   # output gate
      if self.bias:
        forget += self.f_bias
        add += self.i_bias
        mask += self.m_bias
        output += self.o_bias
      forget = torch.sigmoid(forget)
      add = torch.tanh(add)
      mask = torch.sigmoid(forget)
      output = torch.sigmoid(output)
      c = c * forget + add * mask
      h = torch.tanh(c) * output
      h_n = torch.cat((h_n, h.unsqueeze(1)), dim=1)
    return h_n, (h, c)

  def train(self):
    self.training = True
  
  def eval(self):
    self.training = False
  
  def parameters(self):
    return [self.f_W, self.m_W, self.i_W, self.o_W, self.f_U, self.m_U, self.i_U, self.o_U] +\
  ([self.f_bias, self.m_bias, self.i_bias, self.o_bias] if self.bias else [] )

# ------------------------------------------------------------------------------------------------
class Head(nn.Module):
  def __init__(self, n_embed, head_size, block_size, dropout_rate=0.0) -> None:
    super().__init__()
    self.query = nn.Linear(n_embed, head_size, bias=True)
    self.key = nn.Linear(n_embed, head_size, bias=True)
    self.value = nn.Linear(n_embed, head_size, bias=True)
    self.register_buffer('tril', torch.tril(torch.ones(block_size, head_size))) ####################################
    self.dropout = nn.Dropout(dropout_rate)
  
  def forward(self, x):
    B, T, C = x.shape
    k = self.key(x)     # B, T, head_size
    q = self.query(x)   # B, T, head_size
    wei = q @ k.transpose(-1, -2) * k.size(-1)**-0.5  # B, T, T
    wei = wei.masked_fill(self.tril==0, float('-inf'))
    wei = F.softmax(wei, dim=-1) * (1.0/torch.sqrt(k.size(-1)))   # B, T, T
    wei = self.dropout(wei)
    v = self.value(x)   # B, T, head_size
    out = wei @ v    # B, T, head_size
    return out
  
# ------------------------------------------------------------------------------------------------
class MultiheadAttention(nn.Module):
  def __init__(self, n_embed, head_size, block_size, n_head, dropout_rate):
    super().__init__()
    self.heads = nn.ModuleList([Head(n_embed, head_size, block_size, dropout_rate) for _ in range(n_head)])
    self.proj = nn.Linear(head_size * n_head, n_embed)
    self.dropout = nn.Dropout(dropout_rate)
  
  def forward(self, x):
    out = torch.concat([head(x) for head in self.heads], dim=-1)
    return self.dropout(self.proj(out))
  
# ------------------------------------------------------------------------------------------------
class FeedForward(nn.Module):
  def __init__(self, n_embed, dropout_rate=0.0, bias=True):
    super().__init__()
    self.net = nn.Sequential(
        nn.Linear(n_embed, 4 * n_embed, bias),
        nn.ReLU(),
        nn.Linear(4 * n_embed, n_embed, bias),
        nn.Dropout(dropout_rate),
    )
  
  def forward(self, x):
    return self.net(x)
    
# ------------------------------------------------------------------------------------------------
class Decoder(nn.Module):
  def __init__(self, n_embed, head_size, block_size, n_head, dropout_rate):
    super().__init__()
    self.sa = MultiheadAttention(n_embed, head_size, block_size, n_head, dropout_rate)
    self.ffw = FeedForward(n_embed, dropout_rate)
    self.ln1 = nn.LayerNorm(n_embed)
    self.ln2 = nn.LayerNorm(n_embed)
  
  def forward(self, x):
    out = self.ln1(x + self.sa(x))
    out = self.ln2(out + self.ffw(out))
    return out

# ------------------------------------------------------------------------------------------------
class Transformer(nn.Module):
  def __init__(self, n_embed, head_size, block_size, n_head, n_block_size, vocab_size, num_layers, dropout_rate):
    super().__init__()
    self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
    self.positional_embedding_table = nn.Embedding(block_size, n_embed)
    self.decoders = nn.Sequential(*[Decoder(n_embed, head_size, block_size, n_head, n_block_size, dropout_rate) for _ in range(num_layers)])
    self.ln = nn.LayerNorm(n_embed)
    self.lm_head = nn.Linear(n_embed, vocab_size)
  
  def forward(self, x):    
    tok_emb = self.token_embedding_table(x)
    pos_emb = self.positional_embedding_table(x)
    emb = tok_emb + pos_emb
    probs = F.softmax(self.lm_head(self.decoders(emb)), dim=-1)
    return probs
    


In [12]:
jja = torch.randn((4, 3, 2))
(a.transpose(-2, -1) == a.transpose(-1, -2)).all()



tensor(True)

In [5]:
df = pd.read_csv('./data/airline-passengers.csv')
timeseries = df[["Passengers"]].values.astype('float32')

# train-test split for time series
train_size = int(len(timeseries) * 0.67)
test_size = len(timeseries) - train_size
train, test = timeseries[:train_size], timeseries[train_size:]

def create_dataset(dataset, lookback):
    """Transform a time series into a prediction dataset
    
    Args:
        dataset: A numpy array of time series, first dimension is the time steps
        lookback: Size of window for prediction
    """
    X, y = [], []
    for i in range(len(dataset)-lookback):
        feature = dataset[i:i+lookback]
        target = dataset[i+1:i+lookback+1]
        X.append(feature)
        y.append(target)
    return torch.tensor(X), torch.tensor(y)

lookback = 4
X_train, y_train = create_dataset(train, lookback=lookback)
X_test, y_test = create_dataset(test, lookback=lookback)

class AirModel():
    def __init__(self):
        super().__init__()
        self.lstm = LSTM(input_size=1, hidden_size=50)
        self.linear = Linear(50, 1)
        self.layers = Sequential([self.lstm, self.linear])

    def parameters(self):
        return self.layers.parameters()
    
    def __call__(self, x):
        return self.layers(x)


model = AirModel()
for param in model.parameters():
    param.requires_grad = True
optimizer = optim.Adam(model.parameters())
loss_fn = nn.MSELoss()
loader = data.DataLoader(data.TensorDataset(X_train, y_train), shuffle=True, batch_size=8) #, drop_last=True)


In [6]:
n_epochs = 2000
for epoch in range(n_epochs):
    for X_batch, y_batch in loader:
        # print(f"{X_batch.shape=}")
        y_pred = model(X_batch)
        # print(f'{y_pred.shape=}')
        loss = loss_fn(y_pred, y_batch)
        # for param in model.parameters():
        #     param.grad = None
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Validation
    if epoch % 100 != 0:
        continue
    with torch.no_grad():
        y_pred = model(X_train)
        train_rmse = np.sqrt(loss_fn(y_pred, y_train))
        y_pred = model(X_test)
        test_rmse = np.sqrt(loss_fn(y_pred, y_test))
    print("Epoch %d: train RMSE %.4f, test RMSE %.4f" % (epoch, train_rmse, test_rmse))

with torch.no_grad():
    # shift train predictions for plotting
    train_plot = np.ones_like(timeseries) * np.nan
    y_pred = model(X_train)
    y_pred = y_pred[:, -1, :]
    train_plot[lookback:train_size] = model(X_train)[:, -1, :]
    # shift test predictions for plotting
    test_plot = np.ones_like(timeseries) * np.nan
    test_plot[train_size+lookback:len(timeseries)] = model(X_test)[:, -1, :]
# plot
plt.plot(timeseries)
plt.plot(train_plot, c='r')
plt.plot(test_plot, c='g')
plt.show()

In [None]:
print(test_plot)

In [None]:
for i in range(len(model.parameters())):
    print(i)
    print(f'{model.parameters()[i]=}')

In [None]:

print(model.parameters()[9])