<a href="https://colab.research.google.com/github/Natural-Language-Processing-SS24/task2/blob/main/Rezepte_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sentiment-Analyse

In [None]:
!pip install transformers[torch]
!pip install pyspellchecker
!pip install transformers gradio
!pip install python-docx

## Umgebungseinstellung

In [None]:
# Google Colab spezifische Importe
from google.colab import files
from google.colab import drive

# Datenverarbeitung und Modelltraining
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

# Transformer Modelle und Tokenizer
from transformers import (
    BertTokenizer, BertForSequenceClassification,
    RobertaTokenizer, RobertaForSequenceClassification,
    DistilBertTokenizer, DistilBertForSequenceClassification,
    GPT2Tokenizer, GPT2ForSequenceClassification,
    BartTokenizer, BartForSequenceClassification,
    T5Tokenizer, T5ForConditionalGeneration, AdamW,
    Trainer, TrainingArguments
)

# PyTorch Bibliotheken
import torch
from torch.utils.data import DataLoader, Dataset

# Weitere Bibliotheken
from tqdm import tqdm
import gradio as gr
import docx
from collections import Counter
import re
from multiprocessing import Pool
import matplotlib.pyplot as plt

## Daten hochladen und laden

In [None]:
# Helper for uploading files in Google Colab
def upload_files():
    """Open the Colab upload dialog and return the result of ``files.upload()``."""
    return files.upload()

# Upload the CSV files, then read the two semicolon-separated datasets
uploaded = upload_files()
train_data, test_data = (
    pd.read_csv(csv_name, delimiter=';')
    for csv_name in ('Sentiment_Training.csv', 'Sentiment_Test.csv')
)

## Explorative Datenanalyse

In [None]:
# Show the first rows and general information
print("Erste Zeilen des Trainingsdatensatzes:")
print(train_data.head())

print("\nInformationen zum Trainingsdatensatz:")
# DataFrame.info() writes its report to stdout itself and returns None, so
# wrapping it in print() would append a stray "None" line to the output.
train_data.info()

# Compute the character length of every text
train_data['text_length'] = train_data['text'].apply(len)
print("\nStatistik der Textlängen im Trainingsdatensatz:")
print(train_data['text_length'].describe())

# Histogram of the text lengths
plt.figure(figsize=(10, 6))
plt.hist(train_data['text_length'], bins=50, edgecolor='black')
plt.title('Verteilung der Textlängen')
plt.xlabel('Textlänge')
plt.ylabel('Häufigkeit')
plt.show()

# Distribution of the labels
print("\nVerteilung der Labels im Trainingsdatensatz:")
print(train_data['label'].value_counts().sort_index())

## Slang-Wörterbuch laden

In [None]:
# Read a Word file and build a slang dictionary from it
def read_slang_dict_from_docx(docx_file):
    """Build a ``{abbreviation: meaning}`` dict from the paragraphs of a Word file.

    Only paragraphs containing a colon are used. Each is split at its first
    colon; both parts are stripped and lower-cased. If an abbreviation occurs
    more than once, the last occurrence wins.

    Args:
        docx_file: Passed unchanged to ``docx.Document``.

    Returns:
        dict mapping lower-cased abbreviation to lower-cased meaning.
    """
    lines = (paragraph.text for paragraph in docx.Document(docx_file).paragraphs)
    pairs = (line.split(':', 1) for line in lines if ':' in line)
    return {abbr.strip().lower(): meaning.strip().lower() for abbr, meaning in pairs}

# Upload the Word file and read it
uploaded = upload_files()
# The path is hard-coded; the return value of the upload (`uploaded`) is not consulted here.
docx_file = 'abbreviations.docx'
slang_dict = read_slang_dict_from_docx(docx_file)
print("Slang Dictionary:", slang_dict)

