# Configurando o ambiente

In [18]:
! pip install transformers
! pip install torch
! pip install pandas
! pip install numpy
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from transformers import BertTokenizer, BertModel
from sklearn.metrics import classification_report

# Paths of the tagged corpus files parsed by decode_file_in_corpus
# (whitespace-separated `word_TAG` tokens): one for training, one for testing.
dataset_path = './macmorpho-train.txt'
test_input_path ='./macmorpho-test.txt'



Defaulting to user installation because normal site-packages is not writeable
DEPRECATION: distro-info 1.1build1 has a non-standard version number. pip 24.1 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of distro-info or contact the author to suggest that they release a version with a conforming version number. Discussion can be found at https://github.com/pypa/pip/issues/12063

[notice] A new release of pip is available: 24.0 -> 25.0
[notice] To update, run: pip install --upgrade pip
Defaulting to user installation because normal site-packages is not writeable
DEPRECATION: distro-info 1.1build1 has a non-standard version number. pip 24.1 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of distro-info or contact the author to suggest that

## Carregar dados

In [19]:
def decode_file_in_corpus(dataset_path):
    """Read a tagged corpus file into flat, parallel word and tag lists.

    Every line is split on whitespace and each resulting token is expected to
    have the form ``word_TAG``. Line boundaries are not preserved: the tokens
    of all lines are appended to the same lists.

    Args:
        dataset_path: Path of the UTF-8 text file to read.

    Returns:
        A tuple ``(sentences, tags, tags_counter)``:
        ``sentences`` is the flat list of words (one entry per token, despite
        the name), ``tags`` is the list of tags aligned with it, and
        ``tags_counter`` is a dict mapping each tag to its number of
        occurrences, in first-seen order.

    Raises:
        ValueError: If a token contains no ``_`` separator.
    """
    sentences = []
    tags = []
    tags_counter = {}
    with open(dataset_path, 'r', encoding='utf-8') as input_file:
        for line in input_file:
            # split() returns an empty list for blank lines, so they are
            # skipped without a dedicated check.
            for token in line.split():
                # Split on the LAST underscore: the word itself may contain
                # '_' and must not break the unpacking.
                word, separator, tag = token.rpartition('_')
                if not separator:
                    raise ValueError(
                        f"token {token!r} in {dataset_path!r} has no '_TAG' suffix"
                    )
                sentences.append(word)
                tags.append(tag)
                tags_counter[tag] = tags_counter.get(tag, 0) + 1
    return sentences, tags, tags_counter

# Parse both corpora into flat word/tag lists plus a per-tag frequency dict.
train_sentences, train_tags, tags_counter = decode_file_in_corpus(dataset_path)
test_sentences, test_tags, test_tags_counter = decode_file_in_corpus(test_input_path)


# Tag inventory, taken from the training corpus.
tags = list(tags_counter.keys())
# Test-set frequency of every entry of `tags`, index-aligned with `tags`.
# Looking each tag up explicitly (instead of taking the test dict's values in
# its own order) keeps the two lists the same length and in the same order;
# tags that never occur in the test file get a count of 0.
counts = [test_tags_counter.get(tag, 0) for tag in tags]
# `torch_dtype` is a model-loading option and has no meaning for a tokenizer,
# so it is not passed here.
tokenizer = BertTokenizer.from_pretrained('neuralmind/bert-base-portuguese-cased')


## Tagger

In [16]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertModel, BertTokenizer
import numpy as np
from sklearn.metrics import classification_report

