In [None]:

import argparse
import csv
import math
import os
import random
import warnings
from collections import Counter, defaultdict

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split

# ######################################
# Utilities: dataset & vocab

class SimpleVocab:
    """Token <-> integer-id vocabulary built from token frequencies.

    Reserved tokens are registered first, so with the default list
    ``<pad>`` is id 0, ``<unk>`` is id 1 and ``<cls>`` is id 2. Regular
    tokens are counted with ``add_sentence`` and only receive ids once
    ``build_vocab`` is called.

    Args:
        min_freq: minimum count a token needs to be added by ``build_vocab``.
        reserved_tokens: special tokens to register up front; defaults to
            ``["<pad>", "<unk>", "<cls>"]`` when ``None``.
    """

    def __init__(self, min_freq=0, reserved_tokens=None):
        self.min_freq = min_freq
        self.freqs = Counter()   # token -> number of occurrences seen so far
        self.token_to_idx = {}   # token -> id
        self.idx_to_token = []   # id -> token (the inverse mapping, used for len and decoding)
        specials = ["<pad>", "<unk>", "<cls>"] if reserved_tokens is None else reserved_tokens
        for special in specials:
            self.add_token(special)
        self.reserved_tokens = specials

    def add_token(self, t):
        """Register ``t`` under the next free id; do nothing if it is already known."""
        if t not in self.token_to_idx:
            self.token_to_idx[t] = len(self.idx_to_token)
            self.idx_to_token.append(t)

    def add_sentence(self, sent_tokens):
        """Add the tokens of one sentence to the frequency counter."""
        self.freqs.update(sent_tokens)

    def build_vocab(self):
        """Assign ids to every counted token with count >= ``min_freq``, most frequent first."""
        for token, count in self.freqs.most_common():
            if count >= self.min_freq:
                # add_token ignores tokens that already have an id
                self.add_token(token)

    def __len__(self):
        """Number of tokens that currently have an id (reserved tokens included)."""
        return len(self.idx_to_token)

    def encode(self, tokens):
        """Map tokens to ids; tokens without an id map to the id of ``<unk>``.

        If ``<unk>` was not registered, unknown tokens map to ``None``.
        """
        fallback = self.token_to_idx.get("<unk>")
        lookup = self.token_to_idx.get
        return [lookup(tok, fallback) for tok in tokens]

In [None]:

def simple_tokenize(text):
    """Lowercase ``text``, blank out basic punctuation and split on whitespace.

    The characters ``, . ; : ! ? ( ) " '`` are each replaced by a space,
    so ``"it's"`` becomes the two tokens ``"it"`` and ``"s"``.

    Returns:
        list of lowercase token strings (empty list for blank input).
    """
    punctuation = ",.;:!?()\"'"
    to_space = str.maketrans({mark: " " for mark in punctuation})
    return text.lower().translate(to_space).strip().split()

In [5]:
# Sanity-check the tokenizer on a short sample sentence.
sample_text=simple_tokenize("Hello, world! This is a test.")
print(sample_text)

# Load the dataset CSV into the notebook-global DataFrame `df`.
import pandas as pd
df=pd.read_csv("new_shape_dataset.csv")

['hello', 'world', 'this', 'is', 'a', 'test']


In [None]:
# Scratch cell: tokenize a sample sentence and the first four dataset sentences.
sample_text=simple_tokenize("Hello, world! This is a test.")
# First four entries of the "sentence" column, converted to a plain list.
k=df["sentence"][:4]
k=list(k)
sent=""
# NOTE(review): `sent` is overwritten on every iteration, so after the loop it
# holds only the tokens of the last sentence in `k`.
for i in k:
    sent=simple_tokenize(i)
# k.rows = list(zip(df["sentence"].str.strip(), df["type"], df["size"], df["stiffness"]))


print(sample_text)

['this marble is giant and stiff', 'a soft round object was miniature', 'bulky and flexible describes the tube', 'the canister was solid and large']
['hello', 'world', 'this', 'is', 'a', 'test']


In [None]:
# Bind the first four column *names* of `df` (strings, not the column data) to variables.
sentence,shape, size, stiff = df.columns[0], df.columns[1], df.columns[2], df.columns[3]

# Show the type of one of the names and the names themselves.
print(type(size), size, stiff)
print(df.columns[0])

<class 'str'> size stiffness
sentence


In [56]:
# example=list(df.loc[:5,"sentence"])
# print(example)
# vocab=SimpleVocab()
# for sent in example:
#     tokens = simple_tokenize(sent)
#     vocab.add_sentence(tokens)
#     vocab.add_sentence(sent)
# vocab.build_vocab()
# toks=example[0]
# j=vocab.encode(toks)

In [None]:
# print(len(vocab))

