In [5]:
import torch
import torch.nn.functional as F
from torch.utils.checkpoint import checkpoint
import pandas as pd
from collections import Counter
from copy import deepcopy
import re
from typing import Callable, Iterable, Optional

In [6]:
class SpecialToken:
    """
    A special token for tokenizers
    """

    def __init__(self, string: str):
        self.string = string.upper()

    def __repr__(self):
        return f"<{self.string}>"
    
    def __eq__(self, other) -> bool:
        if isinstance(other, SpecialToken):
            return self.string == other.string
        else:
            return False
        
    def __hash__(self):
        return hash(self.string)


class Tokenizer:
    """
    A simple word tokenizer
    """

    word_pattern = re.compile(R"\w+|\d+|[^\w\d\s]")

    def __repr__(self):
        return f"Tokenizer({len(self.vocabulary)} tokens)"

    def __init__(self, corpus: Iterable[str], min_frequency: float = 1.0E-6):
        words = [word for document in corpus for word in self._split(document)]
        word_counts = Counter(words)
        word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
        self.vocabulary = [k for k, v in word_counts if v/len(words) >= min_frequency] + [SpecialToken("UNKNOWN"), SpecialToken("END"), SpecialToken("PAD")]
        self.map = {word: i for i, word in enumerate(self.vocabulary)}

    def split(self, document: str) -> list[str]:
        return self.decode(self.encode(document))

    def encode(self, document: str) -> list[int]:
        return [self.map.get(word, self.UNKNOWN) for word in self._split(document)]

    def decode(self, encoded: list[int]) -> list[str]:
        return [i.string if isinstance(i, SpecialToken) else self.vocabulary[i] for i in encoded]

    def _split(self, document: str) -> list[str]:
        return self.word_pattern.findall(document)

    @property
    def PAD(self) -> SpecialToken:
        return self.map[SpecialToken("PAD")]
    
    @property
    def END(self) -> SpecialToken:
        return self.map[SpecialToken("END")]
    
    @property
    def UNKNOWN(self) -> SpecialToken:
        return self.map[SpecialToken("UNKNOWN")]


In [7]:
df = pd.read_csv("../datasets/Twitter_US_Airline_Sentiment.csv")
labels = ['negative', 'neutral', 'positive']
df.columns

Index(['text', 'airline_sentiment'], dtype='object')

In [8]:
tokenizer = Tokenizer(df.text, min_frequency=1.0E-5)
print(tokenizer)

Tokenizer(4314 tokens)


In [9]:
for text in df.sample(n=10).text:
    print(text)
    print(tokenizer.split(text))
    print()


@USAirways @AmericanAir terminal E in Miami is still the worst most smelly airport ever. Thanks for nothing.
['@', 'USAirways', '@', 'AmericanAir', 'terminal', 'E', 'in', 'Miami', 'is', 'still', 'the', 'worst', 'most', <UNKNOWN>, 'airport', 'ever', '.', 'Thanks', 'for', 'nothing', '.']

@united Arriving 25 minutes early is nice, but not if equipment isn't ready. Waiting 30 minutes for luggage, so far. Time gains wiped out.
['@', 'united', <UNKNOWN>, '25', 'minutes', 'early', 'is', 'nice', ',', 'but', 'not', 'if', 'equipment', 'isn', "'", 't', 'ready', '.', 'Waiting', '30', 'minutes', 'for', 'luggage', ',', 'so', 'far', '.', 'Time', <UNKNOWN>, <UNKNOWN>, 'out', '.']

@SouthwestAir grouchy about this flight 636 #complimentarybeveragesneeded
['@', 'SouthwestAir', <UNKNOWN>, 'about', 'this', 'flight', <UNKNOWN>, '#', <UNKNOWN>]

