# Import Dependencies

In [None]:
import re
import joblib
import pandas as pd
import torch
import numpy as np
from pathlib import Path
from sklearn.svm import SVC
from sklearn.metrics import classification_report, accuracy_score
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AdamW, get_linear_schedule_with_warmup
from torch.utils.data import Dataset, DataLoader
from nltk.corpus import stopwords
import nltk
from torch.cuda.amp import autocast, GradScaler
import warnings

nltk.download('stopwords')
warnings.filterwarnings("ignore")

# Text Preprocessing Module

In [None]:
class TextPreprocessor:
    """Normalize raw review text for the SVM (TF-IDF) and BERT pipelines.

    The instance only holds plain Python objects (a stopword set and a
    compiled regex), so it can be persisted with joblib and restored at
    inference time.
    """

    def __init__(self, stopwords):
        """Store the stopwords and compile the punctuation pattern.

        Args:
            stopwords: Iterable of words to drop when cleaning for the SVM.
        """
        # A set gives O(1) membership tests; a list would be rescanned for
        # every token of every review.
        self.stopwords = set(stopwords)
        self.punctuation = re.compile(r'[^\w\s]')

    def clean_text(self, text, for_bert=False):
        """Return a cleaned copy of ``text``.

        Always lowercases, removes ``http...`` URLs and digit runs, and
        collapses whitespace. When ``for_bert`` is false, punctuation and
        stopwords are removed as well.

        Args:
            text: Raw review. ``None`` and float NaN (missing values) are
                treated as an empty string; other non-strings go through
                ``str()``.
            for_bert: Keep punctuation and stopwords when true.

        Returns:
            The cleaned string (possibly empty).
        """
        # Missing reviews would otherwise crash on ``.lower()``.
        if text is None or (isinstance(text, float) and text != text):
            text = ''
        elif not isinstance(text, str):
            text = str(text)

        text = text.lower()
        text = re.sub(r'http\S+', '', text)  # Remove URLs
        text = re.sub(r'\d+', '', text)      # Remove numbers
        text = re.sub(r'\s+', ' ', text).strip()

        if not for_bert:
            # NOTE(review): punctuation is deleted, not replaced by a space,
            # so "nyaman,tp" becomes "nyamantp". Kept as-is so features stay
            # consistent with models trained on this cleaning.
            text = self.punctuation.sub('', text)
            text = ' '.join(w for w in text.split() if w not in self.stopwords)
        return text

    def save(self, path):
        """Serialize this preprocessor to ``path`` with joblib."""
        joblib.dump(self, path)

    @classmethod
    def load(cls, path):
        """Restore a preprocessor previously written by :meth:`save`.

        joblib uses pickle underneath, so only load files from a trusted
        source.
        """
        return joblib.load(path)

# SVM Module

In [None]:
class ABSASVM:
    """Aspect-based sentiment model: one RBF SVM per aspect over shared TF-IDF features."""

    def __init__(self, aspects):
        """Create an untrained model.

        Args:
            aspects: Aspect names; each one gets its own classifier and is
                used as a key into the label dictionaries passed to ``train``.
        """
        self.aspects = aspects
        self.models = {}
        tfidf_options = {
            'max_features': 10000,
            'ngram_range': (1, 2),
            'sublinear_tf': True,
        }
        self.vectorizer = TfidfVectorizer(**tfidf_options)

    def train(self, X_train, y_train_dict, X_val, y_val_dict):
        """Fit the vectorizer on the training texts, then one SVM per aspect.

        Args:
            X_train: Training texts.
            y_train_dict: Mapping aspect -> training labels.
            X_val: Validation texts.
            y_val_dict: Mapping aspect -> validation labels.

        Prints the validation accuracy of every aspect classifier.
        """
        # The vocabulary is learned from the training split only.
        train_matrix = self.vectorizer.fit_transform(X_train)
        val_matrix = self.vectorizer.transform(X_val)

        for name in self.aspects:
            banner = f"\nTraining SVM for {name}"
            print(banner.ljust(50, '-'))

            classifier = SVC(
                kernel='rbf',
                class_weight='balanced',
                C=1.0,
                gamma='scale',
                random_state=42,
            )
            classifier.fit(train_matrix, y_train_dict[name])

            predicted = classifier.predict(val_matrix)
            score = accuracy_score(y_val_dict[name], predicted)
            print(f"Validation Accuracy: {score:.4f}")

            self.models[name] = classifier

    def save(self, model_dir):
        """Write the vectorizer and every aspect classifier into ``model_dir``."""
        target = Path(model_dir)
        target.mkdir(parents=True, exist_ok=True)

        joblib.dump(self.vectorizer, target / "vectorizer.joblib")
        for name in self.aspects:
            joblib.dump(self.models[name], target / f"{name}.joblib")

    @classmethod
    def load(cls, model_dir, aspects):
        """Rebuild a model from files previously written by ``save``.

        Args:
            model_dir: Directory holding ``vectorizer.joblib`` and one
                ``<aspect>.joblib`` per aspect.
            aspects: Aspect names to load.
        """
        source = Path(model_dir)
        restored = cls(aspects)
        restored.vectorizer = joblib.load(source / "vectorizer.joblib")
        for name in aspects:
            restored.models[name] = joblib.load(source / f"{name}.joblib")
        return restored