51


In [63]:
# Take the first four entries of the "sentence" column and strip leading/trailing whitespace.
k=df["sentence"][:4].str.strip()
# print(k)

# Turn the Series into a plain Python list.
k=[i for i in k]
print(k)
print(len(k))
# Last expression of the cell: the type of `.values` on the sliced "type" column.
type(df["type"][:4].values)

['this marble is giant and stiff', 'a soft round object was miniature', 'bulky and flexible describes the tube', 'the canister was solid and large']
4


numpy.ndarray

In [None]:

class MultiTaskTextDataset(Dataset):
    """Sentences from a CSV file encoded as fixed-length id sequences with three labels.

    The CSV must contain the columns ``sentence``, ``type``, ``size`` and
    ``stiffness``. Every sentence is tokenized with ``simple_tokenize``,
    encoded with the vocabulary, prefixed with the ``<cls>`` id and then
    padded with the ``<pad>`` id or truncated to exactly ``max_len`` ids.

    Each item is a tuple ``(token_ids, type_label, size_label, stiffness_label)``
    of ``torch.long`` tensors; ``token_ids`` has ``max_len`` elements.

    Args:
        csv_path: path of the CSV file to read.
        vocab: an existing vocabulary to reuse; when given, ``build_vocab``
            is ignored.
        max_len: length of every encoded sequence (the ``<cls>`` id included).
        build_vocab: when ``vocab`` is ``None``, build a ``SimpleVocab`` from
            the sentences of this CSV.

    Attributes:
        data: list of the encoded sample tuples.
        num_skipped: number of rows dropped because their values could not
            be converted to ``torch.long`` tensors.

    Raises:
        ValueError: if ``vocab`` is ``None`` and ``build_vocab`` is false.
    """

    def __init__(self, csv_path, vocab=None, max_len=10, build_vocab=True): # max_len is the context size
        # read CSV
        self.df = pd.read_csv(csv_path)
        self.row, self.shape, self.size, self.stiff = (
            self.df["sentence"], self.df["type"], self.df["size"], self.df["stiffness"],
        )

        self.max_len = max_len
        self.data = []
        self.num_skipped = 0

        # reuse the given vocab, or build one from this file if requested
        if vocab is not None:
            self.vocab = vocab
        elif build_vocab:
            self.vocab = SimpleVocab(min_freq=1)
            for text in self.row:
                self.vocab.add_sentence(simple_tokenize(text))
            self.vocab.build_vocab()
        else:
            raise ValueError("Provide vocab or set build_vocab=True")

        # tokenize -> encode -> convert to tensors
        for text, shape, size, stiff in zip(self.row, self.shape, self.size, self.stiff):
            enc = self._encode_sentence(text)
            try:
                sample = (
                    torch.tensor(enc, dtype=torch.long),
                    torch.tensor(shape, dtype=torch.long),
                    torch.tensor(size, dtype=torch.long),
                    torch.tensor(stiff, dtype=torch.long),
                )
            except (TypeError, ValueError, RuntimeError):
                # Best effort: a row whose values cannot become integer tensors
                # is skipped, but counted so the loss of data is visible.
                self.num_skipped += 1
                continue
            self.data.append(sample)

        if self.num_skipped:
            warnings.warn(
                f"MultiTaskTextDataset: skipped {self.num_skipped} row(s) of {csv_path!r} "
                "that could not be converted to integer tensors",
                stacklevel=2,
            )

    def _encode_sentence(self, text):
        """Return ``[<cls>] + ids`` for ``text``, padded or truncated to ``max_len``."""
        enc = [self.vocab.token_to_idx["<cls>"]] + self.vocab.encode(simple_tokenize(text))
        missing = self.max_len - len(enc)
        if missing > 0:
            return enc + [self.vocab.token_to_idx["<pad>"]] * missing
        return enc[: self.max_len]

    def __len__(self):
        """Number of successfully encoded samples."""
        return len(self.data)

    def get_data(self):
        """Return the underlying list of sample tuples (not a copy)."""
        return self.data

    def __getitem__(self, idx):
        """Return the sample tuple stored at position ``idx``."""
        return self.data[idx]

In [82]:
# Reload the CSV into the notebook-global DataFrame `df`.
df=pd.read_csv("new_shape_dataset.csv")







# Build the dataset from the same CSV. No vocab is passed, so with the default
# arguments the dataset builds its own vocabulary from the file's sentences.
path = "new_shape_dataset.csv"
multi_dataset=MultiTaskTextDataset(path)

In [88]:
# Inspect the first stored sample; each entry of the dataset's data list is a tuple.
j=multi_dataset.get_data()[0]
print(type(j))

<class 'tuple'>


