In [None]:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118 -q
!pip install transformers datasets accelerate peft evaluate scikit-learn sentencepiece -q

In [None]:
import torch
import pandas as pd
import numpy as np
import evaluate
from sklearn.model_selection import train_test_split
from datasets import Dataset
from transformers import (
    AutoTokenizer, AutoModelForSequenceClassification,
    TrainingArguments, Trainer, EarlyStoppingCallback, TrainerCallback
)
from peft import LoraConfig, get_peft_model
from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import OneHotEncoder

In [None]:
class BalancedFocalLoss(torch.nn.Module):
    """Class-weighted focal loss built on top of cross-entropy.

    Per-sample loss is ``(1 - p_t) ** gamma * CE_i`` where ``CE_i`` is the
    (optionally class-weighted and label-smoothed) cross-entropy of sample
    ``i`` and ``p_t`` is the softmax probability assigned to its true class.
    The per-sample losses are averaged over the batch.

    Args:
        alpha: Optional 1-D tensor of per-class weights, forwarded as the
            ``weight`` argument of cross-entropy. ``None`` disables weighting.
        gamma: Focusing exponent applied to ``(1 - p_t)``.
        smoothing: Label-smoothing factor forwarded to cross-entropy.
    """

    def __init__(self, alpha=None, gamma=2, smoothing=0.0):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.smoothing = smoothing

    def forward(self, inputs, targets):
        """Return the mean focal loss for ``inputs`` (logits) and ``targets`` (class ids)."""
        weight = self.alpha
        if weight is not None:
            # alpha is a plain attribute (not a buffer), so it does not follow
            # the module across devices; align it with the logits here.
            weight = weight.to(inputs.device)

        ce_loss = torch.nn.functional.cross_entropy(
            inputs,
            targets,
            weight=weight,
            label_smoothing=self.smoothing,
            reduction='none'
        )
        # p_t must be the true-class probability. Deriving it from the weighted
        # (or smoothed) loss would give p_t ** w instead, which distorts the
        # focusing term differently for every class, so use the plain NLL.
        plain_nll = torch.nn.functional.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-plain_nll)
        return ((1 - pt) ** self.gamma * ce_loss).mean()

In [None]:
# Load the raw sheet and map the committee decision column to integer class ids.
csv_path = "/kaggle/input/data8888/f2.csv"
df = pd.read_csv(csv_path)
label_mapping = {"سري للغاية": 0, "سري": 1, "مقيد": 2, "عام": 3}
df["labels"] = df["رأي اللجنة"].map(label_mapping)
# Rows whose decision is missing or not in label_mapping become NaN and are dropped.
df = df.dropna(subset=["labels"]).reset_index(drop=True)
# .map() yields float64 as soon as one row is unmapped; cast back so the
# labels are integer class ids for the stratified split and the loss.
df["labels"] = df["labels"].astype("int64")

text_columns = [
    'توضيح الكلمات الرئيسية أو المعرفات الفريدة من نوعها التي تعرف تلك البيانات',
    'وصف العملية التي تقوم بها الإدارة',
    'نوع التأثير المتوقع (مالي، السمعة، الصحة، السلامة، تشغيلي، أمني، العلاقة مع الأطراف المعنية)',
    'نوع البيانات الشخصية مثال: ("المعلومات الصحية"، "العنوان الوطني"،"معلومات الاتصال")',
    'النماذج المرتبطة بالعملية'
]

SEC_TOKEN = "[SEC]"
# Replace missing cells with "" first: astype(str) alone would inject the
# literal word "nan" into the model input.
text_frame = df[text_columns].fillna("").astype(str)
# The first text column appears twice (once right after the [SEC] marker and
# once inside the joined text) — kept as in the original; presumably an
# intentional emphasis on the keywords column, TODO confirm.
df['combined_text'] = (
    f"{SEC_TOKEN} " + text_frame[text_columns[0]] + " " +
    text_frame.agg(' '.join, axis=1)
)

In [None]:
# One-hot encode the department column and append the indicator columns to df.
encoder = OneHotEncoder(sparse_output=False)
admin_matrix = encoder.fit_transform(df[['الإدارة']])
admin_encoded = pd.DataFrame(
    admin_matrix,
    columns=encoder.get_feature_names_out(['الإدارة'])
)
# Reset the index so the column-wise concat aligns row-for-row with admin_encoded.
df = pd.concat([df.reset_index(drop=True), admin_encoded], axis=1)

In [None]:
for class_id in [0, 1]:
    samples = df[df['labels'] == class_id]
    df = pd.concat([df, samples.sample(300, replace=True, random_state=42)], axis=0)
