In [88]:
import math
import os
from typing import Optional

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
import torch.utils.data as Data
from torch import nn, optim, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torchmetrics import R2Score

# Device configuration: use the GPU when CUDA is available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

1 Hyperparameters

In [81]:
# General training hyperparameters
SEED = 12345        # seed for the torch.Generator that drives the train/test split
EPOCH = 100         # number of epochs the training loop runs
BATCH_SIZE = 128    # samples per DataLoader batch
LR = 1e-2           # learning rate handed to the optimizer

In [89]:
# Transformer hyperparameters
src_dict_size = 8000  # number of distinct input token ids (rows of the embedding table)
tgt_dict_size = 101   # NOTE(review): not referenced by the code visible in this notebook - confirm intent
src_len, tgt_len = 275, 9  # tgt_len is passed to the model as its output width (n_tgt)
d_model = 512  # embedding size (shared by the token embedding and the positional encoding)
d_hid = 2048   # feed-forward hidden size inside each encoder layer (512 -> 2048 -> 512)
# Dimension of K (= Q) and V. Q and K must share a dimension; V is set equal to K for convenience.
# NOTE(review): the model defined in this notebook takes no d_k / d_v argument - confirm these are still needed.
d_k = 64
d_v = d_k
n_layers = 2   # number of stacked encoder layers (blocks)
n_heads = 2    # number of heads in multi-head attention
dropout = 0.2  # dropout probability handed to the model

2 Import data and build dataset

In [83]:
# file path
# Root folder of the exported data. The default is the original author's local
# directory; set the GEE_OUTPUT_DIR environment variable to run the notebook on
# another machine without editing this cell.
PATH = os.environ.get('GEE_OUTPUT_DIR', 'D:\\Deutschland\\FUB\\master_thesis\\data\\gee\\output')
DATA_DIR = os.path.join(PATH, 'monthly_mean')  # one CSV per sample, named '<id>.csv'
LABEL_CSV = '9_classes.csv'                    # label table, indexed by sample id

label_path = os.path.join(PATH, LABEL_CSV)
# os.listdir returns entries in arbitrary order; sort so reruns see the same order.
files = sorted(os.listdir(DATA_DIR))

In [84]:
# Load the label table, then read each sample's CSV and collect features / targets.
labels = pd.read_csv(label_path, sep=',', header=0, index_col=['id'])

x_list = []
y_list = []
for sample_id, label_row in labels.iterrows():
    # Each sample lives in '<id>.csv' inside DATA_DIR, indexed by its 'date' column.
    sample_df = pd.read_csv(
        os.path.join(DATA_DIR, f'{sample_id}.csv'),
        sep=',',
        header=0,
        index_col=['date'],
    )
    # Cast the whole table to int and flatten it (row-major) into one 1-D vector.
    x_list.append(np.array(sample_df, dtype=int).ravel())
    y_list.append(label_row[:])

# Stack the per-sample vectors into arrays with one row per sample.
x_data = np.array(x_list)
y_data = np.array(y_list)

In [85]:
# Wrap the numpy arrays as tensors and pair features with targets.
x_set, y_set = torch.from_numpy(x_data), torch.from_numpy(y_data)
dataset = Data.TensorDataset(x_set, y_set)

# 80 % / 20 % random split, made repeatable by seeding a dedicated generator.
size = len(dataset)
train_size = round(0.8 * size)
test_size = round(0.2 * size)
generator = torch.Generator().manual_seed(SEED)
train_dataset, test_dataset = Data.random_split(
    dataset, lengths=[train_size, test_size], generator=generator
)

# Both loaders share the same batching options.
loader_kwargs = dict(batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
train_loader = Data.DataLoader(train_dataset, **loader_kwargs)
test_loader = Data.DataLoader(test_dataset, **loader_kwargs)

3 Transformer network

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a sequence-first input, then apply dropout.

    Even feature columns hold sin(position * div_term) and odd feature columns hold
    cos(position * div_term). The table is precomputed once for `max_len` positions
    and registered as the buffer `pe` (not a trainable parameter).

    Args:
        d_model: size of the feature (embedding) dimension; may be even or odd.
        dropout: dropout probability applied after the encoding is added.
        max_len: number of positions precomputed; inputs longer than this cannot
            be broadcast against the table and will raise at the addition.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)  # [max_len, 1]
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # There are only d_model // 2 odd columns; for odd d_model that is one fewer
        # than the even columns, so trim the angles to avoid a shape mismatch.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Tensor, shape [seq_len, batch_size, embedding_dim]

        Returns:
            Tensor of the same shape as `x`: `x` plus the encoding of its first
            `seq_len` positions (broadcast over the batch dimension), after dropout.
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [None]:
def generate_square_subsequent_mask(sz: int) -> Tensor:
    """Return an sz x sz additive mask: -inf strictly above the diagonal, 0 on and below it."""
    all_neg_inf = torch.full((sz, sz), float('-inf'))
    # triu(diagonal=1) keeps the strict upper triangle and zeroes everything else.
    return all_neg_inf.triu(diagonal=1)