In [None]:
def find_slang_words(df, slang_dict):
    """Count how often each slang key occurs in ``df['text']``.

    Every text is split on whitespace and each token is lower-cased before
    the lookup, so tokens with attached punctuation do not match.

    Args:
        df: Object whose ``'text'`` entry iterates over strings.
        slang_dict: Container of lower-case slang words (membership is tested).

    Returns:
        collections.Counter mapping lower-cased slang word to its frequency.
    """
    lowered_tokens = (token.lower() for text in df['text'] for token in text.split())
    return Counter(token for token in lowered_tokens if token in slang_dict)

# Find the slang words that occur in the training dataset
slang_words_counter = find_slang_words(train_data, slang_dict)

# Show the slang words and their frequencies
print(slang_words_counter)

## Textvorverarbeitung

In [None]:
# Text preprocessing
def preprocess_text(text, slang_dict):
    """Normalise whitespace and replace slang tokens by their dictionary value.

    Args:
        text: Input string.
        slang_dict: Mapping from lower-case slang token to its replacement.

    Returns:
        The tokens of ``text`` joined by single spaces. A token whose
        lower-cased form is a key of ``slang_dict`` is replaced by the mapped
        value; all other tokens keep their original casing.
    """
    collapsed = re.sub(r'\s+', ' ', text).strip()
    expanded = (slang_dict.get(token.lower(), token) for token in collapsed.split())
    return ' '.join(expanded)

def preprocess_text_parallel(text):
    """Single-argument wrapper around ``preprocess_text``.

    The slang dictionary is not a parameter here: the module-level
    ``slang_dict`` is read at call time.
    """
    return preprocess_text(text, slang_dict)

## Datenvorbereitung

In [None]:
# Map a star rating to a sentiment category
def categorize_rating(rating):
    """Translate a numeric rating into ``'negative'``, ``'neutral'`` or ``'positive'``.

    Ratings ``<= 1`` are negative, exactly ``2`` is neutral and everything
    else is positive.
    NOTE(review): this presumably expects 0-based labels (0-4) — confirm against the CSV.
    """
    if rating == 2:
        return 'neutral'
    return 'negative' if rating <= 1 else 'positive'

# Derive the sentiment column for both the training and the test data
for _labelled_frame in (train_data, test_data):
    _labelled_frame['sentiment'] = _labelled_frame['label'].apply(categorize_rating)

In [None]:
# Split the training data into training and validation sets (80/20, stratified by sentiment).
# NOTE(review): this rebinds `train_data`, so executing the cell a second time splits the
# already reduced training set again — re-run the notebook from the data-loading cell first.
train_data, val_data = train_test_split(train_data, test_size=0.2, stratify=train_data['sentiment'], random_state=42)

## Dataset und Tokenisierung

In [None]:
# Dataset wrapper around tokenizer output
class YelpDataset(Dataset):
    """Map-style dataset pairing tokenized inputs with their labels.

    Args:
        inputs: Mapping from field name (must include ``'input_ids'``) to an
            indexable collection with one entry per sample.
        labels: One label entry per sample, either a sequence/array of class
            ids or an existing tensor (e.g. target token ids).

    Raises:
        ValueError: If the number of labels differs from the number of inputs.
    """

    def __init__(self, inputs, labels):
        self.inputs = inputs
        if isinstance(labels, torch.Tensor):
            # torch.tensor(<tensor>) emits a copy-construct warning; clone().detach()
            # is the recommended way to take an independent copy of a tensor.
            self.labels = labels.clone().detach().to(torch.long)
        else:
            self.labels = torch.tensor(labels, dtype=torch.long)

        n_inputs = len(inputs['input_ids'])
        if len(self.labels) != n_inputs:
            raise ValueError(
                f"Got {len(self.labels)} labels for {n_inputs} inputs"
            )

    def __len__(self):
        return len(self.inputs['input_ids'])

    def __getitem__(self, idx):
        # Slice every input field at the same position and attach the label.
        item = {key: val[idx] for key, val in self.inputs.items()}
        item['labels'] = self.labels[idx]
        return item

