In [1]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, random_split
import numpy as np
from src.gpt2 import GPT2

In [2]:
ds = lambda x, y: TensorDataset(torch.from_numpy(x), torch.from_numpy(y))
train_x = np.load('./data/train_x.npy')
train_y = np.load('./data/train_y.npy')
test_x = np.load('./data/test_x.npy')
test_y = np.load('./data/test_y.npy')

In [3]:
train_ds = ds(train_x, train_y)

In [4]:
len(train_ds)

60000

In [5]:
train_size = int(0.9*len(train_ds))

In [6]:
train_ds, valid_ds = random_split(
    train_ds, [train_size, len(train_ds) - train_size]
)

In [7]:
len(train_ds), len(valid_ds)

(54000, 6000)

In [8]:
train_datasets = DataLoader(train_ds, shuffle=True, batch_size=4, num_workers=4)

In [10]:
train, label = iter(train_datasets).next()

In [11]:
x = train.view(train.shape[0], -1)
x = x.transpose(0,1).contiguous()
x.size()

torch.Size([784, 4])

In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class Block(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super(Block, self).__init__()
        self.ln_1 = nn.LayerNorm(embed_dim)
        self.ln_2 = nn.LayerNorm(embed_dim)
        self.attn = nn.MultiheadAttention(embed_dim, num_heads)
        self.mlp = nn.Sequential(
            nn.Linear(embed_dim, embed_dim), nn.GELU(), nn.Linear(embed_dim, embed_dim)
        )

    def forward(self, x):
        attn_mask = torch.full(
            (len(x), len(x)), -float("Inf"), device=x.device, dtype=x.dtype
        )
        attn_mask = torch.triu(attn_mask, diagonal=1)

        x = self.ln_1(x)
        a, _ = self.attn(x, x, x, attn_mask=attn_mask, need_weights=False)
        x = x + a
        m = self.mlp(self.ln_2(x))
        x = x + m
        return x


class GPT2(nn.Module):
    def __init__(
        self, embed_dim, num_heads, num_layers, num_positions, num_vocab, num_classes
    ):
        super(GPT2, self).__init__()

        self.embed_dim = embed_dim

        # start of sequence token
        self.sos = torch.nn.Parameter(torch.zeros(embed_dim))
        nn.init.normal_(self.sos)

        self.token_embeddings = nn.Embedding(num_vocab, embed_dim)
        self.position_embeddings = nn.Embedding(num_positions, embed_dim)

        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            self.layers.append(Block(embed_dim, num_heads))

        self.ln_f = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, num_vocab, bias=False)
        self.clf_head = nn.Linear(embed_dim, num_classes)

    def forward(self, x, classify=False):
        """
        Expect input as shape [sequence len, batch]
        If classify, return classification logits
        """
        length, batch = x.shape

        h = self.token_embeddings(x)

        # prepend sos token
        sos = torch.ones(1, batch, self.embed_dim, device=x.device) * self.sos
        h = torch.cat([sos, h[:-1, :, :]], axis=0)

        # add positional embeddings
        positions = torch.arange(length, device=x.device).unsqueeze(-1)
        h = h + self.position_embeddings(positions).expand_as(h)

        # transformer
        for layer in self.layers:
            h = layer(h)

        if not classify:
            # return logits
            return self.head(h)

        h = torch.mean(h, dim=0)  # average pool over sequence
        return self.clf_head(h)  # return classification logits


In [13]:
gpt = GPT2(16, 2, 8, 28*28, 16, 10)

In [14]:
gpt

GPT2(
  (token_embeddings): Embedding(16, 16)
  (position_embeddings): Embedding(784, 16)
  (layers): ModuleList(
    (0): Block(
      (ln_1): LayerNorm((16,), eps=1e-05, elementwise_affine=True)
      (ln_2): LayerNorm((16,), eps=1e-05, elementwise_affine=True)
      (attn): MultiheadAttention(
        (out_proj): Linear(in_features=16, out_features=16, bias=True)
      )
      (mlp): Sequential(
        (0): Linear(in_features=16, out_features=16, bias=True)
        (1): GELU()
        (2): Linear(in_features=16, out_features=16, bias=True)
      )
    )
    (1): Block(
      (ln_1): LayerNorm((16,), eps=1e-05, elementwise_affine=True)
      (ln_2): LayerNorm((16,), eps=1e-05, elementwise_affine=True)
      (attn): MultiheadAttention(
        (out_proj): Linear(in_features=16, out_features=16, bias=True)
      )
      (mlp): Sequential(
        (0): Linear(in_features=16, out_features=16, bias=True)
        (1): GELU()
        (2): Linear(in_features=16, out_features=16, bias=True)


In [15]:
def _shape_input(x): # flatten 함수
    """shape batch of images for input into GPT2 model"""
    x = x.view(x.shape[0], -1)  # flatten images into sequences
    x = x.transpose(0, 1).contiguous()  # to shape [seq len, batch]
    return x

In [17]:
dx, dy = iter(train_datasets).next()
dx.size(), dy.size()

(torch.Size([4, 28, 28]), torch.Size([4]))

In [18]:
h = gpt(_shape_input(dx))
h.size()

torch.Size([784, 4, 16])

In [20]:
criterion = nn.CrossEntropyLoss()

In [26]:
x = _shape_input(dx)
x.size()

torch.Size([784, 4])

In [28]:
x.view(-1).size()

torch.Size([3136])

In [23]:
h.view(-1, h.size(-1)).size()

torch.Size([3136, 16])

In [45]:
xx = np.random.normal(size=[1, 20, 20])
xx = torch.tensor(xx, dtype=torch.long)
xx.size()

torch.Size([1, 20, 20])

In [47]:
_shape_input(xx).size()

torch.Size([400, 1])

In [46]:
gpt(_shape_input(xx)).size()

IndexError: index out of range in self