# Desarrollo M2

In [1]:
%load_ext autoreload
%autoreload 2

import os
import sys
project_path = os.path.abspath('..')
sys.path.insert(1, project_path)

import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score, classification_report
import pandas as pd
# import seaborn
# seaborn.set_context(context="talk")

from torch.utils.data import DataLoader
import torch.nn as nn
import torch
import math, copy
import torch.nn.functional as F

from src.data.dataset import lc_dataset
from src.models.model import periodicTransformer
from src.visualization.plots import plot_periodic
from src.data.curve_generator import random_periodic_sin_mix

  warn(f"Failed to load image Python extension: {e}")


In [2]:
import copy

def clones(module, N):
    "Produce N identical layers."
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

#### Positional Encoding

In [3]:
class PositionalEncoding(nn.Module):

    def __init__(self, d_model, dropout=0.1, max_len= 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        print(position.shape)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        print(div_term.shape)
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Args:
            x: Tensor, shape [seq_len, batch_size, embedding_dim]
        """
        print(self.pe[:x.size(0)].shape)
        print(x.shape)
        x = x + self.pe[:x.size(0)]
        print(x.shape)
        return self.dropout(x)

#### Normalization layer

In [4]:
class LayerNorm(nn.Module):
    "Construct a layernorm module (See citation for details)."
    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

#### Add + norm layer

In [5]:
class SublayerConnection(nn.Module):
    """
    A residual connection followed by a layer norm.
    Note for code simplicity the norm is first as opposed to last.
    """
    def __init__(self, size, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        "Apply residual connection to any sublayer with the same size."
        return x + self.dropout(sublayer(self.norm(x)))

#### Feed Forward layer

In [6]:
class PositionwiseFeedForward(nn.Module):
    "Implements FFN equation."
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.w_2(self.dropout(F.relu(self.w_1(x))))

#### Encoder Block

In [7]:
class EncoderBlock(nn.Module):
    def __init__(self, dropout=0.1, d_model=240, d_ff=128, h=8):
        super(EncoderBlock, self).__init__()
        self.mod = torch.nn.Linear(1, d_model)
        self.linear1 = torch.nn.Linear(d_model, d_model)
        self.att = torch.nn.MultiheadAttention(d_model, h)

        self.feed_forward = PositionwiseFeedForward(d_model, d_ff)
        self.sublayer = clones(SublayerConnection(d_model, dropout), 2)

    def forward(self, x):
        "Follow Figure 1 (left) for connections."
        x = x.float()
        x = self.sublayer[0](x, lambda x: self.att(x, x, x)[0])
        return self.sublayer[1](x, self.feed_forward)

In [8]:
class TimeFilm(nn.Module):
    def __init__(self, n_harmonics=7, embedding_size=64, T_max=1000.0, input_size = 1):
        super(TimeFilm, self).__init__()

        self.a = nn.parameter.Parameter(
            torch.rand(n_harmonics, embedding_size), requires_grad=True)
        self.b = nn.parameter.Parameter(
            torch.rand(n_harmonics, embedding_size), requires_grad=True)
        self.w = nn.parameter.Parameter(
            torch.rand(n_harmonics, embedding_size), requires_grad=True)
        self.v = nn.parameter.Parameter(
            torch.rand(n_harmonics, embedding_size),  requires_grad=True)

        self.linear_proj = nn.Sequential(nn.Linear(in_features= input_size, out_features=embedding_size, bias=False),
                                         nn.LeakyReLU(0.1))

        self.linear_proj_ = nn.Sequential(nn.Linear(in_features=embedding_size, out_features=embedding_size, bias=False),
                                          nn.LeakyReLU(0.1))
        self.n_ = nn.parameter.Parameter(
            torch.linspace(1, n_harmonics+1, steps=n_harmonics) / T_max, requires_grad=False)

    def harmonics(self, t):
        """ t [n_batch, length sequence, 1, n_harmonics]"""

        return t[:, :, :, None]*2*np.pi*self.n_

    def fourier_coefs(self, t):

        t_harmonics = self.harmonics(t)

        gama_ = torch.tanh(torch.matmul(torch.sin(t_harmonics), self.a) + \
            torch.matmul(torch.cos(t_harmonics), self.b))

        beta_ = torch.matmul(torch.sin(t_harmonics), self.v) + \
            torch.matmul(torch.cos(t_harmonics), self.w)

        return gama_, beta_

    def forward(self, x, t):
        """ t must be of size [n_batch, length sequence]"""
        print(t.dtype)

        gama_, beta_ = self.fourier_coefs(t)

        # self.linear_proj_(self.linear_proj(x[:, :, None])*torch.tanh(torch.squeeze(gama_)) + torch.squeeze(beta_))
        return self.linear_proj_(self.linear_proj(x)*torch.squeeze(gama_) + torch.squeeze(beta_))     

In [9]:
class PositionalEncodingSousa(nn.Module):

    def __init__(self, d_model=200, max_time=1000.0, max_len= 5000):
        super(PositionalEncodingSousa, self).__init__()
        self.div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(max_time) / d_model))
        self.pe = torch.zeros(max_len, 1000, d_model)

    def forward(self, x, t):
        argument = t * self.div_term
        self.pe[:, :, 0::2] = torch.sin(argument)
        self.pe[:, :, 1::2] = torch.cos(argument)
        self.register_buffer('pe', self.pe)        
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)    

#### Final model

In [10]:
class periodicTransformer(nn.Module):
    def __init__(self, n_classes=5, d_model=200, d_ff=128, h=8, N=4, time='discrete'):
        super().__init__()
        self.time = time
        self.pos_enc_discrete = PositionalEncoding(d_model)
        self.pos_enc_continuous = TimeFilm(embedding_size=d_model)
        self.enc_blocks = clones(EncoderBlock(d_model=d_model, d_ff=d_ff, h=h), N)
        self.proj = nn.Linear(d_model, n_classes)
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform(p)

    def forward(self, x, t):
        if self.time == 'continuous':
            x = self.pos_enc_continuous(x, t)
        else:
            x = self.pos_enc_discrete(x)
        for enc in self.enc_blocks:
            x = enc(x)
        x = self.proj(x)
        return F.log_softmax(x, dim=-1)

In [11]:
d_model = 200

temp = periodicTransformer(d_model=d_model, time='countinuos').double()

torch.Size([5000, 1])
torch.Size([100])


  nn.init.xavier_uniform(p)


#### Test data

In [13]:
data = lc_dataset()
data.add_curves('sinmix', N=800, seq_len=60, min_period=0.5, max_period=2, label=0)

In [14]:
batch_size = 2

data_loader = DataLoader(data,
                        batch_size=batch_size,
                        pin_memory=True,
                        num_workers=16,
                        shuffle=True)

In [15]:
for idx, batch in enumerate(data_loader):
    break

In [19]:
temp(batch['mag'], batch['mjd'])

torch.Size([2, 1, 200])
torch.Size([2, 60])


RuntimeError: The size of tensor a (60) must match the size of tensor b (200) at non-singleton dimension 2