In [1]:
import matplotlib.pyplot as plt
import csv
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

## Load and preprocess the data

We will use data from past 7 days to predict next day temperature.
Since every feature has values with varying ranges, we do normalization to confine feature values to a range of [0, 1] before training a neural network. We do this by subtracting the mean and dividing by the standard deviation of each feature.

80 % of the data will be used to train the model, i.e. 3650 * 0.8 rows. split_fraction can be changed to alter this percentage.

In [11]:
import pandas as pd
df = pd.read_csv('daily-min-temperatures.csv')
df.head()

Unnamed: 0,Date,Temp
0,1981-01-01,20.7
1,1981-01-02,17.9
2,1981-01-03,18.8
3,1981-01-04,14.6
4,1981-01-05,15.8


In [12]:
df["Date"]

0       1981-01-01
1       1981-01-02
2       1981-01-03
3       1981-01-04
4       1981-01-05
           ...    
3645    1990-12-27
3646    1990-12-28
3647    1990-12-29
3648    1990-12-30
3649    1990-12-31
Name: Date, Length: 3650, dtype: object

##### B

In [16]:
from sklearn.model_selection import train_test_split

# Normalize the temperatures
df['Temp'] = (df['Temp'] - df['Temp'].mean()) / df['Temp'].std()

X_train, X_test, y_train, y_test = train_test_split(df['Date'], df['Temp'], test_size=0.2, random_state=42)

print(y_train.std())
print(y_train.mean())

0.9958783604892041
-0.0026863443783818773


##### C

In [None]:
import torch
from torch.utils.data import Dataset

class TempSequenceDataset(Dataset):
    def __init__(self, temps, sequence_length=8):
        self.sequence_length = sequence_length
        self.temps = torch.tensor(temps.values, dtype=torch.float32)
        self.samples = []
        self.targets = []

        for i in range(len(self.temps) - sequence_length):
            self.samples.append(self.temps[i:i+sequence_length])    # sequence  
            self.targets.append(self.temps[i+sequence_length])      # target

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx], self.targets[idx]
    

# Only pass in the normalized temperature column
train_dataset = TempSequenceDataset(df['Temp'][:int(len(df)*0.8)])
test_dataset = TempSequenceDataset(df['Temp'][int(len(df)*0.8):])
train_dataset[0]

(tensor([2.3386, 1.6509, 1.8719, 0.8405, 1.1352, 1.1352, 1.1352, 1.5281]),
 tensor(2.6087))

##### D

In [None]:
import torch
import torch.nn as nn

class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size=None):
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # LSTM Parameters
        self.input_gate = nn.Linear(input_size+hidden_size, hidden_size)
        self.forget_gate = nn.Linear(input_size+hidden_size, hidden_size)
        self.candidate = nn.Linear(input_size+hidden_size, hidden_size)
        self.output = nn.Linear(input_size+hidden_size, hidden_size)

        self.predictor = nn.Linear(hidden_size, input_size) if output_size is not None else nn.Identity()

        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, mean=0, std=0.01)

    def init_state_cell(self, batch_size, device):
        state = torch.zeros(batch_size, self.hidden_size).to(device)
        cell = torch.zeros(batch_size, self.hidden_size).to(device)
        return state, cell

    def forward(self, x, state=None, cell=None):
        # Get sequence length and batch size
        seq_len, batch_size, _ = x.size()

        # Initialize hidden and cell states if not provided
        if state is None or cell is None:
            state, cell = self.init_state_cell(batch_size, x.device)

        # Lists to store outputs and cell states for each time step
        outputs = []

        # Iterate through the sequence
        for t in range(seq_len):
            # Input at time step t
            xh_t = torch.cat((x[t], state), 1)

            # Input gate
            inp_t = torch.sigmoid(self.input_gate(xh_t))

            # Forget gate
            forget_t = torch.sigmoid(self.forget_gate(xh_t))

            # Cell state
            c_tilda_t = torch.tanh(self.candidate(xh_t))
            cell = forget_t * cell + (1-forget_t) * c_tilda_t

            # Output gate
            ot = torch.sigmoid(self.output(xh_t))

            # Hidden state update
            state = torch.tanh(cell)

            # Normally an LSTM simply outputs the hidden state.
            # However, here we want our outputs to be the logits for the predicted next word.
            output = self.predictor(state)
            outputs.append(output)

        # Stack outputs and cell states along the sequence dimension
        outputs = torch.stack(outputs, dim=0)
        return outputs, (state, cell)