# Preprocess and tokenize a dataframe
def tokenize_data(data, tokenizer, slang_dict, model_name=None, max_length=128):
    """Preprocess and tokenize ``data`` for training or evaluation.

    Args:
        data: DataFrame with a ``'text'`` and a ``'sentiment'`` column.
        tokenizer: Callable tokenizer; called with the list of texts and
            ``padding=True, truncation=True, max_length=..., return_tensors="pt"``.
        slang_dict: Slang dictionary handed to ``preprocess_text``.
        model_name: ``'T5'`` selects text targets, anything else class ids.
        max_length: Truncation length passed to the tokenizer.

    Returns:
        ``(inputs, labels)``. For ``'T5'`` ``labels`` are the ``input_ids`` of
        the tokenized sentiment words, otherwise an integer array of class
        codes with negative=0, neutral=1, positive=2.

    Raises:
        ValueError: If ``'sentiment'`` contains a value outside the three classes.
    """
    class_names = ['negative', 'neutral', 'positive']

    # Preprocess in-process with the dictionary that was actually passed in.
    # The previous process pool ignored `slang_dict` in favour of the global one
    # and started worker processes for what is a cheap string operation.
    texts = [preprocess_text(text, slang_dict) for text in data['text'].tolist()]

    # Pin the category order: without explicit categories the codes are derived
    # from the classes present in `data`, so a split lacking one class would
    # silently shift the remaining codes.
    labels = pd.Categorical(data['sentiment'], categories=class_names).codes
    if (labels < 0).any():
        raise ValueError(f"'sentiment' must only contain the values {class_names}")

    inputs = tokenizer(texts, padding=True, truncation=True, max_length=max_length, return_tensors="pt")
    if model_name == 'T5':
        # The sequence-to-sequence model is trained to generate the class name itself.
        targets = [class_names[code] for code in labels]
        target_inputs = tokenizer(targets, padding=True, truncation=True, max_length=max_length, return_tensors="pt")
        return inputs, target_inputs.input_ids
    return inputs, labels

## Modelle definieren

In [None]:
# Tokenizers and models: (name, tokenizer class, model class, checkpoint, extra model kwargs)
_MODEL_SPECS = (
    ('BERT', BertTokenizer, BertForSequenceClassification, 'bert-base-uncased', {'num_labels': 3}),
    ('RoBERTa', RobertaTokenizer, RobertaForSequenceClassification, 'roberta-base', {'num_labels': 3}),
    ('DistilBERT', DistilBertTokenizer, DistilBertForSequenceClassification, 'distilbert-base-uncased', {'num_labels': 3}),
    ('GPT-2', GPT2Tokenizer, GPT2ForSequenceClassification, 'gpt2', {'num_labels': 3}),
    ('BART', BartTokenizer, BartForSequenceClassification, 'facebook/bart-base', {'num_labels': 3}),
    # T5 is loaded as a text-generation model, hence no classification head size.
    ('T5', T5Tokenizer, T5ForConditionalGeneration, 't5-base', {}),
)

# Every entry is loaded eagerly, tokenizer first and then the model.
model_configs = {
    name: {
        'tokenizer': tokenizer_cls.from_pretrained(checkpoint),
        'model': model_cls.from_pretrained(checkpoint, **model_kwargs),
    }
    for name, tokenizer_cls, model_cls, checkpoint, model_kwargs in _MODEL_SPECS
}


## Training und Evaluierung

In [None]:
# Ensure all tokenizers have a pad_token and adjust models
for model_name, config in model_configs.items():
    tokenizer = config['tokenizer']
    model = config['model']
    
    # Tokenizers without a pad token reuse their EOS token for padding; the
    # embedding matrix is then resized to the tokenizer's current length.
    if tokenizer.pad_token is None:
        tokenizer.add_special_tokens({'pad_token': tokenizer.eos_token})
        model.resize_token_embeddings(len(tokenizer))
    
    # Explicitly set the pad_token_id for models that require it
    model.config.pad_token_id = tokenizer.pad_token_id
    assert tokenizer.pad_token is not None, f"Pad token not added for {model_name}"