class POSDataset(Dataset):
    """Token-classification dataset built from flat, parallel word/tag lists.

    The flat lists are cut into consecutive chunks of ``chunk_size`` words
    (the last chunk holds the remainder). Each item is one chunk encoded as
    fixed-length tensors of size ``max_len``.

    Labels are aligned to sub-word tokens: the first piece of every word
    carries the word's tag index, while continuation pieces, the CLS/SEP
    special tokens and padding positions carry ``tag2idx['<pad>']`` so that a
    loss configured with that ``ignore_index`` skips them.

    Args:
        words: Sequence of words.
        tags: Sequence of tags, same length as ``words``.
        tokenizer: Hugging Face style tokenizer exposing ``tokenize``,
            ``convert_tokens_to_ids``, ``cls_token_id``, ``sep_token_id``,
            ``pad_token_id`` and ``unk_token``.
        tag2idx: Mapping from tag to integer index; must contain ``'<pad>'``.
        max_len: Length of every returned tensor (special tokens included).
        chunk_size: Number of words per chunk.

    Raises:
        ValueError: If ``words`` and ``tags`` differ in length, if
            ``chunk_size`` is smaller than 1, or if ``max_len`` is smaller
            than 2.
    """

    def __init__(self, words, tags, tokenizer, tag2idx, max_len=128, chunk_size=20):
        if len(words) != len(tags):
            raise ValueError(
                f'words and tags must have the same length, got {len(words)} and {len(tags)}'
            )
        if chunk_size < 1:
            raise ValueError('chunk_size must be at least 1')
        if max_len < 2:
            raise ValueError('max_len must leave room for the two special tokens')

        self.words = words
        self.tags = tags
        self.tokenizer = tokenizer
        self.tag2idx = tag2idx
        self.max_len = max_len
        self.chunk_size = chunk_size

        word_list = list(words)
        tag_list = list(tags)
        # Chunk by POSITION. Comparing each word with the value of the last
        # word would end a chunk at every occurrence of that word, not only at
        # the end of the list.
        starts = range(0, len(word_list), chunk_size)
        self.sentence_words = [word_list[s:s + chunk_size] for s in starts]
        self.sentence_tags = [tag_list[s:s + chunk_size] for s in starts]
        # Space-joined text of every chunk, kept for inspection.
        self.sentences = [' '.join(chunk) for chunk in self.sentence_words]

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        pad_label = self.tag2idx['<pad>']
        # Two positions are reserved for the CLS and SEP special tokens.
        budget = self.max_len - 2

        pieces = []
        piece_labels = []
        for word, tag in zip(self.sentence_words[idx], self.sentence_tags[idx]):
            # A word the tokenizer reduces to nothing still needs one position
            # so that its tag is not lost.
            word_pieces = self.tokenizer.tokenize(word) or [self.tokenizer.unk_token]
            if len(pieces) + len(word_pieces) > budget:
                # Truncate on a word boundary: a partially encoded word would
                # otherwise carry a label for an incomplete token sequence.
                break
            pieces.extend(word_pieces)
            # Only the first piece of a word is labelled; the remaining
            # pieces are marked as ignorable.
            piece_labels.append(self.tag2idx[tag])
            piece_labels.extend([pad_label] * (len(word_pieces) - 1))

        token_ids = (
            [self.tokenizer.cls_token_id]
            + list(self.tokenizer.convert_tokens_to_ids(pieces))
            + [self.tokenizer.sep_token_id]
        )
        n_real = len(token_ids)
        n_pad = self.max_len - n_real

        input_ids = torch.tensor(
            token_ids + [self.tokenizer.pad_token_id] * n_pad, dtype=torch.long
        )
        attention_mask = torch.tensor([1] * n_real + [0] * n_pad, dtype=torch.long)
        # Layout: CLS, one label per piece, then SEP and padding (n_pad + 1).
        labels = torch.tensor(
            [pad_label] + piece_labels + [pad_label] * (n_pad + 1), dtype=torch.long
        )

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': labels
        }

class BERTPOSTagger(nn.Module):
    """Per-token tag classifier stacked on top of an encoder module.

    The encoder's ``last_hidden_state`` goes through dropout (p=0.1) and a
    linear layer that maps ``bert_model.config.hidden_size`` features to
    ``num_tags`` scores for every position.
    """

    def __init__(self, bert_model, num_tags):
        super().__init__()
        self.bert = bert_model
        self.dropout = nn.Dropout(0.1)
        hidden_size = self.bert.config.hidden_size
        self.classifier = nn.Linear(hidden_size, num_tags)

    def forward(self, input_ids, attention_mask):
        """Return the tag logits for every position of ``input_ids``."""
        encoded = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            return_dict=True,
        )
        regularized = self.dropout(encoded.last_hidden_state)
        return self.classifier(regularized)

def train_model(model, train_loader, val_loader, device, tag2idx, num_epochs=3):
    """Fine-tune ``model`` and checkpoint the weights with the lowest validation loss.

    Every epoch runs one optimisation pass over ``train_loader`` (AdamW,
    lr=2e-5) followed by a gradient-free pass over ``val_loader``. The loss is
    cross-entropy over all positions, ignoring those labelled
    ``tag2idx['<pad>']``. The average losses are printed, and whenever the
    average validation loss improves the model's ``state_dict`` is written to
    ``best_pos_model.pt`` in the current directory.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns per-position logits.
        train_loader: Sized iterable of batches with ``input_ids``,
            ``attention_mask`` and ``labels`` tensors.
        val_loader: Sized iterable of batches of the same form.
        device: Device the batch tensors are moved to.
        tag2idx: Tag-to-index mapping; must contain ``'<pad>'``.
        num_epochs: Number of epochs to run.
    """
    loss_fn = nn.CrossEntropyLoss(ignore_index=tag2idx['<pad>'])
    opt = torch.optim.AdamW(model.parameters(), lr=2e-5)

    def token_loss(batch):
        # Move one batch to the device and score it, flattening the batch and
        # sequence dimensions so every position is a separate sample.
        ids = batch['input_ids'].to(device)
        mask = batch['attention_mask'].to(device)
        gold = batch['labels'].to(device)
        logits = model(ids, mask)
        return loss_fn(logits.view(-1, logits.shape[-1]), gold.view(-1))

    lowest_val_loss = float('inf')

    for epoch_number in range(1, num_epochs + 1):
        # Optimisation pass
        model.train()
        train_losses = []
        for batch in train_loader:
            opt.zero_grad()
            loss = token_loss(batch)
            loss.backward()
            opt.step()
            train_losses.append(loss.item())

        # Validation pass
        model.eval()
        val_losses = []
        with torch.no_grad():
            for batch in val_loader:
                val_losses.append(token_loss(batch).item())

        avg_train_loss = sum(train_losses) / len(train_loader)
        avg_val_loss = sum(val_losses) / len(val_loader)
        print(f'Epoch {epoch_number}:')
        print(f'Training Loss: {avg_train_loss:.4f}')
        print(f'Validation Loss: {avg_val_loss:.4f}')

        if avg_val_loss < lowest_val_loss:
            lowest_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'best_pos_model.pt')

