<a href="https://colab.research.google.com/github/archyyu/encoder-related/blob/main/encoder_sentiment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import requests
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import math
import pandas as pd

In [None]:
# Download the chat dataset CSV into a DataFrame. The cells below read its
# 'message' (text) and 'sentiment' (label) columns.
df = pd.read_csv('https://raw.githubusercontent.com/archyyu/publicResource/main/chat_dataset.csv')

In [None]:
# Hyperparameters
hidden_size = 100  # NOTE(review): not referenced by any code visible in this notebook
embedding_dim = 40  # passed to Encoder as embedding_size (token/position vector width)
seq_length = 25  # number of rows in Encoder's positional-embedding table
learning_rate = 1e-1  # lr handed to optim.Adagrad
batch_size = 20  # number of examples returned by get_batch()
dropout = 0.1  # probability used by nn.Dropout in the attention and feed-forward modules
eval_iters = 200  # NOTE(review): not referenced by any code visible in this notebook
num_heads = 8  # attention heads per encoder block
head_size = 16  # output width of each attention head

In [None]:
# Token used to right-pad every message to a common length.
pad = '<pad>'

# Pull the two columns out directly instead of walking the frame row by row
# with iterrows(); the resulting lists hold the same values in the same order.
data = df['message'].tolist()      # raw message text, one entry per row
targets = df['sentiment'].tolist()  # sentiment label for each message

In [None]:
# Distinct sentiment labels in sorted order; a label's position in this list
# is its integer class id.
unique_labels = set(targets)
targetset = sorted(unique_labels)
index_to_sentiment = dict(enumerate(targetset))
sentiment_to_index = {label: idx for idx, label in index_to_sentiment.items()}

In [None]:
# Vocabulary: every space-separated token that occurs in the messages, in
# sorted order, with the padding token placed last.
corpus_words = ' '.join(data).split(' ')
dataset = [*sorted(set(corpus_words)), pad]
vocab_size = len(dataset)

# Lookup tables between tokens and their integer ids.
index_to_word = dict(enumerate(dataset))
word_to_index = {word: idx for idx, word in index_to_word.items()}

In [None]:
# Tokenise each message on single spaces.
lines = [sentence.split(' ') for sentence in data]

# Length (in tokens) of the longest message; used as the padded length.
max_line = max(map(len, lines))

In [None]:
# Right-pad every token list in place so all of them have max_line entries.
for tokens in lines:
  tokens.extend([pad] * (max_line - len(tokens)))

In [None]:
# Model inputs: each (padded) message as a list of vocabulary ids.
X = [[word_to_index[word] for word in line] for line in lines]

# Model targets: the class id of each message's sentiment label.
Y = [sentiment_to_index[label] for label in targets]

In [None]:
class AttentionHead(nn.Module):
  """One head of unmasked scaled dot-product self-attention.

  Args:
    embedding_size: Size of the last dimension of the input.
    head_size: Width of the query/key/value projections and of the output.
  """

  def __init__(self, embedding_size, head_size):
    super().__init__()
    self.head_size = head_size
    self.C = embedding_size

    # Bias-free projections, created in the order q, v, k.
    self.q = nn.Linear(embedding_size, head_size, bias=False)
    self.v = nn.Linear(embedding_size, head_size, bias=False)
    self.k = nn.Linear(embedding_size, head_size, bias=False)

  def forward(self, x):
    """Let every position attend to every position of the same sequence.

    Args:
      x: 3-D tensor whose last dimension is ``embedding_size``.

    Returns:
      Tensor with the same leading two dimensions as ``x`` and a last
      dimension of ``head_size``.
    """
    _batch, _steps, _channels = x.shape  # fails early if x is not 3-D
    queries, keys, values = self.q(x), self.k(x), self.v(x)

    # Scale the dot products by 1/sqrt(head_size) before normalising.
    scale = self.head_size ** -0.5
    scores = queries @ keys.transpose(-2, -1) * scale
    weights = scores.softmax(dim=-1)

    return weights @ values

