In [None]:
# Import the Colab helper used to mount Google Drive
from google.colab import drive
# Mount Google Drive at ROOT; force_remount=True asks for a fresh mount on every run of this cell
ROOT="/content/drive"
drive.mount(ROOT, force_remount=True)

In [None]:
# Show the current working directory
%pwd

In [None]:
# Switch to the course folder on the mounted Drive
%cd /content/drive/MyDrive/dlss24

In [None]:
# Show the working directory again to confirm the %cd above took effect
%pwd

# Data

Download the corpora data from: https://codeocean.com/capsule/0078777/tree/v1

Upload it to a location outside your GitHub repository (so that it is not tracked).

Your working directory (`pwd`) should contain the following file:


1.   source_corpus.csv




# Module IV - Class 7 - Text Classifiers



# Reading data

In [None]:
import pandas as pd


In [None]:
# Read the source corpus CSV from the course folder on Drive into a DataFrame
df_source_corpus=pd.read_csv('/content/drive/MyDrive/dlss24/source_corpus.csv')

In [None]:
# Keep only the rows whose 'text' column is present (not NaN/None)
df_source_corpus = df_source_corpus.loc[df_source_corpus['text'].notna()]

In [None]:
# Preview the first rows as a quick check of the loaded data
df_source_corpus.head()

# Word2vec

[Image of word2vec 1-hidden layer NN](https://becominghuman.ai/mathematical-introduction-to-glove-word-embedding-60f24154e54c)

Word2vec creates vectors that represent the context of words, while GloVe creates vectors that represent the co-occurrence of words.

Word2vec uses a shallow neural network to create vectors, while GloVe uses a global matrix factorization technique


Word2vec requires a large amount of training data, while GloVe can be trained on smaller datasets. This makes GloVe more suitable for smaller tasks, while Word2vec is better suited for larger applications.

In [None]:
# word2vec requires sentences as input
from string import punctuation

import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

# Download the NLTK resources used in this notebook
nltk.download('stopwords')
nltk.download('punkt')

# Shared preprocessing objects used by normalize_text
stemmer = SnowballStemmer('english')
stoplist = set(stopwords.words('english'))
translator = str.maketrans('', '', punctuation)  # translation table that deletes punctuation

In [None]:
#from previous class
def normalize_text(doc):
    """Turn a raw document string into a list of cleaned, stemmed tokens.

    Steps: replace line breaks with spaces, lowercase, delete punctuation,
    split on whitespace, drop stopwords, replace purely numeric tokens with
    '#', and stem whatever remains.

    NOTE(review): punctuation is deleted before the stopword check, so any
    stoplist entry that contains punctuation (e.g. an apostrophe) can never
    match a token here -- confirm this is intended.
    """
    flattened = doc.replace('\r', ' ').replace('\n', ' ')
    cleaned = flattened.lower().translate(translator)

    tokens = []
    for word in cleaned.split():
        if word in stoplist:
            continue  # stopword: drop it
        token = '#' if word.isdigit() else word  # normalize numbers
        tokens.append(stemmer.stem(token))
    return tokens



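> 1. Apply the normalize_text function to the data (df_source_corpus) and store the resulting list of token lists in `sentences_normalized` (the GloVe section below reuses it)
2. Train word2vec on all of the data and store the model in `w2v` (all that is required is that the input yields one sentence, i.e. a list of UTF-8 words, after another)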





> 1. Look for the word vectors for immigration, economy, security and rights (normalize_text stems every token, so look up the stemmed form of each word)
2. See which of these (economy, security and rights) is closest to immigration

Try w2v.wv.





> Analogies with the dataset:
1. Scientist is to man as __ is to woman
2. Scientist is to woman as __ is to man



# GloVe





> Train GloVe on your dataset and check the most similar words to environment



In [None]:
!pip install glove-python3

In [None]:
import itertools
from glove import Corpus, Glove

In [None]:
# Build the co-occurrence matrix from the tokenized sentences.
# NOTE(review): `sentences_normalized` is not defined in any visible cell; presumably it is the
# list of token lists produced in the Word2vec exercise above -- confirm it exists before running.
corpus = Corpus()
corpus.fit(sentences_normalized, window=10)
# Untrained GloVe model; no_components=100 is presumably the embedding size -- verify against the library docs
glove = Glove(no_components=100, learning_rate=0.05)

In [None]:
# Fit the model on the co-occurrence matrix (30 epochs, 4 threads, verbose output)
glove.fit(corpus.matrix, epochs=30, no_threads=4, verbose=True)
# Attach the corpus dictionary to the model so that words can be looked up by name
glove.add_dictionary(corpus.dictionary)

In [None]:
# If the corpus was tokenized with normalize_text, the vocabulary holds stems,
# so fall back to the stemmed form when the raw word is not in the dictionary.
env_token = 'environment' if 'environment' in glove.dictionary else stemmer.stem('environment')
glove.word_vectors[glove.dictionary[env_token]].shape

In [None]:
# If the corpus was tokenized with normalize_text, the vocabulary holds stems,
# so fall back to the stemmed form when the raw word is not in the dictionary.
env_token = 'environment' if 'environment' in glove.dictionary else stemmer.stem('environment')
glove.word_vectors[glove.dictionary[env_token]]

In [None]:
# If the corpus was tokenized with normalize_text, the vocabulary holds stems,
# so fall back to the stemmed form when the raw word is not in the dictionary.
env_token = 'environment' if 'environment' in glove.dictionary else stemmer.stem('environment')
glove.most_similar(env_token)



# 2017: Attention is all you need



In [None]:
# Attention Basically
import numpy as np

class Node:
  """A graph node holding a state vector plus its key/query/value projections.

  Args:
    dim: length of the state vector; each projection matrix is (dim, dim).
      Defaults to 20, the size previously hard-coded here.
  """

  def __init__(self, dim=20):

    # the vector stored at this node (what is learned, the output of the layer)
    self.data=np.random.randn(dim)

    # weights governing how this node interacts with other nodes
    self.wkey = np.random.randn(dim, dim)
    self.wquery = np.random.randn(dim, dim)
    self.wvalue = np.random.randn(dim, dim)

  def key(self):
    """Return the key vector (wkey @ data): what do I have?"""
    return self.wkey @ self.data

  def query(self):
    """Return the query vector (wquery @ data): what am I looking for?

    (for next-word prediction the target is the text shifted by 1; for classification it is the class)
    """
    return self.wquery @ self.data

  def value(self):
    """Return the value vector (wvalue @ data): what do I publicly reveal to others?"""
    return self.wvalue @ self.data


In [None]:
class Graph:
  """A small random directed graph whose nodes exchange information via attention.

  `nodes` is a list of Node objects; `edges` is a list of [from_index, to_index]
  pairs drawn at random (duplicates and self-loops are possible).
  """

  def __init__(self):
    # make 10 nodes
    self.nodes=[Node() for _ in range(10)]
    #make 40 edges
    randi=lambda: np.random.randint(len(self.nodes))
    self.edges=[[randi(),randi()] for _ in range(40)]

  def run(self):
    """Perform one synchronous round of attention-based message passing.

    Every node with at least one incoming edge scores its inputs' keys against
    its own query, softmaxes the scores and adds the weighted sum of the
    inputs' values to its data. Nodes without incoming edges are left
    unchanged. All updates are computed from the pre-update state and applied
    afterwards.
    """
    # Keep each update paired with its node: nodes without inputs are skipped,
    # so matching updates to nodes by position would shift them onto the wrong nodes.
    pending=[]
    for i,n in enumerate(self.nodes):

      #what is the node looking for?
      q=n.query()

      #find all edges that are input to this node
      inputs = [self.nodes[ifrom] for (ifrom, ito) in self.edges if ito==i]
      if len(inputs)==0:
        continue #no incoming edges: this node receives no update
      # gather keys, i.e what they hold
      keys=[m.key() for m in inputs]
      #calculate compatibilities: dot product of  key with query
      scores=np.array([k.dot(q) for k in keys])
      #softmax them so they sum to 1 (subtracting the max avoids overflow in exp)
      scores=np.exp(scores-np.max(scores))
      scores=scores/np.sum(scores)
      # gather the appropriate values with weighted sum
      values=[m.value() for m in inputs]
      update=sum(s*v for s,v in zip(scores, values))
      pending.append((n, update))

    for n,u in pending:
      n.data=n.data +u #residual connection



> 1. Create a Graph
2. Print the key, value and query for the 4th node
3. Print the data before running and after running



# Transformers from scratch

[Documentation](https://uvadlc-notebooks.readthedocs.io/en/latest/tutorial_notebooks/tutorial6/Transformers_and_MHAttention.html)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [None]:
# Define the scaled dot-product function
def scaled_dot_product(q, k, v, mask=None):
    """Scaled dot-product attention.

    Computes softmax(q @ k^T / sqrt(d_k)) @ v, where d_k is the size of the
    last dimension of ``q``. Positions where ``mask == 0`` are filled with a
    very large negative logit before the softmax.

    Returns:
        (values, attention): the attended values and the attention weights.
    """
    scale = math.sqrt(q.size(-1))
    logits = torch.matmul(q, k.transpose(-2, -1)) / scale
    if mask is not None:
        logits = logits.masked_fill(mask == 0, -9e15)
    weights = F.softmax(logits, dim=-1)
    return torch.matmul(weights, v), weights

# Define the expand mask function
def expand_mask(mask):
    """Return ``mask`` with at least four dimensions.

    A 3-D mask gets a new axis inserted at position 1; a 2-D mask gets two
    leading axes; masks with four or more dimensions are returned unchanged.
    """
    assert mask.ndim >= 2, "Mask must be at least 2-dimensional with seq_length x seq_length"
    rank = mask.ndim
    if rank == 3:
        return mask.unsqueeze(1)
    if rank == 2:
        return mask.unsqueeze(0).unsqueeze(0)
    return mask

In [None]:
# Define the MultiheadAttention class
class MultiheadAttention(nn.Module):
    """Multi-head self-attention with one fused query/key/value projection.

    Args:
        input_dim: size of the last dimension of the input.
        embed_dim: total attention width; must be divisible by ``num_heads``.
        num_heads: number of attention heads (each of width ``embed_dim // num_heads``).
    """

    def __init__(self, input_dim, embed_dim, num_heads):
        super().__init__()
        assert embed_dim % num_heads == 0, "Embedding dimension must be 0 modulo number of heads."
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        # A single linear layer produces queries, keys and values for all heads at once.
        self.qkv_proj = nn.Linear(input_dim, 3 * embed_dim)
        self.o_proj = nn.Linear(embed_dim, embed_dim)
        self._reset_parameters()

    def _reset_parameters(self):
        """Xavier-uniform weights and zero biases for both projections."""
        for projection in (self.qkv_proj, self.o_proj):
            nn.init.xavier_uniform_(projection.weight)
            projection.bias.data.fill_(0)

    def forward(self, x, mask=None, return_attention=False):
        """Apply self-attention to ``x`` (unpacked as batch, sequence, features).

        Returns the projected output, plus the attention weights when
        ``return_attention`` is true.
        """
        batch_size, seq_length, _ = x.size()
        expanded_mask = None if mask is None else expand_mask(mask)

        # Split the fused projection into per-head q, k, v: (batch, head, seq, head_dim)
        fused = self.qkv_proj(x)
        per_head = fused.reshape(batch_size, seq_length, self.num_heads, 3 * self.head_dim).permute(0, 2, 1, 3)
        q, k, v = per_head.chunk(3, dim=-1)

        values, attention = scaled_dot_product(q, k, v, mask=expanded_mask)

        # Merge the heads back into (batch, seq, embed_dim)
        merged = values.permute(0, 2, 1, 3).reshape(batch_size, seq_length, self.embed_dim)
        o = self.o_proj(merged)
        return (o, attention) if return_attention else o

# Define the EncoderBlock class
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then a feed-forward net, each followed
    by dropout, a residual connection and layer normalisation.

    Args:
        input_dim: feature size of the input (also used as the attention width).
        num_heads: number of attention heads.
        dim_feedforward: hidden size of the feed-forward network.
        dropout: dropout probability used in the block.
    """

    def __init__(self, input_dim, num_heads, dim_feedforward, dropout=0.0):
        super().__init__()
        self.self_attn = MultiheadAttention(input_dim, input_dim, num_heads)
        feedforward_layers = [
            nn.Linear(input_dim, dim_feedforward),
            nn.Dropout(dropout),
            nn.ReLU(inplace=True),
            nn.Linear(dim_feedforward, input_dim),
        ]
        self.linear_net = nn.Sequential(*feedforward_layers)
        self.norm1 = nn.LayerNorm(input_dim)
        self.norm2 = nn.LayerNorm(input_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Return the block output for ``x``; ``mask`` is passed to the attention layer."""
        # Attention sub-layer with residual connection, then normalisation
        attended = self.norm1(x + self.dropout(self.self_attn(x, mask=mask)))
        # Feed-forward sub-layer with residual connection, then normalisation
        transformed = attended + self.dropout(self.linear_net(attended))
        return self.norm2(transformed)

# Define the TransformerEncoder class
class TransformerEncoder(nn.Module):
    """A stack of ``num_layers`` EncoderBlock layers applied in sequence.

    Args:
        num_layers: number of encoder blocks to stack.
        **block_args: keyword arguments forwarded unchanged to every EncoderBlock.
    """

    def __init__(self, num_layers, **block_args):
        super().__init__()
        self.layers = nn.ModuleList([EncoderBlock(**block_args) for _ in range(num_layers)])

    def forward(self, x, mask=None):
        """Pass ``x`` through every layer in order, giving each the same ``mask``."""
        for l in self.layers:
            x = l(x, mask=mask)
        return x

    def get_attention_maps(self, x, mask=None):
        """Return a list with each layer's attention weights for ``x``.

        The input is propagated layer by layer exactly as in ``forward``, so
        the map of layer i is computed from the output of layer i-1.
        """
        attention_maps = []
        for l in self.layers:
            _, attn_map = l.self_attn(x, mask=mask, return_attention=True)
            attention_maps.append(attn_map)
            # Propagate with the mask, as forward() does; without it the maps of
            # later layers would be computed from unmasked activations.
            x = l(x, mask=mask)
        return attention_maps

# Define the PositionalEncoding class
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to its input.

    Even feature columns hold sines and odd columns hold cosines of
    ``position * div_term``. The table is stored as the non-persistent buffer
    ``pe`` of shape (1, max_len, d_model), so it is not saved in the state dict.

    Args:
        d_model: feature size of the inputs (even or odd).
        max_len: number of positions to precompute.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one cosine column fewer than sine columns,
        # so trim the angles to the number of odd-indexed columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe, persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions."""
        x = x + self.pe[:, :x.size(1)]
        return x




In [None]:
# Define the Transformer-based text classifier
class TransformerClassifier(nn.Module):
    """Text classifier: token embedding + positional encoding + Transformer
    encoder, followed by a two-layer classification head on the first token.

    Args:
        vocab_size: number of rows in the embedding table (index 0 is the padding index).
        d_model: embedding / hidden size.
        num_heads: attention heads per encoder layer.
        num_layers: number of encoder layers.
        dim_feedforward: hidden size of each encoder feed-forward network.
        num_classes: number of output logits.
        max_len: number of positions precomputed by the positional encoding.
        dropout: dropout probability for the encoder and the head.
    """

    def __init__(self, vocab_size, d_model, num_heads, num_layers, dim_feedforward, num_classes, max_len=5000, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        block_args = dict(
            input_dim=d_model,
            num_heads=num_heads,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.encoder = TransformerEncoder(num_layers=num_layers, **block_args)
        self.pre_classifier = nn.Linear(d_model, d_model)
        self.classifier = nn.Linear(d_model, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input_ids, mask=None):
        """Return class logits for ``input_ids``; ``mask`` is passed to the encoder."""
        hidden = self.positional_encoding(self.embedding(input_ids))
        hidden = self.encoder(hidden, mask=mask)
        # Classify from the first position only
        # (assumes the tokenizer puts a [CLS]-style token first -- TODO confirm)
        first_token = hidden[:, 0]
        pooled = self.dropout(F.relu(self.pre_classifier(first_token)))
        return self.classifier(pooled)


In [None]:

# Example usage
vocab_size = 30522  # Vocabulary size (BERT's vocab size)
d_model = 768  # Embedding size
num_heads = 12  # Number of attention heads
num_layers = 6  # Number of transformer layers
dim_feedforward = 3072  # Feedforward network hidden size
num_classes = 2  # Number of output classes (e.g., binary classification)
max_len = 512  # Maximum sequence length
dropout = 0.1  # Dropout rate

model = TransformerClassifier(vocab_size, d_model, num_heads, num_layers, dim_feedforward, num_classes, max_len, dropout)
# This cell only runs inference: switch off dropout so the output depends only on the weights and the input
model.eval()

# Assume we have a tokenizer that converts sentences to input_ids
# For demonstration, using random input_ids
input_ids = torch.randint(0, vocab_size, (1, max_len))  # Batch size 1, sequence length max_len
mask = (input_ids != 0).unsqueeze(1).unsqueeze(2)  # Mask for non-padding tokens

# No gradients are needed for a forward-only demo; this avoids building the autograd graph
with torch.no_grad():
    logits = model(input_ids, mask)
print(logits)

In [None]:
# Shape of the padding mask built above: (batch, 1, 1, seq_length) after the two unsqueeze calls
mask.shape

In [None]:
# Shape of the token ids: (batch size, sequence length)
input_ids.shape


# In the attention mechanism the mask must be broadcastable to the shape of the
# attention logits: [batch_size, num_heads, seq_length, seq_length].
# A mask of shape [batch_size, 1, seq_length, seq_length] is broadcast across the heads;
# the padding mask built above, [batch_size, 1, 1, seq_length], is additionally
# broadcast across the query positions.

# Transformers: Hugging Face

[Hugging Face](https://huggingface.co/docs/transformers/main_classes/tokenizer)

In [None]:
#!pip install transformers
import torch
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification

# gpu or cpu? Use CUDA when it is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

In [None]:
model_name = 'distilbert-base-uncased' # huggingface model_ID or path to folder
# Load the pretrained checkpoint into a DistilBERT sequence-classification model
# NOTE(review): the model is not moved to `device` here -- confirm whether later cells call model.to(device)
model = DistilBertForSequenceClassification.from_pretrained(model_name)
# Print the module tree to inspect the architecture
print (model)



> Apply that to our dataset





> ManifestoBERTa

[Documentation](https://manifesto-project.wzb.eu/information/documents/manifestoberta)



In [None]:
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Load the ManifestoBERTa sentence classifier; the tokenizer is loaded from the xlm-roberta-large checkpoint
model = AutoModelForSequenceClassification.from_pretrained("manifesto-project/manifestoberta-xlm-roberta-56policy-topics-sentence-2023-1-1")
tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-large")

# Example sentence to classify
sentence = "We will restore funding to the Global Environment Facility and the Intergovernmental Panel on Climate Change, to support critical climate science research around the world"

# Tokenize into PyTorch tensors, padding/truncating to exactly 200 tokens
inputs = tokenizer(sentence,
                   return_tensors="pt",
                   max_length=200,  #we limited the input to 200 tokens during finetuning
                   padding="max_length",
                   truncation=True
                   )

# Forward pass; keep only the raw class scores
logits = model(**inputs).logits

# Softmax over the class dimension, then take the first (and only) row of the batch
probabilities = torch.softmax(logits, dim=1).tolist()[0]
# Map class ids to label names and express each probability as a percentage with 2 decimals
probabilities = {model.config.id2label[index]: round(probability * 100, 2) for index, probability in enumerate(probabilities)}
# Sort the labels from most to least probable
probabilities = dict(sorted(probabilities.items(), key=lambda item: item[1], reverse=True))
print(probabilities)
# {'501 - Environmental Protection: Positive': 67.28, '411 - Technology and Infrastructure': 15.19, '107 - Internationalism: Positive': 13.63, '416 - Anti-Growth Economy: Positive': 2.02...

# Label with the highest logit (argmax over the whole tensor; there is a single sentence here)
predicted_class = model.config.id2label[logits.argmax().item()]
print(predicted_class)
# 501 - Environmental Protection: Positive