# Build the optimizer
def get_optimizer(model, lr=5e-5):
    """Create an AdamW optimizer over all parameters of ``model``.

    Uses ``torch.optim.AdamW`` rather than the deprecated ``transformers.AdamW``.
    ``eps`` and ``weight_decay`` are set explicitly to the defaults of the
    transformers implementation so the optimisation behaviour stays the same.

    Args:
        model: Module whose ``parameters()`` are optimised.
        lr: Learning rate.

    Returns:
        torch.optim.AdamW instance.
    """
    return torch.optim.AdamW(model.parameters(), lr=lr, eps=1e-6, weight_decay=0.0)

# Training loop with class weighting
def train(model, train_loader, optimizer, device, class_weights, model_name=None):
    """Train ``model`` for one epoch and return the mean batch loss.

    Args:
        model: Model to train. For ``model_name == 'T5'`` it is called with
            ``labels`` and its own ``.loss`` is used; otherwise only its
            ``.logits`` are used.
        train_loader: Iterable of batches with the keys ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'``.
        optimizer: Optimizer stepping the model parameters.
        device: Device the batches and the class weights are moved to.
        class_weights: 1-D tensor indexed by class id (not used for T5).
        model_name: ``'T5'`` selects the sequence-to-sequence branch.

    Returns:
        Mean of the per-batch losses as float, ``0.0`` for an empty loader.
    """
    model.train()
    class_weights = class_weights.to(device)
    total_loss = 0.0
    num_batches = 0
    for batch in tqdm(train_loader):
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        if model_name == 'T5':
            loss = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels).loss
        else:
            # The loss returned by the model is already averaged over the batch, so
            # multiplying it by class_weights[labels] only rescales the whole batch.
            # Compute the unreduced loss from the logits so that each sample is
            # weighted by the weight of its own class before averaging.
            logits = model(input_ids, attention_mask=attention_mask).logits
            per_sample_loss = torch.nn.functional.cross_entropy(logits, labels, reduction='none')
            loss = (per_sample_loss * class_weights[labels]).mean()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1
    return total_loss / num_batches if num_batches else 0.0

# Evaluation loop
def evaluate(model, val_loader, device, tokenizer, model_name=None):
    """Evaluate ``model`` on ``val_loader``.

    Args:
        model: Model to evaluate (switched to eval mode).
        val_loader: Iterable of batches with the keys ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'``; must expose ``.dataset``
            supporting ``len()``.
        device: Device the batches are moved to.
        tokenizer: Used in the T5 branch to decode generated ids and labels.
        model_name: ``'T5'`` selects the text-generation branch.

    Returns:
        ``(mean_loss, accuracy, all_labels, all_preds)`` where the two lists
        hold class ids (negative=0, neutral=1, positive=2). In the T5 branch
        a decoded text that is none of the three class names is reported as -1.
    """
    class_ids = {'negative': 0, 'neutral': 1, 'positive': 2}
    model.eval()
    total_loss = 0.0
    num_batches = 0
    correct_predictions = 0
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for batch in tqdm(val_loader):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            if model_name == 'T5':
                generated = model.generate(input_ids=input_ids, attention_mask=attention_mask, max_length=128)
                pred_texts = [text.strip() for text in tokenizer.batch_decode(generated, skip_special_tokens=True)]
                true_texts = [text.strip() for text in tokenizer.batch_decode(labels, skip_special_tokens=True)]
                loss = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels).loss
                correct_predictions += sum(pred == true for pred, true in zip(pred_texts, true_texts))
                # Previously this branch left both lists empty, so callers computing
                # metrics from them had nothing to work with for T5.
                all_labels.extend(class_ids.get(true, -1) for true in true_texts)
                all_preds.extend(class_ids.get(pred, -1) for pred in pred_texts)
            else:
                outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                preds = torch.argmax(outputs.logits, dim=1)
                correct_predictions += torch.sum(preds == labels).item()
                all_labels.extend(labels.cpu().tolist())
                all_preds.extend(preds.cpu().tolist())
            total_loss += loss.item()
            num_batches += 1
    # Guard both averages so an empty loader yields zeros instead of ZeroDivisionError.
    num_samples = len(val_loader.dataset)
    accuracy = correct_predictions / num_samples if num_samples else 0.0
    mean_loss = total_loss / num_batches if num_batches else 0.0
    return mean_loss, accuracy, all_labels, all_preds