df = df.reset_index(drop=True)

In [None]:
train_val_df, test_df = train_test_split(df, test_size=0.15, stratify=df['labels'], random_state=42)
train_df, val_df = train_test_split(train_val_df, test_size=0.15, stratify=train_val_df['labels'], random_state=42)

In [None]:
# Load the AraBERT tokenizer and register the section marker as a special
# token so it is kept as a single token instead of being split.
model_name = "aubmindlab/bert-base-arabertv02"
tokenizer = AutoTokenizer.from_pretrained(model_name)
extra_special_tokens = {'additional_special_tokens': [SEC_TOKEN]}
tokenizer.add_special_tokens(extra_special_tokens)

def tokenize_function(examples):
    """Tokenize a batch of ``combined_text`` and carry the one-hot columns along.

    Texts are truncated/padded to 512 tokens and token type ids are omitted.
    Every one-hot department column of the batch is copied into the result
    unchanged so it stays aligned with the tokenized rows.
    """
    encoded = tokenizer(
        examples["combined_text"],
        max_length=512,
        truncation=True,
        padding="max_length",
        return_token_type_ids=False
    )
    admin_columns = encoder.get_feature_names_out(['الإدارة'])
    for name in admin_columns:
        encoded[name] = examples[name]
    return encoded

admin_feature_cols = list(encoder.get_feature_names_out(['الإدارة']))
# Columns taken from the DataFrames, and columns left after tokenization.
dataset_cols = ['combined_text', 'labels'] + admin_feature_cols
columns_to_keep = ["input_ids", "attention_mask", "labels"] + admin_feature_cols


def _to_model_dataset(frame):
    """Convert one split to a tokenized Dataset holding only ``columns_to_keep``."""
    tokenized = Dataset.from_pandas(frame[dataset_cols], preserve_index=False).map(tokenize_function, batched=True)
    unwanted = [c for c in tokenized.column_names if c not in columns_to_keep]
    return tokenized.remove_columns(unwanted)


train_dataset = _to_model_dataset(train_df)
val_dataset = _to_model_dataset(val_df)
test_dataset = _to_model_dataset(test_df)

In [None]:
# Balanced class weights for the loss. They are derived from the training split
# only, so the validation/test label distribution does not influence training.
class_weights = compute_class_weight("balanced", classes=np.unique(train_df['labels']), y=train_df['labels'])
# Fall back to CPU when no GPU is visible instead of failing on a hard-coded "cuda".
class_weights = torch.tensor(class_weights, dtype=torch.float).to(
    "cuda" if torch.cuda.is_available() else "cpu"
)