In [92]:
# Demo of torch.unsqueeze: it inserts a dimension of size 1 at the given position.
x = torch.tensor([1, 2, 3, 4])
print(x.shape)
# New leading dimension: shape (4,) -> (1, 4).
y=torch.unsqueeze(x, 0)
print(y.shape)
# New trailing dimension: shape (4,) -> (4, 1), i.e. a column.
z=torch.unsqueeze(x, 1)
print(z.shape)
print(z)

torch.Size([4])
torch.Size([1, 4])
torch.Size([4, 1])
tensor([[1],
        [2],
        [3],
        [4]])


In [None]:


# Model: PosEncoding
# -------------------------
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding that is added to its input.

    A table of shape ``(1, max_len, d_model)`` is precomputed: even feature
    indices hold ``sin(pos * freq)`` and odd feature indices hold
    ``cos(pos * freq)``. Both even and odd ``d_model`` are supported.

    Args:
        d_model: size of the feature (embedding) dimension.
        max_len: largest sequence length (number of positions) the table
            covers, i.e. the maximum context size.
    """

    def __init__(self, d_model, max_len=512):
        super().__init__()
        position = torch.arange(0, max_len).unsqueeze(1).float()  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)  # (max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one odd-indexed column fewer than even-indexed
        # ones, so the cosine part must drop the last frequency.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # A buffer is saved in state_dict and moved by .to(device), but it is
        # not a parameter: the optimizer never updates it.
        self.register_buffer("pe", pe.unsqueeze(0))  # (1, max_len, d_model)

    def forward(self, x):
        """Add the positional table to ``x``.

        Args:
            x: tensor of shape ``(batch, seq_len, d_model)``.

        Returns:
            Tensor of the same shape as ``x``.

        Raises:
            ValueError: if ``seq_len`` exceeds the ``max_len`` of the table.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds positional encoding max_len {self.pe.size(1)}"
            )
        return x + self.pe[:, :seq_len]


In [95]:

class VanillaTransformerMultiTask(nn.Module):
    """Transformer encoder with three classification heads on the first token.

    Token ids are embedded, positional encodings are added, the sequence is
    run through a stack of ``nn.TransformerEncoderLayer`` blocks, and the
    encoder output at position 0 (where the dataset places ``<cls>``) is
    pooled with a tanh-activated linear layer. Three linear heads map the
    pooled vector to shape, size and stiffness logits.

    Args:
        vocab_size: number of rows of the embedding table.
        d_model: embedding / model dimension.
        nhead: number of attention heads per encoder layer.
        num_layers: number of encoder layers.
        hidden_dim: feed-forward dimension inside each encoder layer.
        num_shape: number of classes of the shape head.
        num_size: number of classes of the size head.
        num_stiff: number of classes of the stiffness head.
        max_len: maximum sequence length supported by the positional encoding.
        dropout: dropout probability used in the encoder and after pooling.
    """

    def __init__(self, vocab_size, d_model, nhead, num_layers, hidden_dim,
                 num_shape, num_size, num_stiff, max_len=32, dropout=0.1):
        super().__init__()
        # padding_idx=0: the embedding row of id 0 receives no gradient, so it
        # is never updated during training.
        self.token_emb = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pos_enc = PositionalEncoding(d_model, max_len=max_len)
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                                   dim_feedforward=hidden_dim,
                                                   dropout=dropout, activation="relu")
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        # pooling: use token 0 (we placed <cls> at index 0) as pooled
        self.dense = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

        # output heads
        self.head_shape = nn.Linear(d_model, num_shape)
        self.head_size = nn.Linear(d_model, num_size)
        self.head_stiff = nn.Linear(d_model, num_stiff)

        self._init_weights()

    def _init_weights(self):
        """Xavier-initialise the embedding, pooling and head weights."""
        for weight in (
            self.token_emb.weight,
            self.dense.weight,
            self.head_shape.weight,
            self.head_size.weight,
            self.head_stiff.weight,
        ):
            nn.init.xavier_uniform_(weight)
        # xavier_uniform_ also overwrites the padding row, which nn.Embedding
        # had zeroed; since that row gets no gradient it would otherwise stay
        # at its random value forever. Restore it to zeros.
        pad_idx = self.token_emb.padding_idx
        if pad_idx is not None:
            with torch.no_grad():
                self.token_emb.weight[pad_idx].zero_()

    def forward(self, input_ids, src_key_padding_mask=None):
        """Compute the three sets of class logits.

        Args:
            input_ids: long tensor of token ids, shape ``(batch, seq_len)``.
            src_key_padding_mask: optional boolean tensor of shape
                ``(batch, seq_len)``, ``True`` at padded positions; attention
                ignores those positions as keys.

        Returns:
            Tuple ``(shape_logits, size_logits, stiff_logits)``; each has
            shape ``(batch, num_classes_of_that_head)``.
        """
        x = self.token_emb(input_ids)  # (batch, seq_len, d_model)
        x = self.pos_enc(x)  # (batch, seq_len, d_model)
        # the encoder layer was built without batch_first, so it expects
        # (seq_len, batch, d_model)
        x = x.transpose(0, 1)
        enc = self.transformer_encoder(x, src_key_padding_mask=src_key_padding_mask)  # (seq_len, batch, d_model)
        enc = enc.transpose(0, 1)  # (batch, seq_len, d_model)
        cls_token = enc[:, 0, :]  # (batch, d_model)
        pooled = torch.tanh(self.dense(cls_token))
        pooled = self.dropout(pooled)
        # use the heads defined in __init__
        out_shape = self.head_shape(pooled)
        out_size = self.head_size(pooled)
        out_stiff = self.head_stiff(pooled)
        return out_shape, out_size, out_stiff