## Modelle trainieren und speichern

In [None]:
# Make sure every tokenizer has a pad token and adjust the models accordingly
for model_name, config in model_configs.items():
    tokenizer = config['tokenizer']
    model = config['model']

    # Tokenizers without a pad token reuse their EOS token for padding.
    if tokenizer.pad_token is None:
        tokenizer.add_special_tokens({'pad_token': tokenizer.eos_token})
        model.resize_token_embeddings(len(tokenizer))

    assert tokenizer.pad_token is not None, f"Pad token not added for {model_name}"
    # Propagate the pad token id to the model config as well, so that padded
    # batches are handled correctly even when this is the only pad-token setup
    # cell that was executed. Re-assigning the same id is harmless.
    model.config.pad_token_id = tokenizer.pad_token_id

# Class weights, indexed by class id (0 = negative, 1 = neutral, 2 = positive)
class_weights = torch.tensor([1.0, 2.0, 1.0])

# Fixed class order for the metrics. Without an explicit `labels` argument
# scikit-learn derives the classes from the data, so a class that is absent
# from the validation results would shrink the confusion matrix and make
# classification_report fail on the three target names.
SENTIMENT_IDS = [0, 1, 2]
SENTIMENT_NAMES = ['negative', 'neutral', 'positive']

# The device and the number of epochs are identical for every model, so they
# are set once. `device` stays a module-level name because classify_text reads it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
epochs = 3