@JetBlue 2 aisles of empty #evermoreroom seats and we can't move bc we didn't pay?! #nonsense #Waste #JetBlue #jetbluebos #cheap
['@', 'JetBlue', '2', <UNKNOWN>, '

In [10]:
df_train = df.sample(frac=0.7)
df = df.drop(index=df_train.index)
df_val = df.sample(frac=0.5)
df_test = df.drop(index=df_val.index)

In [11]:
def accuracy(predicted: torch.Tensor, target: torch.Tensor):
    assert target.shape == predicted.shape
    assert target.dtype == torch.long
    assert predicted.dtype == torch.long
    with torch.no_grad():
        return torch.mean((predicted == target).float()).cpu().item()


def input_to_tensor(df: pd.DataFrame, tokenizer: Tokenizer) -> torch.Tensor:
    encoded = [tokenizer.encode(document) for document in df['text']]
    L = max([len(doc) for doc in encoded])
    encoded = [doc + [tokenizer.PAD]*(L - len(doc)) for doc in encoded]
    return torch.tensor(encoded, dtype=torch.long)


def target_to_tensor(df: pd.DataFrame) -> torch.Tensor:
    map = {k: v for v, k in enumerate(labels)}
    return torch.tensor([map[label] for label in df["airline_sentiment"]], dtype=torch.long)


def data_to_tensor(df: pd.DataFrame, tokenizer: Tokenizer) -> tuple[torch.Tensor]:
    return (input_to_tensor(df, tokenizer), target_to_tensor(df))

In [12]:
class Batchifyer:

    def __init__(self, df: pd.DataFrame, tokenizer: Tokenizer, n_batches: int, batch_size: Optional[int]):
        self.df = df
        self.tokenizer = tokenizer
        self.n_batches = n_batches
        self.batch_size = batch_size
    
    def __iter__(self):
        shuffled = df.sample(frac=1.)
        return (self._batch(shuffled, i) for i in range(self.n_batches))
    
    def _batch(self, shuffled: pd.DataFrame, i: int) -> tuple[torch.Tensor, torch.Tensor]:
        batch_size = self.batch_size or len(shuffled) // self.n_batches
        subset = shuffled.iloc[i*batch_size:(i+1)*batch_size]
        return data_to_tensor(subset, self.tokenizer)

In [13]:
def train_loop(model: torch.nn.Module, optimizer: torch.optim.Optimizer, train_data: Iterable[tuple[torch.Tensor]], val_data: Iterable[tuple[torch.Tensor]], n_steps: int = 1000, patience: int = 100, keep_best: bool = True):
    """
    train the model for the specified number of steps, or untilearly stopping
    """
    best_state = deepcopy(model.state_dict())
    best_step = 0
    best_metric = 0.
    try:
        for step in range(n_steps):
            optimizer.zero_grad()
            # train loss
            model.train()
            losses = []
            for x, y in train_data:
                loss = model.loss(x, y)
                loss.backward()
                losses.append(loss.item())
            loss = sum(losses)/len(losses)
            # val metric
            model.eval()
            metrics = []
            for x, y in val_data:
                metrics.append(model.metric(x, y))
            metric = sum(metrics) / len(metrics)
            # checkpointing
            if metric > best_metric:
                best_metric = metric
                best_step = step
                if keep_best:
                    best_state = deepcopy(model.state_dict())
            elif step - best_step > patience:
                print("early stoping")
                break
            # optimizer steping
            optimizer.step()
            # printing
            print(f"Step {step}: loss = {loss:.3g} metric = {metric:.2%}")

    except KeyboardInterrupt:
        print("interrupted by user")
    if keep_best:
        model.load_state_dict(best_state)

## Exercice 1

Implémenter et entraîner un réseau récurrent pour classifier les tweets

In [14]:
class RNN(torch.nn.Module):
    
    def __init__(self, n_classes: int, tokenizer: Tokenizer, in_features: int, hidden_state_features: int, activation: Callable = torch.relu):
        super().__init__()
        self.tokenizer = tokenizer
        self.hidden_state_features = hidden_state_features
        self.embedding = torch.nn.Embedding(len(tokenizer.vocabulary), in_features)
        self.linear = torch.nn.Linear(in_features + hidden_state_features, in_features + hidden_state_features)
        self.activation = activation
        self.contract = torch.nn.Linear(in_features + hidden_state_features, hidden_state_features)
        self.normalization = torch.nn.LayerNorm(hidden_state_features)
        self.output = torch.nn.Linear(hidden_state_features, n_classes)
    
    def forward(self, X):
        """
        Parameters
        ----------

        X : torch.Tensor
            tensor of long of shape (N, L)
        """
        X = X.to(self.device)
        N, L = X.shape
        H = torch.zeros((N, self.hidden_state_features), dtype=torch.float32, device=X.device)
        for x in X.transpose(0, 1):
            I = self.embedding(x)
            T = torch.cat([I, H], dim=1)
            T = self.linear(T)
            T = self.activation(T)
            T = self.contract(T)
            H = torch.where(x.unsqueeze(1) == self.tokenizer.PAD, H, T)
        return self.output(H)
    
    def predict(self, X: torch.Tensor) -> torch.Tensor:
        self.eval()
        with torch.no_grad():
            Y = self(X)
        return Y.max(dim=1).indices
    
    def loss(self, X: torch.Tensor, Y: torch.Tensor) -> torch.Tensor:
        y_pred = self(X)
        return F.cross_entropy(y_pred, Y.to(y_pred.device))

    def metric(self, X: torch.Tensor, Y: torch.Tensor) -> torch.Tensor:
        y_pred = self.predict(X)
        return accuracy(y_pred, Y.to(y_pred.device))

    @property
    def device(self) -> torch.device:
        return self.output.weight.device

In [15]:
model = RNN(len(labels), tokenizer, 100, 100)
model.to("cuda:0")
optimizer = torch.optim.Adam(model.parameters(), lr=1.0E-3)
train = Batchifyer(df_train, tokenizer, n_batches=1, batch_size=None)
val = Batchifyer(df_val, tokenizer, n_batches=1, batch_size=None)
train_loop(model, optimizer, train, val, n_steps=1000, patience=100)

Step 0: loss = 1.03 metric = 62.61%
Step 1: loss = 0.969 metric = 63.09%
Step 2: loss = 0.921 metric = 63.09%
Step 3: loss = 0.893 metric = 63.09%
Step 4: loss = 0.886 metric = 63.09%
Step 5: loss = 0.889 metric = 63.09%
Step 6: loss = 0.886 metric = 63.09%
Step 7: loss = 0.877 metric = 63.09%
interrupted by user


In [16]:
X, Y = data_to_tensor(df_test, tokenizer)
y_pred = model.predict(X)
acc = accuracy(y_pred, Y.to(y_pred.device))
print(f"accuracy {acc:.3%}")

accuracy 61.840%


## Exercice II

Programmer un modèle type encodeur de transformeur pour classifier les tweets

In [17]:
class AttentionBlock(torch.nn.Module):

    def __init__(self, projection_dim: int, n_heads: int, activation: Callable):
        super().__init__()
        self.projection_dim = projection_dim
        self.n_heads = n_heads
        D = projection_dim * n_heads
        self.q = torch.nn.Linear(D, D, bias=False)
        self.k = torch.nn.Linear(D, D, bias=False)
        self.v = torch.nn.Linear(D, D, bias=False)
        self.intermediate_norm = torch.nn.LayerNorm(D)
        self.expand = torch.nn.Linear(D, 4*D)
        self.activation = activation
        self.contract = torch.nn.LayerNorm(4*D, D)
    
    def forward(self, X: torch.Tensor, mask: torch.Tensor):
        """
        Parameters
        ----------

        X : torch.Tensor
            tensor of shape (N, L, D)
        """
        super().__init__()
        input = X
        N, L, D = X.shape
        X = X.reshape(-1, D)
        Q = self.q(X).reshape(N, L, self.n_heads, self.projection_dim).permute(0, 2, 1, 3)
        K = self.k(X).reshape(N, L, self.n_heads, self.projection_dim).permute(0, 2, 1, 3)
        V = self.v(X).reshape(N, L, self.n_heads, self.projection_dim).permute(0, 2, 1, 3)
        S = torch.einsum("nhld, nhkd -> nhlk", Q, K)
        S = torch.masked_fill(S, mask.reshape(1, 1, L, L), -float("inf"))
        X = (S @ V).permute(0, 2, 1, 3).reshape(N, L, D)
        X = self.intermediate_norm((X + input).reshape(-1, D)).reshape(N, L, D)
        intermediate = X
        X = self.expand(X)
        X = self.activation(X)
        X = self.contract(X)
        X = self.norm((X + intermediate).reshape(-1, D)).reshape(N, L, D)
        return X

In [18]:
class Transformer(torch.nn.Module):

    def __init__(self, n_classes: int, tokenizer: Tokenizer, n_stages: int, projection_dim: int, n_heads: int, activation: Callable = torch.relu):
        super().__init__()
        self.tokenizer = tokenizer
        self.embedding = torch.nn.Embedding(len(tokenizer.vocabulary), n_heads*projection_dim)
        self.stages = torch.nn.ModuleList()
        for _ in range(n_stages):
            self.stages.append(AttentionBlock(projection_dim, n_heads, activation))
        self.output = torch.nn.Linear(projection_dim * n_heads, n_classes)
    
    def forward(self, X):
        X = X.to(self.device)
        mask = (X == self.tokenizer.PAD)
        mask = (mask.unsqueeze(1) | mask.unsqueeze(0))
        X = self.embedding(X)
        for stage in self.stages:
            X = stage(X, mask)
            # if self.training:
            #     print(stage.q)
            #     X = checkpoint(stage, X, mask)
            # else:
            #     X = stage(X, mask)
        return self.output(X)
    
    def predict(self, X: torch.Tensor) -> torch.Tensor:
        self.eval()
        with torch.no_grad():
            Y = self(X)
        return Y.max(dim=1).indices
    
    def loss(self, X: torch.Tensor, Y: torch.Tensor) -> torch.Tensor:
        y_pred = self(X)
        return F.cross_entropy(y_pred, Y.to(y_pred.device))

    def metric(self, X: torch.Tensor, Y: torch.Tensor) -> torch.Tensor:
        y_pred = self.predict(X)
        return accuracy(y_pred, Y.to(y_pred.device))

    @property
    def device(self) -> torch.device:
        return self.output.weight.device

In [20]:
model = Transformer(len(labels), tokenizer, 4, 16, 8)
model.to("cuda:0")
optimizer = torch.optim.Adam(model.parameters(), lr=1.0E-3)
train = Batchifyer(df_train, tokenizer, n_batches=1, batch_size=None)
val = Batchifyer(df_val, tokenizer, n_batches=1, batch_size=None)
train_loop(model, optimizer, train, val, n_steps=1000, patience=100)

AttributeError: 'AttentionBlock' object has no attribute 'q'

In [None]:
model