In [None]:

# -------------------------
# Training & Eval helpers
# -------------------------

# collate_batch turns a list of per-sample tuples into batched tensors plus a padding mask
def collate_batch(batch):
    """Stack a list of samples into batch tensors and build the padding mask.

    Args:
        batch: list of 4-tuples ``(token_ids, label_1, label_2, label_3)``
            of tensors; all ``token_ids`` must have the same length.

    Returns:
        Tuple ``(token_ids, pad_mask, labels_1, labels_2, labels_3)`` where
        every tensor has the batch as its first dimension and ``pad_mask`` is
        ``True`` wherever ``token_ids`` equals 0 (the pad token id).
    """
    def stack_field(position):
        # gather the same tuple slot from every sample and stack along dim 0
        return torch.stack([sample[position] for sample in batch], dim=0)

    token_ids = stack_field(0)
    labels_1 = stack_field(1)
    labels_2 = stack_field(2)
    labels_3 = stack_field(3)
    return token_ids, token_ids == 0, labels_1, labels_2, labels_3



def accuracy(preds, labels):
    """Fraction of rows of ``preds`` whose arg-max along dim 1 equals the label.

    Args:
        preds: tensor of per-class scores, classes along dim 1.
        labels: tensor of class indices, one per row of ``preds``.

    Returns:
        The accuracy as a Python float.
    """
    predicted = torch.argmax(preds, dim=1)
    hits = (predicted == labels).float()
    return hits.mean().item()


In [97]:

def train_epoch(model, dataloader, optim, device):
    """Train ``model`` for one pass over ``dataloader`` and return averaged metrics.

    The loss is the unweighted sum of three cross-entropy losses, one per
    output of the model. Averages are taken over the number of samples that
    were actually processed, so they stay correct when the loader does not
    visit every sample of its dataset (e.g. ``drop_last=True`` or a sampler).

    Args:
        model: module called as ``model(token_ids, src_key_padding_mask=mask)``
            that returns three logits tensors.
        dataloader: iterable of batches
            ``(token_ids, pad_mask, labels_1, labels_2, labels_3)``.
        optim: optimizer over the parameters of ``model``.
        device: device the batch tensors are moved to.

    Returns:
        Tuple ``(mean_loss, acc_1, acc_2, acc_3)`` of floats, where ``acc_i``
        is the accuracy of the i-th model output against the i-th labels.
        All four are ``0.0`` if the loader yields no samples.
    """
    model.train()
    criterion = nn.CrossEntropyLoss()
    total_loss = 0.0
    correct_w = 0
    correct_b = 0
    correct_g = 0
    n_seen = 0
    for encs, pad_mask, w, bod, g in dataloader:
        encs = encs.to(device)
        pad_mask = pad_mask.to(device)
        w = w.to(device)
        bod = bod.to(device)
        g = g.to(device)
        batch_size = encs.size(0)

        optim.zero_grad()
        out_w, out_b, out_g = model(encs, src_key_padding_mask=pad_mask)
        loss = criterion(out_w, w) + criterion(out_b, bod) + criterion(out_g, g)
        loss.backward()
        optim.step()

        # criterion returns the batch mean, so weight it by the batch size
        total_loss += loss.item() * batch_size
        # count correct predictions on the device; no autograd or CPU copy needed
        correct_w += (out_w.argmax(dim=1) == w).sum().item()
        correct_b += (out_b.argmax(dim=1) == bod).sum().item()
        correct_g += (out_g.argmax(dim=1) == g).sum().item()
        n_seen += batch_size

    if n_seen == 0:
        # nothing was processed; avoid dividing by zero
        return 0.0, 0.0, 0.0, 0.0
    return total_loss / n_seen, correct_w / n_seen, correct_b / n_seen, correct_g / n_seen