In [None]:
class TransformerModel(nn.Module):
    """Encoder-only transformer: embedding -> positional encoding -> encoder stack -> linear head.

    Args:
        n_src: number of distinct input token ids (size of the embedding table).
        n_tgt: width of the output produced for every sequence position.
        d_model: embedding / model dimension.
        nhead: number of attention heads in each encoder layer.
        d_hid: hidden size of the feed-forward sub-layer in each encoder layer.
        nlayers: number of stacked encoder layers.
        dropout: dropout probability used by the positional encoding and encoder layers.
    """

    def __init__(self, n_src: int, n_tgt:int, d_model: int, nhead: int, d_hid: int,
                 nlayers: int, dropout: float = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.encoder = nn.Embedding(n_src, d_model)  # token id -> d_model vector
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, n_tgt)     # per-position output head
        self.init_weights()

    def init_weights(self) -> None:
        """Re-initialise the embedding and head weights uniformly in [-0.1, 0.1]; zero the head bias."""
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor, src_mask: Optional[Tensor] = None) -> Tensor:
        """
        Args:
            src: Tensor of token ids, shape [seq_len, batch_size]
            src_mask: optional additive attention mask, shape [seq_len, seq_len].
                When omitted (None), every position may attend to every other position.

        Returns:
            output Tensor of shape [seq_len, batch_size, n_tgt]
        """
        # Scale the embeddings by sqrt(d_model) before adding the positional encoding.
        src = self.encoder(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_mask)
        output = self.decoder(output)
        return output

4 Instantiate the model

In [None]:
# Build the network from the transformer hyperparameters and move it to the selected device.
model = TransformerModel(
    n_src=src_dict_size,
    n_tgt=tgt_len,
    d_model=d_model,
    nhead=n_heads,
    d_hid=d_hid,
    nlayers=n_layers,
    dropout=dropout,
)
model = model.to(device)
# NOTE(review): training uses cross-entropy (target index 0 ignored) while validation
# reports R^2 - confirm the loss matches the intended task.
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.SGD(params=model.parameters(), lr=LR, momentum=0.99)

5 Train and validate model

In [None]:
def train(model:nn.Module, epoch:int):
    """Run one training epoch over `train_loader`.

    Uses the notebook-level `train_loader`, `criterion`, `optimizer`, `device`
    and `EPOCH`. The loss is printed every 50 batches.

    Args:
        model: network to optimise; it is switched to training mode.
        epoch: zero-based index of the current epoch (only used in the log line).
    """
    total_step = len(train_loader)
    model.train()
    # `inputs` rather than `input`, so the Python builtin is not shadowed.
    for i, (inputs, label) in enumerate(train_loader):
        inputs = inputs.to(device)
        label = label.to(device)
        # forward pass
        # TransformerModel.forward takes (src, src_mask); pass None explicitly so the
        # call is valid (no attention mask is applied).
        # NOTE(review): DataLoader batches are stacked along dim 0 (batch-first), while
        # TransformerModel.forward documents src as [seq_len, batch_size] - confirm
        # whether a transpose is needed and that the output shape suits `criterion`.
        output = model(inputs, None)
        loss = criterion(output, label)

        # backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if i % 50 == 0:
            # Report the 1-based index of the batch that was just processed.
            print('Epoch[{}/{}],Step[{}/{}],Loss:{:.4f}'
            .format(epoch+1,EPOCH,i+1,total_step,loss.item()))

In [None]:
def validate(model:nn.Module):
    """Evaluate `model` on `test_loader` and print the R^2 score over the whole test set.

    Uses the notebook-level `test_loader` and `device`. One R2Score metric is
    accumulated across all batches, so the printed value covers every test
    sample instead of only the last batch.

    Args:
        model: network to evaluate; it is switched to evaluation mode.
    """
    model.eval()
    r2score = None
    with torch.no_grad():
        for (value, label) in test_loader:
            value = value.to(device)
            label = label.to(device)
            # TransformerModel.forward takes (src, src_mask); no mask is applied here.
            outputs = model(value, None)
            if r2score is None:
                # Created lazily so the number of outputs can be read from the labels.
                num_outputs = label.shape[-1] if label.dim() > 1 else 1
                r2score = R2Score(num_outputs=num_outputs, multioutput='uniform_average').to(device)
            # torchmetrics expects (preds, target), both float and laid out as
            # (N,) or (N, num_outputs) - hence no transpose.
            # NOTE(review): `outputs` must have the same shape as `label` for this to
            # work; the per-position model output may need pooling first - confirm.
            r2score.update(outputs.float(), label.float())
    if r2score is None:
        # Empty loader: there is nothing to score.
        print('R^2 on test set: n/a (test_loader is empty)')
        return
    r2 = r2score.compute().item()
    print('R^2 on test set: %.2f' % r2)

In [None]:
# Run train() once per epoch, EPOCH times in total, passing the zero-based epoch index.
for epoch in range(EPOCH):
    train(model, epoch)