
# ``spaCy pipeline`` components

### **tok2vec** (Token-to-Vector)
- `add_label(label)`: Part of the shared trainable-component interface; tok2vec predicts no labels, so this is not used in practice.
- `create_optimizer()`: Creates an optimizer for training the model.
- `from_bytes(bytes_data)`: Loads the component from a binary format.
- `from_disk(path)`: Loads the component from a directory.
- `initialize()`: Initializes the model for training.
- `model`: The underlying neural network model used for token-to-vector transformation.
- `predict(docs)`: Predicts token representations for input documents.
- `update(examples)`: Updates the model from a batch of `Example` objects (spaCy v3 signature; v2 used `update(docs, golds)`).
- `to_bytes()`: Serializes the component to a binary format.
- `to_disk(path)`: Saves the component to a directory.

### **tagger** (Part-of-Speech Tagger)
- `add_label(label)`: Adds a new POS tag label.
- `create_optimizer()`: Creates an optimizer for training.
- `from_bytes(bytes_data)`: Loads the tagger from a binary format.
- `from_disk(path)`: Loads the tagger from a directory.
- `initialize()`: Initializes the tagger with training data.
- `labels`: The list of POS labels the tagger recognizes.
- `predict(docs)`: Predicts POS tags for input documents.
- `update(examples)`: Updates the tagger model from a batch of labeled `Example` objects.
- `to_bytes()`: Serializes the tagger to a binary format.
- `to_disk(path)`: Saves the tagger model to disk.

### **parser** (Dependency Parser)
- `add_label(label)`: Adds a dependency label.
- `beam_parse(docs)`: Performs dependency parsing using beam search.
- `greedy_parse(docs)`: Performs dependency parsing using greedy decoding.
- `initialize()`: Initializes the parser with training data.
- `labels`: The list of dependency labels used by the parser.
- `predict(docs)`: Predicts dependency structures for input documents.
- `update(examples)`: Updates the parser model from a batch of labeled `Example` objects.
- `to_bytes()`: Serializes the parser to a binary format.
- `to_disk(path)`: Saves the parser to a directory.

### **attribute_ruler** (Rule-based Attribute Modification)
- `add(patterns, attrs, index=0)`: Adds `Matcher` patterns together with the token attributes to assign when they match.
- `add_patterns(patterns)`: Adds multiple attribute modification patterns.
- `from_bytes(bytes_data)`: Loads attribute rules from a binary format.
- `from_disk(path)`: Loads attribute rules from a directory.
- `match(doc)`: Matches an input document against the stored attribute rules.
- `patterns`: The list of patterns used for attribute modification.
- `pipe(docs)`: Applies attribute rules in a pipeline.
- `to_bytes()`: Serializes the attribute ruler to a binary format.
- `to_disk(path)`: Saves attribute rules to disk.

### **lemmatizer** (Word Lemmatization)
- `initialize()`: Initializes the lemmatizer with language-specific rules.
- `is_base_form(token)`: Checks if a token is already in its base form.
- `lemmatize(token)`: Returns the lemma of a given token.
- `lookup_lemmatize(token)`: Uses a lookup table for lemmatization.
- `rule_lemmatize(token)`: Uses predefined rules for lemmatization.
- `mode`: Specifies the lemmatization mode (rule-based or lookup).
- `to_bytes()`: Serializes the lemmatizer to a binary format.
- `to_disk(path)`: Saves the lemmatizer to a directory.

### **ner** (Named Entity Recognizer)
- `add_label(label)`: Adds a new entity label.
- `beam_parse(docs)`: Performs named entity recognition using beam search.
- `greedy_parse(docs)`: Performs named entity recognition using greedy decoding.
- `initialize()`: Initializes the NER model with training data.
- `labels`: The list of named entity labels recognized by the model.
- `predict(docs)`: Predicts named entities in input documents.
- `update(examples)`: Updates the NER model from a batch of labeled `Example` objects.
- `to_bytes()`: Serializes the NER model to a binary format.
- `to_disk(path)`: Saves the NER model to a directory.

This provides a concise overview of key methods and attributes for each component in spaCy's pipeline. 🚀
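
As a minimal sketch of how a few of these methods fit together, the snippet below builds a blank English pipeline (so no trained model has to be downloaded), adds two components, and registers labels on them. The label `GADGET` is a made-up example and does not come from any trained model.

```python
import spacy

# A blank pipeline has a tokenizer but no trained components
nlp = spacy.blank("en")

ner = nlp.add_pipe("ner")
ner.add_label("GADGET")      # hypothetical entity label, for illustration only

tagger = nlp.add_pipe("tagger")
tagger.add_label("NOUN")

print(nlp.pipe_names)        # component names in processing order
print(ner.labels)            # labels registered on the NER component
print(tagger.labels)         # labels registered on the tagger
```

The components are untrained at this point; they would still need `nlp.initialize()` and training before `predict` returns anything useful.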

## `spaCy Token` attributes and methods:  

- **`text`**: The original token text.  
- **`lemma_`**: The base form of the token.  
- **`pos_`**: The part of speech (POS) tag.  
- **`tag_`**: The detailed POS tag.  
- **`dep_`**: The syntactic dependency relation of the token.  
- **`head`**: The token's syntactic head (governing word).  
- **`ent_type_`**: The named entity type (if applicable).  
- **`is_alpha`, `is_punct`, `is_space`, `is_stop`**: Boolean flags: alphabetic, punctuation, whitespace, and stop word.
- **`vector`**: The word embedding vector representation.  
- **`similarity()`**: Computes similarity between two tokens.  
- **`sent`**: Returns the sentence the token belongs to.  
- **`i`**: The index of the token in the document.  
- **`idx`**: The character index of the token in the original text.  
- **`morph`**: Morphological features of the token.  
- **`shape_`**: The shape of the token (e.g., "Xxxxx" for "Apple").  
- **`lefts` / `rights`**: Iterators over the token’s left/right children in the dependency tree.  
- **`subtree`**: An iterator over all descendant tokens.  
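
A short sketch of the purely lexical attributes, which work even without a trained model. It assumes a blank English pipeline; attributes such as `pos_`, `dep_`, `head`, and `ent_type_` would stay empty here because they need a tagger, parser, or entity recognizer.

```python
import spacy

nlp = spacy.blank("en")          # tokenizer only; no tagger or parser
doc = nlp("Apple is great!")

for token in doc:
    # i = token index, idx = character offset in the original text
    print(token.i, token.idx, token.text, token.shape_,
          token.is_alpha, token.is_punct, token.is_stop)
```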



  

## `spaCy Doc` Core Attributes & Methods

- **`text`**: The full text of the document.  
- **`text_with_ws`**: The document text, preserving original whitespace.  
- **`ents`**: A tuple of named-entity `Span` objects in the document.  
- **`sents`**: An iterator over the sentences in the document.  
- **`noun_chunks`**: An iterator over the base noun phrases (NPs) in the document; requires a dependency parse.  
- **`vector`**: The document-level word embedding.  
- **`vector_norm`**: The L2 norm of the document’s word embedding.  
- **`similarity()`**: Computes semantic similarity between two `Doc` objects.  

### **Tokenization & Parsing**  
- **`doc[i]`**: Retrieves the `i`-th token in the document.  
- **`from_array()` / `to_array()`, `from_dict()` / `to_dict()`**: Converts a document from/to a NumPy array or a Python dictionary.  
- **`from_bytes()` / `to_bytes()`, `from_json()` / `to_json()`**: Deserializes/serializes a document in binary or JSON format.  
- **`from_disk()` / `to_disk()`**: Loads/saves a document from/to disk.  
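
A minimal round-trip sketch for the binary serialization methods, assuming a blank English pipeline. The restored `Doc` is built on the same `Vocab` so that string IDs resolve correctly.

```python
import spacy
from spacy.tokens import Doc

nlp = spacy.blank("en")
doc = nlp("Serialization keeps tokens and whitespace intact.")

data = doc.to_bytes()                       # binary representation
restored = Doc(nlp.vocab).from_bytes(data)  # rebuild using the same vocab

print(restored.text == doc.text)
print(len(restored) == len(doc))
```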

### **Linguistic Analysis**  
- **`has_vector`**: Checks if any token in the document has a word vector.  
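- **`is_parsed`**: Checks if the document has been syntactically parsed (deprecated in spaCy v3; use `doc.has_annotation("DEP")`).  
- **`is_tagged`**: Checks if the document has been POS-tagged (deprecated in spaCy v3; use `doc.has_annotation("TAG")`).  
- **`is_sentenced`**: Checks if sentence boundaries are defined (deprecated in spaCy v3; use `doc.has_annotation("SENT_START")`).  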
- **`count_by()`**: Counts token attributes (e.g., POS tags, dependency labels).  
- **`get_lca_matrix()`**: Returns the Lowest Common Ancestor (LCA) matrix for dependency parsing.  
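
The sketch below shows `count_by()` together with `has_annotation()`, the spaCy v3 way to check which annotation layers a document carries. It assumes a blank English pipeline, so no tags or dependencies are present and only the token text (`ORTH`) can be counted.

```python
import spacy
from spacy.attrs import ORTH

nlp = spacy.blank("en")
doc = nlp("the cat saw the other cat")

# A blank pipeline sets neither dependencies nor tags
print(doc.has_annotation("DEP"))
print(doc.has_annotation("TAG"))

# count_by returns {attribute hash: frequency}; map the hashes back to strings
counts = doc.count_by(ORTH)
readable = {nlp.vocab.strings[key]: n for key, n in counts.items()}
print(readable)
```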

### **Customization & Extensions**  
- **`set_extension()` / `get_extension()`**: Defines and retrieves custom attributes.  
- **`remove_extension()`**: Removes a custom extension.  
- **`user_data`**: A dictionary for storing user-defined metadata.  

### **Token Merging & Retokenization**  
- **`retokenize()`**: Allows modifying token boundaries (e.g., merging tokens).  
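
A small sketch combining a custom extension with retokenization, assuming a blank English pipeline. The extension name `source` and its value are hypothetical, chosen only for illustration.

```python
import spacy
from spacy.tokens import Doc

nlp = spacy.blank("en")

# Register a custom attribute; force=True avoids an error if the cell is re-run
Doc.set_extension("source", default=None, force=True)

doc = nlp("I live in New York City")
doc._.source = "example"          # hypothetical metadata value

# Merge the three tokens "New", "York", "City" into a single token
with doc.retokenize() as retokenizer:
    retokenizer.merge(doc[3:6])

print([token.text for token in doc])
print(doc._.source)
```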


## Phrase matcher

In [None]:
from spacy.lang.en import English
from spacy.matcher import PhraseMatcher

nlp = English()
matcher = PhraseMatcher(nlp.vocab,attr="SHAPE")

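terms = ["Barack Obama", "Angela Merkel", "Washington, D.C.","Washington"]
# Patterns must be Doc objects; make_doc runs only the tokenizer
patterns = [nlp.make_doc(term) for term in terms]
matcher.add("Terminology", patterns)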
matcher.add("IPaddress", [nlp("127.0.0.1"), nlp("127.127.0.0")])


text = """German Chancellor Angela Merkel and US President Barack Obama converse in the Oval Office inside the
White House in Washington, D.C. Often their router will have an IP address such as 192.168.1.1 or 192.168.2.1.
Lee, an experienced CEO, has founded two AI startups."""
doc = nlp(text)

for match_id, start, end in matcher(doc):
    span = doc[start:end]
    match_name = nlp.vocab.strings[match_id]  # name of the rule that matched
    print(f"{span.text} (Matched by: {match_name})")


German Chancellor (Matched by: Terminology)
Chancellor Angela (Matched by: Terminology)
Angela Merkel (Matched by: Terminology)
President Barack (Matched by: Terminology)
Barack Obama (Matched by: Terminology)
White House (Matched by: Terminology)
Washington, D.C. (Matched by: Terminology)
192.168.1.1 (Matched by: IPaddress)
192.168.2.1 (Matched by: IPaddress)
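
The example above uses `attr="SHAPE"`, so any token sequence with the same shape as a pattern matches, which is why "German Chancellor" is reported. As a contrasting sketch, `attr="LOWER"` matches on the lowercased token text, giving case-insensitive matching of the listed terms only. The sentence used here is made up for illustration.

```python
from spacy.lang.en import English
from spacy.matcher import PhraseMatcher

nlp = English()
matcher = PhraseMatcher(nlp.vocab, attr="LOWER")
matcher.add("Terminology", [nlp.make_doc("Barack Obama")])

doc = nlp("barack obama met BARACK OBAMA.")
spans = [doc[start:end].text for _, start, end in matcher(doc)]
print(spans)
```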


## Dependency matcher

The `DependencyMatcher` in the `spacy.matcher` module matches patterns based on the dependency relations between words in a sentence.

Here's a breakdown of the keys in the `DependencyMatcher` pattern:

* **`RIGHT_ID`**:  A unique identifier for this part of the pattern.  Used to link different parts of the pattern together.

* **`RIGHT_ATTRS`**:  Conditions that the token being matched must satisfy.  Here, it specifies the word form (`ORTH`) or dependency relation (`DEP`).

* **`LEFT_ID`**: Refers back to the `RIGHT_ID` of a previously defined part of the pattern. This establishes the relationships between tokens.

* **`REL_OP`**: Specifies the relationship between the "left" and "right" tokens. `>` means the left token is the immediate head of the right token (the right token is a direct dependent).

* **`RIGHT_ATTRS: {"DEP": "nsubj"}`**: The token must have the dependency relation "nsubj" (nominal subject).

* **`RIGHT_ATTRS: {"DEP": "dobj"}`**: The token must have the dependency relation "dobj" (direct object).

* **`RIGHT_ATTRS: {"DEP": {"IN": ["amod", "compound"]}}`**: The token's dependency relation must be *either* "amod" (adjectival modifier) *or* "compound" (part of a compound noun).  `{"IN": [...]}` means the value must be one of the items in the list.


**Key Points:**

- The dependency matcher is a powerful tool for finding specific syntactic structures in text.
- The `REL_OP` and `RIGHT_ATTRS` are used to define the relationships between tokens and their attributes.
- Token indices are used to identify the matched tokens within the document.
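
To see the pattern keys in isolation, here is a minimal sketch that builds a tiny `Doc` by hand, so no trained parser is required. The three-word sentence, its heads, and its dependency labels are supplied manually as an assumption; a real pipeline would predict them.

```python
from spacy.matcher import DependencyMatcher
from spacy.tokens import Doc
from spacy.vocab import Vocab

vocab = Vocab()
# heads are absolute token indices; the root token points to itself
doc = Doc(
    vocab,
    words=["Lee", "founded", "startups"],
    heads=[1, 1, 1],
    deps=["nsubj", "ROOT", "dobj"],
)

matcher = DependencyMatcher(vocab)
pattern = [
    # Anchor token: the verb itself
    {"RIGHT_ID": "verb", "RIGHT_ATTRS": {"ORTH": "founded"}},
    # "verb > subject": the verb is the immediate head of the subject
    {"LEFT_ID": "verb", "REL_OP": ">", "RIGHT_ID": "subject",
     "RIGHT_ATTRS": {"DEP": "nsubj"}},
    # "verb > object": the verb is the immediate head of the object
    {"LEFT_ID": "verb", "REL_OP": ">", "RIGHT_ID": "object",
     "RIGHT_ATTRS": {"DEP": "dobj"}},
]
matcher.add("FOUNDED", [pattern])

match_id, token_ids = matcher(doc)[0]
# token_ids follow the order of the pattern dicts: verb, subject, object
print([doc[i].text for i in token_ids])
```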



In [None]:
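import spacy
from spacy.matcher import DependencyMatcher

# The DependencyMatcher needs a dependency parse, so load a pipeline with a parser
nlp = spacy.load("en_core_web_sm")
matcher = DependencyMatcher(nlp.vocab)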
pattern = [
    {
        "RIGHT_ID": "anchor_founded",
        "RIGHT_ATTRS": {"ORTH": "founded"}
    },
    {
        "LEFT_ID": "anchor_founded",
        "REL_OP": ">",
        "RIGHT_ID": "founded_subject",
        "RIGHT_ATTRS": {"DEP": "nsubj"},
    },
    {
        "LEFT_ID": "anchor_founded",
        "REL_OP": ">",
        "RIGHT_ID": "founded_object",
        "RIGHT_ATTRS": {"DEP": "dobj"},
    },
    {
        "LEFT_ID": "founded_object",
        "REL_OP": ">",
        "RIGHT_ID": "founded_object_modifier",
        "RIGHT_ATTRS": {"DEP": {"IN": ["amod", "compound"]}},
    }
]

matcher.add("FOUNDED", [pattern])
doc = nlp("Lee, an experienced CEO, has founded two AI startups.")
matches = matcher(doc)

print(matches) # [(4851363122962674176, [7, 0, 10, 9])]
# Each token_id corresponds to one pattern dict
match_id, token_ids = matches[0]
for i in range(len(token_ids)):
    print(pattern[i]["RIGHT_ID"] + ":", doc[token_ids[i]].text)

[(4851363122962674176, [7, 0, 10, 9])]
anchor_founded: founded
founded_subject: Lee
founded_object: startups
founded_object_modifier: AI


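## Entity ruler

By default, `nlp.add_pipe("entity_ruler")` appends the `EntityRuler` to the end of the pipeline, so it is applied *after* the statistical entity recognizer. This means you can use it to: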

- **Add new entities:**  Identify entities that the default model might miss.
- **Override existing entities:** Correct misclassified entities or change the entity type (this requires `config={"overwrite_ents": True}`, or adding the ruler before `ner`).
- **Add specific entity variations:** Handle different spellings, abbreviations, or forms of the same entity (as with "San Francisco" and "San Fran").


**How the EntityRuler Works:**

The `EntityRuler` matches the specified patterns against the text. When a match is found, it adds an entity annotation for the matched span; a span that overlaps an existing entity is only replaced if `overwrite_ents` is enabled. In the example below, the ruler will:

- Find "Apple" and label it as "ORG".
- Find "San Francisco" (and "San Fran") and label it as "GPE".
- Find "MyCorp Inc." and label it as "ORG".
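
A model-free sketch of the same mechanism, assuming a blank English pipeline so that the ruler is the only component setting entities. It also shows that both "San Francisco" and "San Fran" resolve to the same `id`. The sentence is made up for illustration.

```python
import spacy

nlp = spacy.blank("en")
ruler = nlp.add_pipe("entity_ruler")
ruler.add_patterns([
    {"label": "ORG", "pattern": "Apple", "id": "apple"},
    {"label": "GPE", "pattern": [{"LOWER": "san"}, {"LOWER": "francisco"}], "id": "san-francisco"},
    {"label": "GPE", "pattern": [{"LOWER": "san"}, {"LOWER": "fran"}], "id": "san-francisco"},
])

doc = nlp("Apple opened an office in San Fran last year")
# label_ is the string label, ent_id_ is the "id" given in the pattern
ents = [(ent.text, ent.label_, ent.ent_id_) for ent in doc.ents]
print(ents)
```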




In [None]:

import spacy

nlp = spacy.load("en_core_web_sm")
ruler = nlp.add_pipe("entity_ruler")

patterns = [{"label": "ORG", "pattern": "Apple", "id": "apple"},
            {"label": "GPE", "pattern": [{"LOWER": "san"}, {"LOWER": "francisco"}], "id": "san-francisco"},
            {"label": "GPE", "pattern": [{"LOWER": "san"}, {"LOWER": "fran"}], "id": "san-francisco"},
            {"label": "ORG", "pattern": "MyCorp Inc.","id": "mycorp"}]
ruler.add_patterns(patterns)

text = "Apple is opening its first big office in San Francisco. MyCorp Inc. is a company in the U.S."
doc = nlp(text)
for ent in doc.ents:
    # ent.label is the integer ID of the label; ent.label_ gives the string (e.g. "ORG")
    print([ent.text, ent.label])


['Apple', 383]
['first', 396]
['San Francisco', 384]
['MyCorp Inc.', 383]
['U.S.', 384]



# Paraphrase-Identification
https://github.com/wasiahmad/paraphrase_identification

In [None]:
import spacy
from spacy.matcher import Matcher

nlp = spacy.load("en_core_web_sm")
matcher = Matcher(nlp.vocab)

patterns = [[{'POS':'ADJ'}, {'POS':'NOUN'}],]
matcher.add("noun_adj", patterns)


# Function to tokenize sentence into phrases
def extract_phrases(sentence):
    doc = nlp(sentence)
    matches = matcher(doc)
    phrases = []
    for match_id, start, end in matches:
        span = doc[start:end]
        phrases.append(span.text)
    phrases.extend(chunk.text for chunk in doc.noun_chunks)
    return phrases


text = """The cat sat on the mat and licked its paws.A dog chased the ball and barked loudly.
The sun set behind the mountains, casting a golden glow."""

for sentence in text.split("."):
    sentence = sentence.replace("\n","")
    if sentence:
        phrases = extract_phrases(sentence)
        print(f"{sentence} - {phrases}")

The cat sat on the mat and licked its paws - ['The cat', 'the mat', 'its paws']
A dog chased the ball and barked loudly - ['A dog', 'the ball']
The sun set behind the mountains, casting a golden glow - ['golden glow', 'The sun', 'the mountains', 'a golden glow']


### Noun, Verb & Prepositional phrases

In [None]:
import spacy
nlp = spacy.load("en_core_web_sm")

text = "The quick brown fox jumps over the lazy dog."

doc = nlp(text)

# Noun chunking
noun_phrases = []
for chunk in doc.noun_chunks:
    noun_phrases.append(chunk.text)

print("Noun Phrases:", noun_phrases)
print("-"*100)

# Verb phrases
verb_phrases = []
for token in doc:
    if token.pos_ == "VERB":
        verb_phrase = token.text
        for child in token.children:
            if child.dep_ in ["aux", "auxpass", "advmod", "prt"]:
                verb_phrase += " " + child.text
        verb_phrases.append(verb_phrase)


print("Verb Phrases:", verb_phrases)
print("-"*100)

# Prepositional phrases
prepositional_phrases = []
for token in doc:
    if token.pos_ == "ADP":  # Check if token is a preposition
        prep_phrase = " ".join([tok.text for tok in token.subtree])
        prepositional_phrases.append(prep_phrase)

print("Prepositional Phrases:", prepositional_phrases)

Noun Phrases: ['The quick brown fox', 'the lazy dog']
----------------------------------------------------------------------------------------------------
Verb Phrases: ['jumps']
----------------------------------------------------------------------------------------------------
Prepositional Phrases: ['over the lazy dog']


# Sentiment Analyzer

In [None]:
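import os
from huggingface_hub import login

# Read the access token from the environment instead of hard-coding it
HF_TOKEN = os.environ.get("HF_TOKEN", "")

if HF_TOKEN:
    login(token=HF_TOKEN)
else:
    # login() takes no username; without a token it prompts interactively
    login()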

In [None]:
from datasets import load_dataset
import pandas as pd
import numpy as np

ds= load_dataset("Falah/sentiments-dataset-381-classes")
df = pd.DataFrame(ds['train'])
ds

DatasetDict({
    train: Dataset({
        features: ['text', 'sentiment'],
        num_rows: 1061
    })
})

In [None]:
from collections import Counter
#Get the unique class names from the "sentiment" column
label_names = df['sentiment'].unique()
label_names = label_names.tolist()
label_names[:5]
label_counter = Counter(df['sentiment'].tolist())

tmp_label_counter = {}
for label, count in label_counter.items():
    if count >= 10:
        tmp_label_counter[label] = count

label_counter = tmp_label_counter
labels_to_keep = list(label_counter.keys())
df = df[df['sentiment'].isin(labels_to_keep)]
df.shape,len(labels_to_keep)

((436, 2), 16)
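
The same rare-label filter can be written more compactly with `value_counts()`. This is a sketch on toy data: the rows and the threshold of 2 are made up, whereas the cell above uses a threshold of 10 on the real dataset.

```python
import pandas as pd

# Toy data standing in for the real dataset (values are made up)
toy_df = pd.DataFrame({
    "text": ["a", "b", "c", "d", "e", "f"],
    "sentiment": ["Happy", "Happy", "Happy", "Sad", "Sad", "Angry"],
})
min_count = 2  # the notebook uses 10

# Count each label, then keep only rows whose label is frequent enough
counts = toy_df["sentiment"].value_counts()
toy_labels = counts[counts >= min_count].index.tolist()
toy_filtered = toy_df[toy_df["sentiment"].isin(toy_labels)]

print(toy_labels, toy_filtered.shape)
```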

In [None]:
from datasets import DatasetDict, Dataset, ClassLabel
ds = Dataset.from_pandas(df)
ds = ds.remove_columns('__index_level_0__')
ds

Dataset({
    features: ['text', 'sentiment'],
    num_rows: 436
})

In [None]:
from datasets import DatasetDict, Dataset, ClassLabel

# Convert sentiment to ClassLabel (int)

ds = ds.cast_column("sentiment", ClassLabel(names=labels_to_keep))
id2label = ds.features["sentiment"].int2str

# First split: 85% train + 15% test
train_test = ds.train_test_split(test_size=0.15, stratify_by_column="sentiment")

# Second split: From the remaining 85%, split into 10% validation and the rest as train (75%)
train_valid = train_test["train"].train_test_split(test_size=10/85, stratify_by_column="sentiment") # 10/85 to get 10% of the original data

final_dataset = DatasetDict({
    "train": train_valid["train"],
    "test": train_test["test"],
    "valid": train_valid["test"],
})

print(final_dataset)
print(id2label)

# Verification (optional - check the sizes and distribution)
print(len(final_dataset["train"]) / len(ds)) # Should be close to 0.75
print(len(final_dataset["valid"]) / len(ds)) # Should be close to 0.10
print(len(final_dataset["test"]) / len(ds))  # Should be close to 0.15




DatasetDict({
    train: Dataset({
        features: ['text', 'sentiment'],
        num_rows: 326
    })
    test: Dataset({
        features: ['text', 'sentiment'],
        num_rows: 66
    })
    valid: Dataset({
        features: ['text', 'sentiment'],
        num_rows: 44
    })
})
<bound method ClassLabel.int2str of ClassLabel(names=['Positive', 'Joyful', 'Disappointed', 'Worried', 'Grateful', 'Indifferent', 'Sad', 'Angry', 'Relieved', 'Excited', 'Anxious', 'Satisfied', 'Happy', 'Nostalgic', 'Inspired', 'Impressed'], id=None)>
0.7477064220183486
0.10091743119266056
0.15137614678899083
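
The `10/85` factor follows from simple arithmetic: the second split works on the 85% left after removing the test set, and 0.85 × (10/85) = 0.10 of the original data. The sketch below reproduces the split sizes printed above, assuming each held-out split is rounded up to a whole number of rows (this rounding rule is an assumption, chosen because it matches the printed sizes).

```python
import math

n_total = 436

# First split: 15% of all rows go to the test set
n_test = math.ceil(0.15 * n_total)
n_rest = n_total - n_test

# Second split: 10/85 of the remainder is 10% of the original data
n_valid = math.ceil(n_rest * 10 / 85)
n_train = n_rest - n_valid

print(n_train, n_valid, n_test)
print(round(n_train / n_total, 4), round(n_valid / n_total, 4), round(n_test / n_total, 4))
```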


In [None]:
# # Check distribution
# def check_distribution(dataset, split_name):
#     sentiments = dataset[split_name]["sentiment"]
#     unique_sentiments = set(sentiments)
#     for sentiment in unique_sentiments:
#       count = sentiments.count(sentiment)
#       print(f"{split_name} - Sentiment {sentiment}: {count} ({count/len(sentiments)*100:.2f}%)")

# check_distribution(final_dataset, "train")
# check_distribution(final_dataset, "valid")
# check_distribution(final_dataset, "test")

In [None]:
# Load model directly
from transformers import AutoTokenizer, AutoModelForSequenceClassification,AutoConfig


model_name = "bhadresh-savani/distilbert-base-uncased-emotion"
base_tokenizer = AutoTokenizer.from_pretrained(model_name)
base_model = AutoModelForSequenceClassification.from_pretrained(model_name)
base_config = AutoConfig.from_pretrained(model_name)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F



class Emojify(nn.Module):
    def __init__(self, embed_dim, output_dim,input_sz=512,hidden_sz=256, num_heads=4):
        super(Emojify, self).__init__()
        # Use pre-trained DistilBERT embeddings - (vocab_size,embed_dim)
        self.embedding = base_model.distilbert.embeddings
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.lstm1 = nn.LSTM(embed_dim, hidden_sz, batch_first=True, bidirectional=True)
        self.layer_norm1 = nn.LayerNorm(input_sz)
        self.lstm2 = nn.LSTM(input_sz, hidden_sz, batch_first=True, bidirectional=True)
        self.layer_norm2 = nn.LayerNorm(input_sz)
        self.lstm3 = nn.LSTM(input_sz, hidden_sz, batch_first=True)
        self.layer_norm3 = nn.LayerNorm(hidden_sz)
        self.fc = nn.Linear(hidden_sz, output_dim)

    def forward(self, input_ids, attention_mask):
        with torch.no_grad():  # Use frozen embeddings from DistilBERT
            embeddings = self.embedding(input_ids)
            # embeddings - (batch_size,seq_length,embed_dim)

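        # Mask padded positions so attention ignores them (True = ignore)
        key_padding_mask = attention_mask == 0
        attn_output, attn_weight = self.attention(
            embeddings, embeddings, embeddings, key_padding_mask=key_padding_mask
        )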
        # attn_output - (batch_size,seq_length,embed_dim)
        # attn_weight - (batch_size,seq_length,seq_length)
        attn_output = embeddings + attn_output  # skip connection
        # attn_output - (batch_size, seq_length, embed_dim)
        output, _ = self.lstm1(attn_output)
        output = self.layer_norm1(output)
        # layer_norm1 - (batch_size, seq_length, input_sz)
        output, _ = self.lstm2(output)
        output = self.layer_norm2(output)
        # layer_norm2 - (batch_size, seq_length, input_sz)
        output, (hidden, _) = self.lstm3(output)
        # lstm3_output - (batch_size, seq_length, hidden_sz)
        # lstm3_hidden - (1, batch_size, hidden_sz)  (1 because lstm3 has a single layer and one direction)
        hidden_output = self.layer_norm3(hidden.squeeze(0))  # Squeeze the hidden state
        # layer_norm3 - (batch_size, hidden_sz)
        hidden_output = self.fc(hidden_output)
        # hidden_output - (batch_size, output_dim)
        return hidden_output
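
Because every input is padded to `max_length`, `lstm3` reads the padding positions before producing the hidden state that feeds the classifier. One common alternative, sketched here on toy tensors and not part of the original model, is to pack the sequences by their true lengths so the final hidden state is taken at the last real token. The sketch assumes right padding and derives the lengths from an `attention_mask`; all sizes are arbitrary.

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence

torch.manual_seed(0)
lstm = nn.LSTM(input_size=8, hidden_size=4, batch_first=True)

x = torch.randn(2, 5, 8)                       # (batch, seq_len, features)
attention_mask = torch.tensor([[1, 1, 1, 0, 0],
                               [1, 1, 1, 1, 1]])

# True sequence lengths; pack_padded_sequence expects them on the CPU
lengths = attention_mask.sum(dim=1).cpu()
packed = pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=False)

# hidden holds the state after the last real token of each sequence
_, (hidden, _) = lstm(packed)
last_hidden = hidden.squeeze(0)                # (batch, hidden_size)

# Reference: run the first sequence without its padding
_, (h_ref, _) = lstm(x[0:1, :3])
print(torch.allclose(last_hidden[0], h_ref[0, 0], atol=1e-5))
```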


In [None]:
# # Dummy data for testing
# batch_size = 32
# sequence_length = 64
# vocab_size = 1000
# embed_dim = 768
# output_dim = 16
# hidden_sz = 256
# input_sz = hidden_sz * 2


# input_ids = torch.randint(0, vocab_size, (batch_size, sequence_length))
# attention_mask = torch.ones((batch_size, sequence_length))

# model = Emojify(embed_dim, output_dim,input_sz,hidden_sz)
# output = model(input_ids, attention_mask)
# print(f"Final output shape: {output.shape}")

In [None]:
from torch.utils.data import DataLoader, Dataset
from datasets import DatasetDict


class EmojifyDataset(Dataset):
    def __init__(self, dataset, base_tokenizer, max_length=512):
        self.dataset = dataset
        self.tokenizer = base_tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        text = self.dataset[idx]['text']
        label = self.dataset[idx]['sentiment']
        inputs = self.tokenizer(
            text,
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )

        return {
            "input_ids": inputs["input_ids"].squeeze(0),   # drop the batch dim added by return_tensors="pt"
            "attention_mask": inputs["attention_mask"].squeeze(0),
            "labels": torch.tensor(label, dtype=torch.long)
        }

max_input_length = base_config.max_position_embeddings  # 512
# base_tokenizer.pad_token = base_tokenizer.eos_token

# Convert datasets into PyTorch datasets
train_dataset = EmojifyDataset(final_dataset['train'], base_tokenizer,max_input_length)
valid_dataset = EmojifyDataset(final_dataset['valid'], base_tokenizer,max_input_length)
test_dataset = EmojifyDataset(final_dataset['test'], base_tokenizer,max_input_length)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)


In [None]:
# Initialize Model
output_dim = len(labels_to_keep)  # 16
vocab_size = base_tokenizer.vocab_size  # 30522
embed_dim = base_config.hidden_size  # 768
hidden_sz = 256
input_sz = hidden_sz * 2

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
emojify_model = Emojify(embed_dim, output_dim,input_sz,hidden_sz, num_heads=4).to(device)
emojify_model

Emojify(
  (embedding): Embeddings(
    (word_embeddings): Embedding(30522, 768, padding_idx=0)
    (position_embeddings): Embedding(512, 768)
    (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (attention): MultiheadAttention(
    (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
  )
  (lstm1): LSTM(768, 256, batch_first=True, bidirectional=True)
  (layer_norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
  (lstm2): LSTM(512, 256, batch_first=True, bidirectional=True)
  (layer_norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
  (lstm3): LSTM(512, 256, batch_first=True)
  (layer_norm3): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
  (fc): Linear(in_features=256, out_features=16, bias=True)
)

In [None]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(emojify_model.parameters(), lr=2e-5)

In [None]:
# Training Loop
num_epochs = 10
best_validation_loss = float('inf')

In [None]:
from tqdm import tqdm
import torch
import torch.nn as nn

scaler = torch.amp.GradScaler()

for epoch in range(num_epochs):
    emojify_model.train()
    epoch_training_loss = 0

    for batch in tqdm(train_loader):
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        optimizer.zero_grad()
        with torch.amp.autocast('cuda'):
            outputs = emojify_model(input_ids, attention_mask)
            loss = criterion(outputs, labels)
        scaler.scale(loss).backward()
        # Unscale the gradients before clipping so max_norm applies to their true values
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(emojify_model.parameters(), max_norm=1.0)  # Prevent large updates that can destabilize training
        scaler.step(optimizer)
        scaler.update()
        epoch_training_loss += loss.item()
    avg_epoch_training_loss = epoch_training_loss / len(train_loader)

    # Validation Step
    emojify_model.eval()
    epoch_validation_loss = 0
    correct_pred_labels = 0
    total_labels = 0

    with torch.no_grad():
        for batch in valid_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            outputs = emojify_model(input_ids, attention_mask)
            loss = criterion(outputs, labels)
            epoch_validation_loss += loss.item()

            preds = torch.argmax(outputs, dim=1)
            correct_pred_labels += (preds == labels).sum().item()
            total_labels += labels.size(0)
    avg_epoch_validation_loss = epoch_validation_loss / len(valid_loader)
    avg_validation_accuracy = correct_pred_labels / total_labels

    print(f"Epoch {epoch+1}: Train Loss = {avg_epoch_training_loss:.4f}, Val Loss = {avg_epoch_validation_loss:.4f}, Val Accuracy = {avg_validation_accuracy:.4f}")

    # Checkpoint the model with the lowest validation loss so far
    if avg_epoch_validation_loss < best_validation_loss:
        best_validation_loss = avg_epoch_validation_loss
        torch.save(emojify_model.state_dict(), "best_emojify_model.pth")
        print("Model saved!")


Epoch 1: Train Loss = 2.4914, Val Loss = 2.4793, Val Accuracy = 0.2500
Epoch 2: Train Loss = 2.5087, Val Loss = 2.4777, Val Accuracy = 0.2500
Model saved!
Epoch 3: Train Loss = 2.5006, Val Loss = 2.4803, Val Accuracy = 0.2500
Epoch 4: Train Loss = 2.5059, Val Loss = 2.4851, Val Accuracy = 0.2500
Epoch 5: Train Loss = 2.4909, Val Loss = 2.4844, Val Accuracy = 0.2500
Epoch 6: Train Loss = 2.4705, Val Loss = 2.4806, Val Accuracy = 0.2500
Epoch 7: Train Loss = 2.4850, Val Loss = 2.4826, Val Accuracy = 0.2500
Epoch 8: Train Loss = 2.4954, Val Loss = 2.4822, Val Accuracy = 0.2500
Epoch 9: Train Loss = 2.5023, Val Loss = 2.4833, Val Accuracy = 0.2500
Epoch 10: Train Loss = 2.4766, Val Loss = 2.4857, Val Accuracy = 0.2500
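
For reference when reading this log: a classifier that assigns equal probability to all 16 classes has a cross-entropy of ln 16. The short calculation below gives that baseline. The logged losses of about 2.48 sit only slightly below it and the validation accuracy never moves from 0.25, which is consistent with the model predicting roughly the class frequencies rather than learning from the text; this is one possible reading, not a verified diagnosis.

```python
import math

num_classes = 16

# Cross-entropy of a uniform prediction over num_classes classes
chance_loss = math.log(num_classes)
print(f"Uniform-guess cross-entropy: {chance_loss:.4f}")  # 2.7726
```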


In [None]:
# Test Step
emojify_model.load_state_dict(torch.load("best_emojify_model.pth",weights_only=True))
emojify_model.eval()

test_loss = 0
correct = 0
total = 0

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        outputs = emojify_model(input_ids, attention_mask)
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        preds = torch.argmax(outputs, dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

test_accuracy = correct / total
print(f"Test Accuracy: {test_accuracy:.4f}")


Test Accuracy: 0.2576


In [None]:
def predict_sentiment(text, model, tokenizer, device):
    model.eval()  # Set model to evaluation mode

    # Tokenize and preprocess text
    inputs = tokenizer(
        text,
        padding="max_length",
        truncation=True,
        max_length=512,
        return_tensors="pt"
    )

    input_ids = inputs["input_ids"].to(device)
    attention_mask = inputs["attention_mask"].to(device)

    with torch.no_grad():
        outputs = model(input_ids, attention_mask)

    # Get predicted class
    predicted_label = torch.argmax(outputs, dim=1).item()
    # Convert numerical prediction to label
    predicted_sentiment = id2label(predicted_label)

    return predicted_sentiment  # Returns the sentiment label as a string

# Load the trained model
emojify_model.load_state_dict(torch.load("best_emojify_model.pth", map_location=device, weights_only=True))
emojify_model.to(device)

# Example text to test
new_text = "I am not sure about ! Everything is going great. 😊"

# Predict sentiment
predicted_sentiment = predict_sentiment(new_text, emojify_model, base_tokenizer, device)
print(f"Predicted Sentiment: {predicted_sentiment}")


Predicted Sentiment: Happy



In [None]:
id2label

<bound method ClassLabel.int2str of ClassLabel(names=['Positive', 'Joyful', 'Disappointed', 'Worried', 'Grateful', 'Indifferent', 'Sad', 'Angry', 'Relieved', 'Excited', 'Anxious', 'Satisfied', 'Happy', 'Nostalgic', 'Inspired', 'Impressed'], id=None)>

# Token Classification

In [None]:
# # %%

# import itertools
# from tqdm import tqdm
# import numpy as np
# import torch
# from transformers import BertJapaneseTokenizer, BertForTokenClassification
# import pytorch_lightning as pl

# # from torch.utils.data import DataLoader
# # import from_XML_to_json as XtC
# # import random
# # import json
# # import unicodedata
# # import pandas as pd

# # %%
# # 8-16
# # PyTorch Lightning model
# class BertForTokenClassification_pl(pl.LightningModule):

#     def __init__(self, model_name, num_labels, lr):
#         super().__init__()
#         self.save_hyperparameters()
#         self.bert_tc = BertForTokenClassification.from_pretrained(
#             model_name,
#             num_labels=num_labels
#         )

#     def training_step(self, batch, batch_idx):
#         output = self.bert_tc(**batch)
#         loss = output.loss
#         self.log('train_loss', loss)
#         return loss

#     def validation_step(self, batch, batch_idx):
#         output = self.bert_tc(**batch)
#         val_loss = output.loss
#         self.log('val_loss', val_loss)

#     def configure_optimizers(self):
#         return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)



# # %%
# class NER_tokenizer_BIO(BertJapaneseTokenizer):

#     # Accept the number of named-entity categories `num_entity_type`
#     # at initialization.
#     def __init__(self, *args, **kwargs):
#         self.num_entity_type = kwargs.pop('num_entity_type')
#         super().__init__(*args, **kwargs)

#     def encode_plus_tagged(self, text, entities, max_length):
#         """
#         Given a sentence and named entities,
#         Encode and create a label string.
#         """
#         # Divide the text before and after the named entity and label each.
#         splitted = [] # Add the string after division
#         position = 0

#         for entity in entities:
#             start = entity['span'][0]
#             end = entity['span'][1]
#             label = entity['type_id']
#             splitted.append({'text':text[position:start], 'label':0})
#             splitted.append({'text':text[start:end], 'label':label})
#             position = end
#         splitted.append({'text': text[position:], 'label':0})
#         splitted = [ s for s in splitted if s['text'] ]

#         # Tokenize each segment and label its tokens.
#         tokens = []
#         labels = []
#         for s in splitted:
#             tokens_splitted = self.tokenize(s['text'])
#             label = s['label']
#             if label > 0: # named entity
#                 # First, assign the I-tag to every token.
#                 # Label IDs: O-tag = 0, B-tags = 1..num_entity_type,
#                 # I-tags = num_entity_type+1..2*num_entity_type.
#                 labels_splitted =  \
#                     [ label + self.num_entity_type ] * len(tokens_splitted)
#                 # Make the first token a B-tag
#                 labels_splitted[0] = label
#             else:
#                 labels_splitted =  [0] * len(tokens_splitted)

#             tokens.extend(tokens_splitted)
#             labels.extend(labels_splitted)

#         # Encode it and put it into a format that can be input to BERT.
#         input_ids = self.convert_tokens_to_ids(tokens)
#         encoding = self.prepare_for_model(
#             input_ids,
#             max_length=max_length,
#             padding='max_length',
#             truncation=True
#         )

#         # Add labels for the special tokens.
#         # Truncate to max_length-2, then add O labels for [CLS] and [SEP].
#         labels = [0] + labels[:max_length-2] + [0]
#         # Pad with O labels up to max_length.
#         labels = labels + [0]*( max_length - len(labels) )
#         encoding['labels'] = labels

#         return encoding

#     def encode_plus_untagged(
#         self, text, max_length=None, return_tensors=None
#     ):
#         """
#         Tokenize the text and find the character span of each token.
#         Identical to encode_plus_untagged in the IO-scheme tokenizer.
#         """
#         # Tokenize the text and map each token back to its surface string in the text.
#         tokens = [] # Tokens
#         tokens_original = [] # Surface string in the text for each token
#         words = self.word_tokenizer.tokenize(text) # Split into words with MeCab
#         for word in words:
#             # Split word into subwords
#             tokens_word = self.subword_tokenizer.tokenize(word)
#             tokens.extend(tokens_word)
#             if tokens_word[0] == '[UNK]': # Dealing with unknown words
#                 tokens_original.append(word)
#             else:
#                 tokens_original.extend([
#                     token.replace('##','') for token in tokens_word
#                 ])

#         # Find the character span of each token in the text, skipping whitespace.
#         position = 0
#         spans = [] # Token spans as [start, end]
#         for token in tokens_original:
#             l = len(token)
#             while True:
#                 if token != text[position:position+l]:
#                     position += 1
#                 else:
#                     spans.append([position, position+l])
#                     position += l
#                     break

#         # Encode it and put it into a format that can be input to BERT.
#         input_ids = self.convert_tokens_to_ids(tokens)
#         encoding = self.prepare_for_model(
#             input_ids,
#             max_length=max_length,
#             padding='max_length' if max_length else False,
#             truncation=True if max_length else False
#         )
#         sequence_length = len(encoding['input_ids'])
#         # Add a dummy span for the special token [CLS].
#         spans = [[-1, -1]] + spans[:sequence_length-2]
#         # Add dummy spans for the special tokens [SEP] and [PAD].
#         spans = spans + [[-1, -1]] * ( sequence_length - len(spans) )

#         # Convert to torch.Tensor if requested.
#         if return_tensors == 'pt':
#             encoding = { k: torch.tensor([v]) for k, v in encoding.items() }

#         return encoding, spans

#     @staticmethod
#     def Viterbi(scores_bert, num_entity_type, penalty=10000):
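#         """
#         Find the highest-scoring label sequence with the Viterbi algorithm.
#         Transitions into an I-tag are penalized unless they come from the
#         B-tag or the I-tag of the same entity type.
#         """
#         m = 2*num_entity_type + 1
#         penalty_matrix = np.zeros([m, m])
#         for i in range(m):
#             for j in range(1+num_entity_type, m):
#                 if not ( (i == j) or (i+num_entity_type == j) ):
#                     penalty_matrix[i,j] = penalty
#         # Best path ending in each label; a sequence may not start with an I-tag.
#         path = [ [i] for i in range(m) ]
#         scores_path = scores_bert[0] - penalty_matrix[0,:]
#         scores_bert = scores_bert[1:]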

#         for scores in scores_bert:
#             assert len(scores) == 2*num_entity_type + 1
#             score_matrix = np.array(scores_path).reshape(-1,1) \
#                 + np.array(scores).reshape(1,-1) \
#                 - penalty_matrix
#             scores_path = score_matrix.max(axis=0)
#             argmax = score_matrix.argmax(axis=0)
#             path_new = []
#             for i, idx in enumerate(argmax):
#                 path_new.append( path[idx] + [i] )
#             path = path_new

#         labels_optimal = path[np.argmax(scores_path)]
#         return labels_optimal

#     def convert_bert_output_to_entities(self, text, scores, spans):
#         """
#         Extract named entities from the text, the classification scores, and the span of each token.
#         `scores` is a two-dimensional array of shape (sequence length, number of labels).
#         """
#         assert len(spans) == len(scores)
#         num_entity_type = self.num_entity_type

#         # Remove parts corresponding to special tokens
#         scores = [score for score, span in zip(scores, spans) if span[0]!=-1]
#         spans = [span for span in spans if span[0]!=-1]

#         # Determine the predicted value of the label with the Viterbi algorithm.
#         labels = self.Viterbi(scores, num_entity_type)

#         # Tokens with the same label are grouped together to extract named entities.
#         entities = []
#         for label, group in itertools.groupby(enumerate(labels), key=lambda x: x[1]):
#             group = list(group)
#             start = spans[group[0][0]][0]
#             end = spans[group[-1][0]][1]

#             if label != 0: # if it is a named entity
#                 if 1 <= label <= num_entity_type:
#                     # Add a new entity if the label is a `B-` tag
#                     entity = {
#                         "name": text[start:end],
#                         "span": [start, end],
#                         "type_id": label
#                     }
#                     entities.append(entity)
#                 else:
#                     # If the label is `I-`, update the last entity
#                     entity['span'][1] = end
#                     entity['name'] = text[entity['span'][0]:entity['span'][1]]

#         return entities