# BERT Module

In [None]:
class ABSABERT:
    """Aspect-based sentiment model: one 3-class sequence classifier per aspect.

    All aspect models are initialised from the same pretrained checkpoint and
    share a single tokenizer.
    """

    def __init__(self, aspects, model_name='indolem/indobert-base-uncased'):
        """Load the tokenizer and one fresh classifier per aspect.

        Args:
            aspects: Aspect names; each one gets its own model.
            model_name: Hub id or local path of the pretrained checkpoint.
        """
        self.aspects = aspects
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.models = {
            a: AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=3)
            for a in aspects
        }
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def train(self, train_loader_dict, val_loader_dict, epochs=10, lr=2e-5):
        """Fine-tune every aspect model and keep the weights with the best validation loss.

        Args:
            train_loader_dict: Mapping aspect -> training DataLoader whose
                batches are dicts of tensors accepted by the model
                (including ``labels`` so the model returns a loss).
            val_loader_dict: Mapping aspect -> validation DataLoader.
            epochs: Number of passes over each training loader.
            lr: Learning rate for the optimizer.

        Side effects:
            Writes ``best_<aspect>.pt`` to the current working directory
            whenever the validation loss improves.
        """
        # Mixed precision only makes sense on CUDA; disabling it elsewhere
        # turns autocast/GradScaler into no-ops.
        use_amp = self.device.type == 'cuda'

        for aspect in self.aspects:
            print(f"\nTraining BERT for {aspect}".ljust(50, '-'))
            model = self.models[aspect].to(self.device)
            # NOTE(review): transformers.AdamW is deprecated upstream in
            # favour of torch.optim.AdamW; kept because it is the optimizer
            # this file imports.
            optimizer = AdamW(model.parameters(), lr=lr)
            scheduler = get_linear_schedule_with_warmup(
                optimizer,
                num_warmup_steps=0,
                num_training_steps=len(train_loader_dict[aspect]) * epochs
            )
            # One scaler per model so the loss-scale state of one aspect
            # does not leak into the next.
            scaler = GradScaler(enabled=use_amp)

            checkpoint_path = f"best_{aspect}.pt"
            checkpoint_written = False
            best_loss = float('inf')
            for epoch in range(epochs):
                model.train()
                total_loss = 0
                for batch in train_loader_dict[aspect]:
                    batch = {k: v.to(self.device) for k, v in batch.items()}
                    optimizer.zero_grad()

                    with autocast(enabled=use_amp):  # Mixed precision
                        outputs = model(**batch)
                        loss = outputs.loss

                    scaler.scale(loss).backward()
                    scaler.step(optimizer)
                    scaler.update()
                    scheduler.step()

                    total_loss += loss.item()

                avg_loss = total_loss / len(train_loader_dict[aspect])

                # Validation step
                model.eval()
                val_loss = 0
                with torch.no_grad():
                    for batch in val_loader_dict[aspect]:
                        batch = {k: v.to(self.device) for k, v in batch.items()}
                        with autocast(enabled=use_amp):
                            outputs = model(**batch)
                        val_loss += outputs.loss.item()
                avg_val_loss = val_loss / len(val_loader_dict[aspect])
                print(f"Epoch {epoch+1}: Train Loss: {avg_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

                if avg_val_loss < best_loss:
                    best_loss = avg_val_loss
                    torch.save(model.state_dict(), checkpoint_path)
                    checkpoint_written = True

            # Only restore a checkpoint produced by this run. If no epoch
            # improved (e.g. NaN loss or epochs=0) the file is either missing
            # or a stale leftover from an earlier run.
            if checkpoint_written:
                model.load_state_dict(torch.load(checkpoint_path, map_location=self.device))

    def save(self, model_dir):
        """Save the tokenizer to ``model_dir`` and each model to ``model_dir/<aspect>``."""
        model_dir = Path(model_dir)
        model_dir.mkdir(parents=True, exist_ok=True)
        self.tokenizer.save_pretrained(model_dir)
        for aspect in self.aspects:
            aspect_dir = model_dir/f"{aspect}"
            aspect_dir.mkdir(exist_ok=True)
            self.models[aspect].save_pretrained(aspect_dir)

    @classmethod
    def load(cls, model_dir, aspects):
        """Rebuild an instance from a directory written by ``save``.

        Args:
            model_dir: Directory holding the tokenizer files and one
                sub-directory per aspect.
            aspects: Aspect names to load.

        Returns:
            An instance whose models are on ``device`` and in eval mode.
        """
        model_dir = Path(model_dir)

        # Bypass __init__: it would try to load a classifier from the root
        # of model_dir, but save() only writes tokenizer files there; the
        # model weights and configs live in the per-aspect sub-directories.
        instance = cls.__new__(cls)
        instance.aspects = aspects
        instance.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        instance.tokenizer = AutoTokenizer.from_pretrained(model_dir)

        instance.models = {}
        for a in aspects:
            model = AutoModelForSequenceClassification.from_pretrained(model_dir/a)
            # Callers move their inputs to instance.device, so the model
            # has to live there too.
            model.to(instance.device)
            model.eval()
            instance.models[a] = model
        return instance

# Dataset class for BERT
class ABSADataset(Dataset):
    """Torch dataset that tokenizes one text per item and pairs it with its label.

    ``texts`` and ``labels`` are indexed positionally through ``.iloc``, so
    they are expected to be pandas Series (or anything exposing ``.iloc``).
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        """Keep references to the data, the tokenizer, and the padded length."""
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        """Number of examples, taken from ``texts``."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for row ``idx``."""
        review = str(self.texts.iloc[idx])
        target = self.labels.iloc[idx]

        tokenizer_options = {
            'add_special_tokens': True,
            'max_length': self.max_len,
            'padding': 'max_length',
            'truncation': True,
            'return_attention_mask': True,
            'return_tensors': 'pt',
        }
        encoded = self.tokenizer.encode_plus(review, **tokenizer_options)

        # The tokenizer returns tensors with a leading batch axis; flatten so
        # the DataLoader can stack items itself.
        sample = {
            field: encoded[field].flatten()
            for field in ('input_ids', 'attention_mask')
        }
        sample['labels'] = torch.tensor(target, dtype=torch.long)
        return sample

# Inference Module

In [None]:
class ABSAInferencer:
    """Predict a sentiment label per aspect for a single raw text.

    Wraps either a saved ``ABSABERT`` or a saved ``ABSASVM`` model together
    with the preprocessor used to clean the input.
    """

    def __init__(self, model_dir, aspects, preprocessor, model_type='bert'):
        """Load the saved model.

        Args:
            model_dir: Directory previously written by the model's ``save``.
            aspects: Aspect names to predict.
            preprocessor: Object exposing ``clean_text(text, for_bert=...)``.
            model_type: ``'bert'`` loads an ``ABSABERT``; any other value
                loads an ``ABSASVM``.
        """
        self.aspects = aspects
        self.preprocessor = preprocessor
        self.model_type = model_type
        # Integer class id -> label string.
        self.label_map = {0: 'neg', 1: 'neut', 2: 'pos'}

        if model_type == 'bert':
            self.model = ABSABERT.load(Path(model_dir), aspects)
            # Reuse the tokenizer the model already loaded from model_dir.
            self.tokenizer = self.model.tokenizer
            # Inputs are moved to self.model.device in _predict_bert, so the
            # models must be on that device too; eval() disables dropout.
            for net in self.model.models.values():
                net.to(self.model.device)
                net.eval()
        else:
            self.model = ABSASVM.load(Path(model_dir), aspects)

    def predict(self, text):
        """Return ``{aspect: label}`` for ``text`` using the configured backend."""
        if self.model_type == 'bert':
            return self._predict_bert(text)
        return self._predict_svm(text)

    def _predict_bert(self, text):
        """Clean, tokenize, and classify ``text`` with every aspect model."""
        clean_text = self.preprocessor.clean_text(text, for_bert=True)
        encoding = self.tokenizer.encode_plus(
            clean_text,
            add_special_tokens=True,
            max_length=128,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        ).to(self.model.device)

        preds = {}
        with torch.no_grad():
            for aspect in self.aspects:
                outputs = self.model.models[aspect](**encoding)
                # Single-example batch: argmax over the class axis.
                class_id = torch.argmax(outputs.logits, dim=-1).item()
                preds[aspect] = self.label_map[class_id]
        return preds

    def _predict_svm(self, text):
        """Clean, vectorize, and classify ``text`` with every aspect SVM."""
        clean_text = self.preprocessor.clean_text(text, for_bert=False)
        X = self.model.vectorizer.transform([clean_text])
        return {
            # int() so the lookup does not depend on the prediction's
            # integer type.
            aspect: self.label_map[int(self.model.models[aspect].predict(X)[0])]
            for aspect in self.aspects
        }

# Training

In [None]:
def load_and_convert_data(file_path, aspect_cols):
    """Read a CSV and encode its aspect sentiment columns as small integers.

    Args:
        file_path: Path (or file-like object) handed to ``pd.read_csv``.
        aspect_cols: Names of the columns holding ``'neg'`` / ``'neut'`` /
            ``'pos'`` labels.

    Returns:
        The DataFrame with every aspect column replaced by ``int8`` codes
        (neg=0, neut=1, pos=2). Missing or unrecognised labels become 1
        (neutral).
    """
    encoding = {'neg': 0, 'neut': 1, 'pos': 2}
    frame = pd.read_csv(file_path)

    for name in aspect_cols:
        codes = frame[name].map(encoding)
        # Anything the mapping did not cover is NaN here; fall back to neutral.
        frame[name] = codes.fillna(1).astype('int8')

    return frame


# Training
if __name__ == "__main__":
    # Aspect columns present in the CSV files; one classifier is trained per aspect.
    aspects = ['ac', 'air_panas', 'bau', 'general', 'kebersihan',
               'linen', 'service', 'sunrise_meal', 'tv', 'wifi']

    # Load both splits with labels already encoded as integers.
    df_train = load_and_convert_data('train_preprocess.csv', aspects)
    df_val = load_and_convert_data('valid_preprocess.csv', aspects)

    # Text cleaner built on the Indonesian NLTK stopword list.
    stopwords_id = stopwords.words('indonesian')
    preprocessor = TextPreprocessor(stopwords_id)

    # Each split gets two cleaned variants of the review column:
    # an aggressively cleaned one for TF-IDF and a lighter one for BERT.
    for df in (df_train, df_val):
        reviews = df['review']
        df['clean_svm'] = reviews.apply(preprocessor.clean_text, for_bert=False)
        df['clean_bert'] = reviews.apply(preprocessor.clean_text, for_bert=True)

    # --- SVM ---
    svm_train_labels = {a: df_train[a] for a in aspects}
    svm_val_labels = {a: df_val[a] for a in aspects}
    svm_model = ABSASVM(aspects)
    svm_model.train(
        df_train['clean_svm'],
        svm_train_labels,
        df_val['clean_svm'],
        svm_val_labels,
    )

    # --- BERT ---
    bert_model = ABSABERT(aspects)

    # Only the training loaders are shuffled.
    train_loaders = {}
    for a in aspects:
        train_set = ABSADataset(df_train['clean_bert'], df_train[a], bert_model.tokenizer)
        train_loaders[a] = DataLoader(train_set, batch_size=16, shuffle=True)

    val_loaders = {}
    for a in aspects:
        val_set = ABSADataset(df_val['clean_bert'], df_val[a], bert_model.tokenizer)
        val_loaders[a] = DataLoader(val_set, batch_size=16)

    bert_model.train(train_loaders, val_loaders)

    # Persist everything inference needs under a single directory.
    model_dir = Path("saved_models")
    svm_model.save(model_dir / "svm")
    bert_model.save(model_dir / "bert")
    preprocessor.save(model_dir / "preprocessor.joblib")

# Test Inference

In [None]:
# Single informal Indonesian review used as a smoke test for both back ends.
test_text = "lumayan nyaman,tp kebersihan kmr mandi perlu ditingkatkan lg biar gk ada kuning2 di sudutnya lbh bgs"

# Restore the saved preprocessor and build one inferencer per model type.
# `model_dir` and `aspects` come from the training cell.
preprocessor = TextPreprocessor.load(model_dir / "preprocessor.joblib")
svm_infer = ABSAInferencer(model_dir / "svm", aspects, preprocessor, model_type='svm')
bert_infer = ABSAInferencer(model_dir / "bert", aspects, preprocessor, model_type='bert')

svm_result = svm_infer.predict(test_text)
print("\nSVM Predictions:", svm_result)
bert_result = bert_infer.predict(test_text)
print("\nBERT Predictions:", bert_result)

# Test Data Inference

In [None]:
# Define the aspect list before its first use so this cell does not rely on a
# value left behind by an earlier cell.
aspects = ['ac', 'air_panas', 'bau', 'general', 'kebersihan',
           'linen', 'service', 'sunrise_meal', 'tv', 'wifi']

# Test split with labels encoded as integers.
df_test = load_and_convert_data('test_preprocess.csv', aspects)

# Load inferencers (`model_dir` is set by the training cell).
preprocessor = TextPreprocessor.load(model_dir/"preprocessor.joblib")
svm_infer = ABSAInferencer(model_dir/"svm", aspects, preprocessor, 'svm')
bert_infer = ABSAInferencer(model_dir/"bert", aspects, preprocessor, 'bert')