trained_models = {}
for model_name, config in model_configs.items():
    tokenizer = config['tokenizer']
    model = config['model']

    # Tokenize the training, validation and test data
    train_inputs, train_labels = tokenize_data(train_data, tokenizer, slang_dict, model_name)
    val_inputs, val_labels = tokenize_data(val_data, tokenizer, slang_dict, model_name)
    test_inputs, test_labels = tokenize_data(test_data, tokenizer, slang_dict, model_name)

    # Wrap the data in Dataset objects
    train_dataset = YelpDataset(train_inputs, train_labels)
    val_dataset = YelpDataset(val_inputs, val_labels)
    test_dataset = YelpDataset(test_inputs, test_labels)

    # Create the DataLoaders
    # NOTE(review): test_loader is built but never evaluated in this cell.
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=16)
    test_loader = DataLoader(test_dataset, batch_size=16)

    # Move the model to GPU/CPU
    model.to(device)

    # Create the optimizer
    optimizer = get_optimizer(model)

    # Training and evaluation; after the loop the val_* names hold the results
    # of the last epoch (val_labels is rebound from the tokenized labels).
    for epoch in range(epochs):
        print(f"Training {model_name} - Epoch {epoch + 1}/{epochs}")
        train_loss = train(model, train_loader, optimizer, device, class_weights, model_name)
        val_loss, val_accuracy, val_labels, val_preds = evaluate(model, val_loader, device, tokenizer, model_name)
        print(f"Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

    # Compute the evaluation metrics over the fixed set of classes
    conf_matrix = confusion_matrix(val_labels, val_preds, labels=SENTIMENT_IDS)
    class_report = classification_report(
        val_labels, val_preds, labels=SENTIMENT_IDS, target_names=SENTIMENT_NAMES
    )

    print(f"\nConfusion Matrix for {model_name}:")
    print(conf_matrix)

    print(f"\nClassification Report for {model_name}:")
    print(class_report)

    # Plot the confusion matrix with the class names on both axes
    plt.figure(figsize=(10, 8))
    sns.heatmap(
        conf_matrix, annot=True, fmt='d', cmap='Blues',
        xticklabels=SENTIMENT_NAMES, yticklabels=SENTIMENT_NAMES,
    )
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title(f'Confusion Matrix for {model_name}')
    plt.show()

    # Keep the trained model and its metrics in memory (nothing is written to disk)
    trained_models[model_name] = {
        'model': model,
        'tokenizer': tokenizer,
        'val_loss': val_loss,
        'val_accuracy': val_accuracy,
        'conf_matrix': conf_matrix,
        'class_report': class_report
    }

## Gradio-Oberfläche erstellen

In [None]:
# Classify one text with every trained model
def classify_text(text):
    """Predict the sentiment of ``text`` with every model in ``trained_models``.

    Reads the module-level ``trained_models``, ``device`` and ``slang_dict``.

    Args:
        text: Raw input text.

    Returns:
        dict mapping model name to the predicted sentiment string. For
        ``'T5'`` this is the decoded generated text, otherwise one of
        ``'negative'``, ``'neutral'``, ``'positive'``.
    """
    sentiments = {0: 'negative', 1: 'neutral', 2: 'positive'}
    # Apply the same whitespace normalisation and slang expansion that
    # tokenize_data applies to the training texts.
    text = preprocess_text(text, slang_dict)

    results = {}
    for model_name, model_info in trained_models.items():
        model = model_info['model']
        tokenizer = model_info['tokenizer']

        model.eval()  # inference must not depend on the mode the model was left in
        inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True, max_length=128).to(device)
        # No gradients are needed for prediction; skipping them saves memory.
        with torch.no_grad():
            if model_name == 'T5':
                outputs = model.generate(input_ids=inputs['input_ids'], attention_mask=inputs['attention_mask'], max_length=128)
                prediction = tokenizer.decode(outputs[0], skip_special_tokens=True)
            else:
                logits = model(**inputs).logits
                prediction = sentiments[torch.argmax(logits, dim=1).item()]

        results[model_name] = prediction

    return results

# Pick a sentence from the test set
def get_test_sentence(index):
    """Return ``(text, sentiment)`` of the ``test_data`` row at position ``index``."""
    row = test_data.iloc[index]
    return row['text'], row['sentiment']

def get_results_for_test_sentence(index):
    """Classify one test sentence and attach its original label.

    Args:
        index: Row position in the test set. Either an int or a dropdown
            choice of the form ``"<position>: <text>"``, which is what the
            sentence picker delivers; ``None`` means nothing was selected.

    Returns:
        dict with one prediction per model plus ``'Original Label'``;
        an empty dict when ``index`` is ``None``.

    Raises:
        ValueError: If a string choice does not start with an integer position.
    """
    if index is None:
        return {}
    if isinstance(index, str):
        # The dropdown hands over the whole choice string, which cannot be used
        # for positional indexing; the position is the part before the first colon.
        index = int(index.split(':', 1)[0])
    text, original_label = get_test_sentence(index)
    results = classify_text(text)
    results['Original Label'] = original_label
    return results

# Dropdown listing every test sentence, prefixed with its row position
picker_choices = [f"{i}: {text}" for i, text in enumerate(test_data['text'])]
sentence_picker = gr.Dropdown(
    label="Wähle einen Satz aus dem Testdatensatz",
    choices=picker_choices,
    interactive=True,
)

# Connect the picker to the classification function; results are shown as JSON
interface = gr.Interface(
    title="Ergebnisse für einen Satz aus dem Testdatensatz",
    fn=get_results_for_test_sentence,
    inputs=sentence_picker,
    outputs="json",
)

# Launch the Gradio app
interface.launch()