def evaluate_model(model, test_loader, device, idx2tag):
    """Build a classification report over the labelled positions of ``test_loader``.

    When ``idx2tag`` contains a ``'<pad>'`` entry, only positions that are
    attended to AND whose gold label is not that entry are scored. Selecting
    by attention mask alone would also score positions that carry the
    ``'<pad>'`` label and report ``'<pad>'`` as a true class.

    When ``idx2tag`` has no ``'<pad>'`` entry, every attended position except
    the first and the last one of each sequence is scored.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns per-position logits.
        test_loader: Iterable of batches with ``input_ids``,
            ``attention_mask`` and ``labels`` tensors.
        device: Device the batch tensors are moved to.
        idx2tag: Mapping from integer index to tag.

    Returns:
        The value returned by ``classification_report(true_tags, pred_tags)``.
    """
    model.eval()
    pad_idx = next((idx for idx, tag in idx2tag.items() if tag == '<pad>'), None)
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids, attention_mask)
            preds = torch.argmax(outputs, dim=-1)

            if pad_idx is None:
                # No ignore label available: drop padding through the
                # attention mask and strip the first/last attended position.
                for i in range(len(preds)):
                    mask = attention_mask[i].bool()
                    all_preds.extend(preds[i][mask][1:-1].tolist())
                    all_labels.extend(labels[i][mask][1:-1].tolist())
            else:
                # Boolean indexing flattens row by row, so the order matches a
                # per-sequence loop.
                scored = attention_mask.bool() & (labels != pad_idx)
                all_preds.extend(preds[scored].tolist())
                all_labels.extend(labels[scored].tolist())

    # Convert indices back to tags
    pred_tags = [idx2tag[idx] for idx in all_preds]
    true_tags = [idx2tag[idx] for idx in all_labels]

    return classification_report(true_tags, pred_tags)

SEED = 42            # makes the split, shuffling and classifier init repeatable
VAL_FRACTION = 0.1   # share of the training chunks held out for validation

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
torch.manual_seed(SEED)

# Add padding tag to tags list (guarded so re-running the cell does not append
# a second '<pad>' entry and shift its index past the classifier's outputs)
if '<pad>' not in tags:
    tags.append('<pad>')

# Create tag vocabularies
tag2idx = {tag: idx for idx, tag in enumerate(tags)}
idx2tag = {idx: tag for tag, idx in tag2idx.items()}

# Create datasets
train_dataset = POSDataset(train_sentences, train_tags, tokenizer, tag2idx)
test_dataset = POSDataset(test_sentences, test_tags, tokenizer, tag2idx)

# Hold out part of the TRAINING data for validation. train_model picks the
# checkpoint with the lowest validation loss, so validating on the test set
# would leak it into model selection and bias the reported scores.
num_val = max(1, int(len(train_dataset) * VAL_FRACTION))
train_subset, val_subset = torch.utils.data.random_split(
    train_dataset,
    [len(train_dataset) - num_val, num_val],
    generator=torch.Generator().manual_seed(SEED),
)

# Create data loaders
train_loader = DataLoader(train_subset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)

# Initialize model
bert_model = BertModel.from_pretrained('neuralmind/bert-base-portuguese-cased')
model = BERTPOSTagger(bert_model, len(tag2idx))
model.to(device)

# Train the model
train_model(model, train_loader, val_loader, device, tag2idx)

# train_model leaves the last-epoch weights in `model`; restore the checkpoint
# it saved for the best validation loss so that this is the model evaluated.
model.load_state_dict(torch.load('best_pos_model.pt', map_location=device))

# Evaluate the model
results = evaluate_model(model, test_loader, device, idx2tag)
print("\nEvaluation Results:")
print(results)