In [None]:
class AdminAwareBERT(torch.nn.Module):
    """LoRA-adapted BERT encoder fused with one-hot department features.

    The text representation is the concatenation of the first-token embedding
    and the attention-masked mean of the last hidden layer. The department
    one-hot vector is projected to 32 dimensions, concatenated with the text
    representation and fed to a linear classifier. When labels are given, a
    ``BalancedFocalLoss`` is returned alongside the logits.

    Args:
        model_name: Hugging Face checkpoint name passed to ``from_pretrained``.
        num_labels: Number of output classes.
        class_weights: Per-class weight tensor used as ``alpha`` of the loss.
        admin_feature_dim: Number of one-hot department columns.

    Relies on the module-level ``tokenizer`` (for the vocabulary size) and
    ``encoder`` (for the names and order of the department columns).
    """

    def __init__(self, model_name, num_labels, class_weights, admin_feature_dim):
        super().__init__()
        base_model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)
        self.bert = get_peft_model(base_model, LoraConfig(
            r=8, lora_alpha=16, target_modules=["query", "key", "value"],
            lora_dropout=0.2, bias="none", task_type="SEQ_CLS"
        ))
        # The tokenizer gained an extra special token, so the embedding matrix
        # has to grow to match its vocabulary size.
        # NOTE(review): with LoRA the base embeddings are presumably frozen, so
        # the new token's row may stay at its random init — confirm.
        self.bert.resize_token_embeddings(len(tokenizer))
        self.admin_processor = torch.nn.Sequential(
            torch.nn.Linear(admin_feature_dim, 32),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.3)
        )
        # hidden_size * 2: first-token embedding + mean embedding; + 32: admin projection.
        self.classifier = torch.nn.Linear(base_model.config.hidden_size * 2 + 32, num_labels)
        self.loss_fn = BalancedFocalLoss(alpha=class_weights, gamma=2, smoothing=0.0)
        self.dropout = torch.nn.Dropout(0.3)
        # Resolve the column order once instead of on every forward pass.
        self.admin_columns = list(encoder.get_feature_names_out(['الإدارة']))

    def forward(self, input_ids, attention_mask, labels=None, **admin_features):
        """Compute logits (and the loss when ``labels`` is provided).

        ``admin_features`` must contain one tensor per one-hot department
        column, keyed by the encoder's feature names.

        Returns:
            ``{"loss": ..., "logits": ...}`` when labels are given, otherwise
            ``{"logits": ...}``.
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True)
        last_hidden = outputs.hidden_states[-1]
        cls_emb = last_hidden[:, 0, :]
        # Average over real tokens only. Inputs are padded to a fixed length,
        # so an unmasked mean would be dominated by padding positions.
        mask = attention_mask.unsqueeze(-1).to(last_hidden.dtype)
        mean_emb = (last_hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1.0)
        text_emb = torch.cat([cls_emb, mean_emb], dim=1)
        admin_tensor = torch.stack([admin_features[col].float() for col in self.admin_columns], dim=1)
        admin_processed = self.admin_processor(admin_tensor)
        combined = torch.cat([text_emb, admin_processed], dim=1)
        logits = self.classifier(self.dropout(combined))
        if labels is not None:
            loss = self.loss_fn(logits, labels)
            return {"loss": loss, "logits": logits}
        return {"logits": logits}

In [None]:
# Evaluation, checkpointing and logging cadence.
_schedule_kwargs = dict(
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="steps",
    logging_steps=10,
    save_total_limit=3,
    report_to="none",
)

# Optimisation hyper-parameters (effective train batch = 8 * 2 accumulation steps per device).
_optim_kwargs = dict(
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    gradient_accumulation_steps=2,
    num_train_epochs=45,
    learning_rate=4e-5,
    weight_decay=0.01,
    warmup_ratio=0.1,
    max_grad_norm=1.0,
    fp16=True,
)

# Best-checkpoint selection on the validation F1 reported by compute_metrics.
_selection_kwargs = dict(
    metric_for_best_model="eval_f1",
    greater_is_better=True,
    load_best_model_at_end=True,
)

training_args = TrainingArguments(
    output_dir="./results",
    # Keep the one-hot department columns: the model consumes them as kwargs.
    remove_unused_columns=False,
    **_schedule_kwargs,
    **_optim_kwargs,
    **_selection_kwargs,
)

# Load the metric once; reloading it inside compute_metrics would repeat the
# lookup on every evaluation pass.
_F1_METRIC = evaluate.load("f1")


def compute_metrics(eval_pred):
    """Return the weighted F1 plus one F1 score per class id.

    Args:
        eval_pred: ``(logits, labels)`` pair; the class axis of ``logits`` is last.

    Returns:
        Dict with ``eval_f1`` (support-weighted F1) and ``eval_f1_class_<i>``
        for every class id ``i`` in ``range(num_classes)``.
    """
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)
    # Pin the label set so index i always means class i, even when a class is
    # absent from both predictions and references in this evaluation set.
    class_ids = list(range(logits.shape[-1]))
    weighted_f1 = _F1_METRIC.compute(predictions=preds, references=labels, average="weighted")["f1"]
    per_class = _F1_METRIC.compute(
        predictions=preds, references=labels, average=None, labels=class_ids
    )["f1"]
    return {"eval_f1": weighted_f1, **{f"eval_f1_class_{i}": float(v) for i, v in enumerate(per_class)}}

class VerboseCallback(TrainerCallback):
    """Print the training loss each time the Trainer logs one."""

    def on_log(self, args, state, control, logs=None, **kwargs):
        # Log events without a 'loss' key (e.g. evaluation logs) are skipped;
        # defaulting to 0 would print a misleading "Loss: 0.0000" line.
        if logs and 'loss' in logs:
            print(f"Step {state.global_step} - Loss: {logs['loss']:.4f}")

In [None]:
# Build the model: 4 target classes, one input feature per one-hot department column.
admin_feature_count = len(encoder.get_feature_names_out(['الإدارة']))
model = AdminAwareBERT(
    model_name=model_name,
    num_labels=4,
    class_weights=class_weights,
    admin_feature_dim=admin_feature_count,
)

# Stop when the monitored metric has not improved for 5 consecutive evaluations.
training_callbacks = [EarlyStoppingCallback(early_stopping_patience=5), VerboseCallback()]
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    callbacks=training_callbacks,
)

print("\nStart Training with [SEC] token...")
trainer.train()