class EncoderMultiHeadAttention(nn.Module):
  """Several AttentionHead modules run side by side and merged by a linear layer.

  Args:
    num_heads: How many attention heads to create.
    embedding_size: Input width, and width of the merged output.
    head_size: Output width of each individual head.
  """

  def __init__(self, num_heads, embedding_size, head_size):
    super().__init__()
    self.num_heads = num_heads

    self.heads = nn.ModuleList(
        AttentionHead(embedding_size, head_size) for _ in range(num_heads)
    )

    # Maps the concatenated head outputs back to the embedding width.
    concat_width = num_heads * head_size
    self.final_linear = nn.Linear(concat_width, embedding_size)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Apply every head to ``x``, concatenate on the last axis, project, drop out."""
    merged = torch.cat([head(x) for head in self.heads], dim=-1)
    return self.dropout(self.final_linear(merged))

class FeedForward(nn.Module):
  """Position-wise MLP: expand to 4x the embedding width, ReLU, project back, dropout.

  Args:
    embedding_size: Width of both the input and the output.
  """

  def __init__(self, embedding_size):
    super(FeedForward, self).__init__()
    inner_size = 4 * embedding_size
    layers = [
        nn.Linear(embedding_size, inner_size),
        nn.ReLU(),
        nn.Linear(inner_size, embedding_size),
        nn.Dropout(dropout),
    ]
    self.net = nn.Sequential(*layers)

  def forward(self, x):
    """Run ``x`` through the MLP and return the result."""
    out = self.net(x)
    return out

class EncoderBlockAttention(nn.Module):
  """Pre-norm encoder block: multi-head self-attention, then a feed-forward net.

  Each sub-layer receives a layer-normalised copy of its input, and its output
  is added back onto that same input (residual connection).

  Args:
    num_heads: Number of attention heads.
    embedding_size: Width of the block's input and output.
    head_size: Output width of each attention head.
  """

  def __init__(self, num_heads, embedding_size, head_size):
    super().__init__()
    self.multiheads = EncoderMultiHeadAttention(num_heads, embedding_size, head_size)
    self.fw = FeedForward(embedding_size)
    self.norm1 = nn.LayerNorm(embedding_size)
    self.norm2 = nn.LayerNorm(embedding_size)

  def forward(self, x):
    """Return ``x`` after the attention and feed-forward residual sub-layers."""
    attended = x + self.multiheads(self.norm1(x))
    # The second residual has to start from `attended`. Adding the
    # feed-forward output to the raw `x` would drop the attention result from
    # the skip path, so it could only reach later layers through the MLP.
    return attended + self.fw(self.norm2(attended))

class Encoder(nn.Module):
  """Transformer encoder that classifies a token sequence from its last position.

  Args:
    num_heads: Attention heads per encoder block.
    vocab_size: Number of rows in the token-embedding table.
    embedding_size: Width of token/position embeddings and of every block.
    output_size: Number of logits produced per sequence.
    head_size: Output width of each attention head.
    num_layers: Number of stacked encoder blocks (default 4).
    max_len: Longest sequence the positional table supports. Defaults to the
      module-level ``seq_length``.
  """

  def __init__(self, num_heads, vocab_size, embedding_size, output_size, head_size,
               num_layers=4, max_len=None):
    super().__init__()
    if max_len is None:
      max_len = seq_length
    self.em = nn.Embedding(vocab_size, embedding_size)
    self.pos_encode = nn.Embedding(max_len, embedding_size)
    self.blocks = nn.ModuleList(
        EncoderBlockAttention(num_heads, embedding_size, head_size)
        for _ in range(num_layers)
    )
    self.f_norm = nn.LayerNorm(embedding_size)
    self.fw = nn.Linear(embedding_size, output_size, bias=False)

  def forward(self, x):
    """Compute class logits for a batch of token-id sequences.

    Args:
      x: 2-D integer tensor of token ids, one row per sequence.

    Returns:
      The logits taken at the last sequence position, one row per input row.

    Raises:
      ValueError: If the sequences are longer than the positional table.
    """
    B, T = x.shape
    max_len = self.pos_encode.num_embeddings
    if T > max_len:
      # Without this check the positional lookup fails with an opaque
      # "index out of range" error from inside nn.Embedding.
      raise ValueError(
          f"sequence length {T} exceeds the positional table size {max_len}"
      )

    # Build the position ids on the same device as the input so the model
    # also works after being moved off the CPU.
    positions = torch.arange(T, device=x.device)
    h = self.em(x) + self.pos_encode(positions)
    for block in self.blocks:
      h = block(h)
    logits = self.fw(self.f_norm(h))

    # Classify from the final position only.
    # NOTE(review): inputs in this notebook are right-padded, so this position
    # is usually a pad token - confirm that is the intended pooling.
    # squeeze(1) only has an effect when output_size == 1.
    return logits[:, T - 1, :].squeeze(1)

# Cross-entropy between the model's logits and the integer class ids.
criterion = nn.CrossEntropyLoss()

# One output logit per distinct sentiment label, derived from the data rather
# than hard-coded, so the model still matches if the label set changes.
model = Encoder(num_heads, vocab_size, embedding_dim, len(targetset), head_size)
optimizer = optim.Adagrad(model.parameters(), lr=learning_rate)

In [None]:
def get_batch():
  """Return one mini-batch of ``batch_size`` consecutive examples.

  A random start offset is drawn and the examples ``X[start:start+batch_size]``
  and their labels from ``Y`` are returned.

  Returns:
    A ``(inputs, labels)`` pair of tensors built from the selected rows of
    ``X`` and entries of ``Y``; the first dimension of both is ``batch_size``.

  Raises:
    ValueError: If there are fewer than ``batch_size`` examples.
  """
  num_examples = len(X)
  if num_examples < batch_size:
    raise ValueError(
        f"need at least {batch_size} examples to build a batch, got {num_examples}"
    )

  # randint's upper bound is exclusive, so add 1: otherwise the final window
  # is never chosen (the last example is never sampled) and the call fails
  # outright when len(X) == batch_size.
  start = torch.randint(num_examples - batch_size + 1, (1,)).item()
  stop = start + batch_size

  return torch.tensor(X[start:stop]), torch.tensor(Y[start:stop])

In [None]:
# Scratch check of the indexing used at the end of Encoder.forward: selecting
# one index on the middle axis of a (3, 4, 5) tensor already yields (3, 5),
# so the following squeeze(1) leaves it unchanged.
on1 = torch.rand(3, 4, 5)
print(on1)
picked = on1[:, 3, :].clone()
print(picked.squeeze(1))

tensor([[[0.9479, 0.7091, 0.4131, 0.5498, 0.6462],
         [0.5467, 0.6809, 0.9910, 0.0813, 0.1717],
         [0.9783, 0.5267, 0.0978, 0.5695, 0.2302],
         [0.1460, 0.0410, 0.2111, 0.1425, 0.5241]],

        [[0.7104, 0.3916, 0.1105, 0.1457, 0.2464],
         [0.9301, 0.1063, 0.4606, 0.4025, 0.9448],
         [0.8151, 0.7580, 0.3681, 0.9758, 0.9931],
         [0.4659, 0.4371, 0.2802, 0.8284, 0.9607]],

        [[0.6110, 0.1068, 0.4580, 0.5537, 0.4714],
         [0.9458, 0.9476, 0.4988, 0.6737, 0.2731],
         [0.8914, 0.7820, 0.9837, 0.1294, 0.9168],
         [0.2468, 0.5949, 0.7835, 0.4914, 0.5822]]])
tensor([[0.1460, 0.0410, 0.2111, 0.1425, 0.5241],
        [0.4659, 0.4371, 0.2802, 0.8284, 0.9607],
        [0.2468, 0.5949, 0.7835, 0.4914, 0.5822]])


In [None]:
# Training loop: n_iters Adagrad steps on mini-batches from get_batch().
n_iters = 1000
model.train()  # make sure dropout is active, whatever mode the model was left in
for i in range(n_iters):
  # Use batch-specific names: rebinding the global `targets` list here would
  # break re-running the cell that builds Y from it.
  batch_inputs, batch_targets = get_batch()

  predicts = model(batch_inputs)
  loss = criterion(predicts, batch_targets)

  optimizer.zero_grad(set_to_none=True)
  loss.backward()
  optimizer.step()

  # Progress report every 500 iterations.
  if i % 500 == 0:
    print(f'i {i}, loss:{loss.item()}')

In [None]:
# Classify a single hand-written sentence with the trained model.
hhh = "I have no feeling with this"

# Encode the sentence like the training data: split on spaces, map each word
# to its id (a word missing from the vocabulary raises KeyError), then
# right-pad with the pad id up to max_line tokens.
hgg = [word_to_index[word] for word in hhh.split(' ')]
hgg.extend([word_to_index[pad]] * (max_line - len(hgg)))

# Add a leading batch dimension of size 1.
hgg = torch.tensor(hgg).unsqueeze(0)

# Predict in eval mode so dropout is disabled and the result is deterministic,
# and without tracking gradients. The previous mode is restored afterwards so
# a later training run is unaffected.
was_training = model.training
model.eval()
with torch.no_grad():
  pred = model(hgg)
model.train(was_training)

print(index_to_sentiment[torch.argmax(pred).item()])

neutral
