In [1]:
!pip install transformers torch sentence-transformers Sastrawi scikit-learn faiss-cpu rank_bm25 matplotlib

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import pandas as pd
import numpy as np
import json
import os
import re
import sys
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, log_loss
from typing import List, Tuple, Dict
from collections import Counter
from Sastrawi.Stemmer.StemmerFactory import StemmerFactory
from Sastrawi.StopWordRemover.StopWordRemoverFactory import StopWordRemoverFactory
from transformers import AutoTokenizer, AutoModel
from rank_bm25 import BM25Okapi
import torch
import faiss
from scipy.stats import rankdata

In [3]:
# === 1. LOAD DATA ===
file_path = 'data/processed/cases.csv'
try:
    df = pd.read_csv(file_path)
except FileNotFoundError:
    print(f"Error: File {file_path} not found.")
    sys.exit(1)

# Ensure case_id is unique
if df["case_id"].duplicated().any():
    print("Error: Duplicate case_id found in dataset.")
    sys.exit(1)

# Check for required columns
required_columns = ['case_id', 'text_full', 'pasal', 'pidana_penjara']
if not all(col in df for col in required_columns):
    print(f"Error: Dataset missing required columns: {', '.join(set(required_columns) - set(df.columns))}")
    sys.exit(1)

In [4]:
# === 2. EXTRAK SOLUSI ===
def normalize_solution(text: str) -> str:
    """Normalize solution text to standardize penalties (e.g., 'penjara 2 tahun')."""
    if not isinstance(text, str) or not text.strip():
        return "Unknown"
    text = text.lower().strip()
    match = re.search(r'(penjara|denda)?\s*(\d+\.?\d*)\s*(tahun|bulan|juta)?', text)
    if match:
        penalty_type = match.group(1) if match.group(1) else "penjara"
        value = match.group(2)
        unit = match.group(3) if match.group(3) else ""
        return f"{penalty_type} {value} {unit}".strip()
    return text

# Create case_solutions dictionary
case_solutions = {
    row['case_id']: normalize_solution(str(row['pidana_penjara']))
    for _, row in df.iterrows()
    if pd.notna(row['pidana_penjara']) and str(row['pidana_penjara']).strip()
}
if len(case_solutions) < len(df):
    print(f"Warning: {len(df) - len(case_solutions)} cases with missing or empty pidana_penjara skipped.")

if not case_solutions:
    print("Error: No valid solutions found in dataset.")
    sys.exit(1)

In [5]:
# === 3. PREPROCESSING ===
stopword_factory = StopWordRemoverFactory()
stemmer = StemmerFactory().create_stemmer()
stop_words_indonesia = stopword_factory.get_stop_words() + ["terdakwa", "korban", "menyatakan", "secara", "sah", "meyakinkan"]

synonyms = {
    "pengeroyokan": ["kekerasan bersama-sama", "penganiayaan bersama-sama"],
    "penganiayaan": ["kekerasan", "penyerangan"],
    "turut serta": ["ikut serta", "bersama-sama"],
    "luka berat": ["cedera parah", "luka serius"]
}

def preprocess(text: str) -> str:
    """Preprocess text: lowercase, remove punctuation, stem, apply synonyms, remove stopwords."""
    if not isinstance(text, str):
        text = ""
    text = text.lower()
    text = re.sub(r'[^a-z0-9\s]', ' ', text)
    text = stemmer.stem(text)
    for key, syn_list in synonyms.items():
        for syn in syn_list:
            text = text.replace(syn, key)
    text = ' '.join([word for word in text.split() if word not in stop_words_indonesia])
    return text

corpus = df["text_full"].apply(preprocess)

In [6]:
# === 4. REPRESENTASI VEKTOR ===
# TF-IDF
tfidf_vectorizer = TfidfVectorizer(stop_words=stop_words_indonesia, max_features=15000, sublinear_tf=True, ngram_range=(1, 3))
tfidf_matrix = tfidf_vectorizer.fit_transform(corpus)

# BM25
tokenized_corpus = [doc.split() for doc in corpus]
bm25 = BM25Okapi(tokenized_corpus)

# IndoBERT Embeddings
tokenizer = AutoTokenizer.from_pretrained("indobenchmark/indobert-base-p2")
model = AutoModel.from_pretrained("./indobert_finetuned" if os.path.exists("./indobert_finetuned") else "indobenchmark/indobert-base-p2")

def bert_embed(text: str) -> np.ndarray:
    """Generate IndoBERT embeddings for text."""
    try:
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)
        embeddings = outputs.last_hidden_state.max(dim=1).values.squeeze().numpy()
        return embeddings / np.linalg.norm(embeddings) if np.linalg.norm(embeddings) != 0 else np.zeros(768)
    except Exception as e:
        print(f"Error embedding text: {e}")
        return np.zeros(768)

# Load or compute IndoBERT embeddings
embeddings_file = 'data/processed/embeddings.json'
if os.path.exists(embeddings_file):
    with open(embeddings_file, 'r', encoding='utf-8') as f:
        embeddings_data = json.load(f)
    bert_embeddings = np.array([data["indobert_embedding"] for data in embeddings_data])
else:
    bert_embeddings = np.array([bert_embed(text) for text in corpus])

In [7]:
# === 5. SPLITTING DATA ===
nb_trained = True
stratify_col = df["pasal"]
X_train_tfidf, X_test_tfidf, y_train, y_test = train_test_split(
    tfidf_matrix, df["pasal"], test_size=0.2, random_state=42, stratify=stratify_col
)
X_train_bert, X_test_bert = train_test_split(
    bert_embeddings, test_size=0.2, random_state=42, stratify=stratify_col
)
case_id_train, case_id_test = train_test_split(
    df["case_id"], test_size=0.2, random_state=42, stratify=stratify_col
)
text_test = df.iloc[case_id_test.index]["text_full"].apply(preprocess)
pidana_test = df.iloc[case_id_test.index]["pidana_penjara"].apply(normalize_solution)

In [8]:
# === 6. MODEL RETRIEVAL: NAIVE BAYES ===
nb = MultinomialNB()
nb.fit(X_train_tfidf, y_train)

# Evaluate Naive Bayes
y_pred = nb.predict(X_test_tfidf)
accuracy = accuracy_score(y_test, y_pred)
prob_scores = nb.predict_proba(X_test_tfidf)
class_labels = np.unique(y_train)
log_losses = []
for i, y_true in enumerate(y_test):
    if y_true not in class_labels:
        print(f"Warning: Test pasal {y_true} not in training set. Skipping.")
        continue
    true_class_idx = np.where(class_labels == y_true)[0][0]
    true_prob = prob_scores[i, true_class_idx]
    loss = -np.log(max(true_prob, 1e-15))
    log_losses.append(loss)
log_loss_value = np.mean(log_losses) if log_losses else 0.0
print(f"Naive Bayes Accuracy: {accuracy:.4f}")
print(f"Naive Bayes Log Loss: {log_loss_value:.4f}")

Naive Bayes Accuracy: 1.0000
Naive Bayes Log Loss: 0.0339


In [9]:
# === 7. FAISS INDEX FOR INDOBERT RETRIEVAL ===
dimension = bert_embeddings.shape[1]
index = faiss.IndexFlatIP(dimension)
index.add(bert_embeddings.astype(np.float32))

In [10]:
# === 8. FUNGSI RETRIEVAL ===
def retrieve(query: str, k: int = 5, method: str = "nb") -> List[Tuple[int, float]]:
    """Retrieve top-k cases for a query using the specified method."""
    query_clean = preprocess(query)
    try:
        if method == "bm25":
            query_tokens = query_clean.split()
            scores = bm25.get_scores(query_tokens)
            top_k_idx = np.argsort(scores)[::-1][:k]
            scores = scores[top_k_idx]
        elif method == "tfidf":
            query_vec = tfidf_vectorizer.transform([query_clean])
            scores = cosine_similarity(query_vec, tfidf_matrix).flatten()
            top_k_idx = scores.argsort()[::-1][:k]
            scores = scores[top_k_idx]
        elif method == "bert":
            query_vec = bert_embed(query_clean).astype(np.float32)
            query_vec = query_vec / np.linalg.norm(query_vec) if np.linalg.norm(query_vec) != 0 else np.zeros(dimension, dtype=np.float32)
            scores, indices = index.search(query_vec.reshape(1, -1), k)
            top_k_idx = indices[0]
            scores = scores[0]
        elif method == "nb":
            query_vec = tfidf_vectorizer.transform([query_clean])
            predicted_pasal = nb.predict(query_vec)[0]
            candidate_idx = df[df["pasal"] == predicted_pasal].index
            if len(candidate_idx) == 0:
                print(f"Warning: No cases found for predicted pasal {predicted_pasal}. Falling back to TF-IDF.")
                scores = cosine_similarity(query_vec, tfidf_matrix).flatten()
                top_k_idx = scores.argsort()[::-1][:k]
                scores = scores[top_k_idx]
            else:
                candidate_matrix = tfidf_matrix[candidate_idx]
                scores = cosine_similarity(query_vec, candidate_matrix).flatten()
                top_k_idx = candidate_idx[scores.argsort()[::-1][:min(k, len(scores))]]
                scores = scores[scores.argsort()[::-1][:min(k, len(scores))]]
        else:
            raise ValueError("Invalid method. Choose 'bm25', 'tfidf', 'bert', or 'nb'.")
        top_case_ids = df.iloc[top_k_idx]["case_id"].tolist()
        return list(zip(top_case_ids, scores))
    except Exception as e:
        print(f"Error in retrieval for query '{query}' with method '{method}': {e}")
        return []

In [11]:
# === 9. PREDIKSI OUTCOME ===
def predict_outcome(query: str, k: int = 5, method: str = "nb", strategy: str = "weighted") -> str:
    """Predict the outcome for a query based on top-k similar cases."""
    top_k = retrieve(query, k=k, method=method)
    if not top_k:
        return "No similar cases found."
    case_ids, scores = zip(*top_k)
    solutions = [case_solutions[cid] for cid in case_ids if cid in case_solutions]
    if not solutions:
        return "No solutions found for retrieved cases."
    if strategy == "majority":
        solution_counts = Counter(solutions)
        predicted_solution = solution_counts.most_common(1)[0][0]
    else:  # Weighted similarity
        solution_scores = {}
        for sol, score in zip(solutions, scores):
            solution_scores[sol] = solution_scores.get(sol, 0.0) + score
        predicted_solution = max(solution_scores, key=solution_scores.get)
    return predicted_solution

In [12]:
# === 10. EVALUASI RETRIEVAL ===
def eval_retrieval(queries: List[str], ground_truth: List[str], k: int = 5, method: str = "nb") -> Dict[str, float]:
    """Evaluate retrieval performance for a list of queries."""
    y_true = []
    y_pred = []
    failure_cases = []

    for query, gt_pasal in zip(queries, ground_truth):
        top_k = retrieve(query, k=k, method=method)
        if not top_k:
            y_true.append(1)  # Assume relevant
            y_pred.append(0)  # No cases retrieved
            failure_cases.append({"query": query, "ground_truth": gt_pasal, "top_k_case_ids": [], "reason": "No cases retrieved"})
            continue

        case_ids, _ = zip(*top_k)
        # Check if retrieved cases match ground-truth pasal
        retrieved_pasals = [df[df["case_id"] == cid]["pasal"].iloc[0] for cid in case_ids if cid in df["case_id"].values]
        relevant = [1 if pasal == gt_pasal else 0 for pasal in retrieved_pasals]

        # If no relevant cases, log failure
        if sum(relevant) == 0:
            failure_cases.append({
                "query": query,
                "ground_truth": gt_pasal,
                "top_k_case_ids": list(case_ids),
                "reason": "No relevant cases retrieved"
            })

        # For accuracy, consider retrieval correct if at least one case is relevant
        y_true.append(1)  # Query has relevant cases in dataset
        y_pred.append(1 if any(relevant) else 0)

    # Compute metrics
    metrics = {
        "accuracy": accuracy_score(y_true, y_pred) if y_true else 0.0,
        "precision": precision_score(y_true, y_pred, zero_division=0),
        "recall": recall_score(y_true, y_pred, zero_division=0),
        "f1_score": f1_score(y_true, y_pred, zero_division=0)
    }

    # Save failure cases
    with open(f"data/eval/failure_cases_retrieval_{method}.json", "w", encoding="utf-8") as f:
        json.dump(failure_cases, f, ensure_ascii=False, indent=2)

    return metrics

In [13]:
# === 11. EVALUASI PREDICTION ===
def eval_prediction(queries: List[str], ground_truth: List[str], k: int = 5, method: str = "nb", strategy: str = "weighted") -> Dict[str, float]:
    """Evaluate prediction performance for a list of queries."""
    y_true = []
    y_pred = []
    failure_cases = []

    for query, gt_solution in zip(queries, ground_truth):
        if gt_solution == "Unknown":
            continue  # Skip queries with invalid ground-truth
        predicted_solution = predict_outcome(query, k=k, method=method, strategy=strategy)
        if predicted_solution in ["No similar cases found", "No solutions found for retrieved cases"]:
            y_true.append(1)  # Assume relevant
            y_pred.append(0)  # Prediction failed
            failure_cases.append({
                "query": query,
                "ground_truth": gt_solution,
                "predicted": predicted_solution,
                "reason": predicted_solution
            })
            continue

        y_true.append(1)  # Query has a valid ground-truth
        y_pred.append(1 if predicted_solution == gt_solution else 0)
        if predicted_solution != gt_solution:
            failure_cases.append({
                "query": query,
                "ground_truth": gt_solution,
                "predicted": predicted_solution,
                "reason": "Prediction mismatch"
            })

    # Compute metrics
    metrics = {
        "accuracy": accuracy_score(y_true, y_pred) if y_true else 0.0,
        "precision": precision_score(y_true, y_pred, zero_division=0),
        "recall": recall_score(y_true, y_pred, zero_division=0),
        "f1_score": f1_score(y_true, y_pred, zero_division=0)
    }

    # Save failure cases
    with open(f"data/eval/failure_cases_prediction_{method}_{strategy}.json", "w", encoding="utf-8") as f:
        json.dump(failure_cases, f, ensure_ascii=False, indent=2)

    return metrics

In [14]:
# === 12. VISUALISASI & LAPORAN ===
def create_metrics_table(retrieval_metrics: Dict[str, Dict], prediction_metrics: Dict[str, Dict]) -> pd.DataFrame:
    """Create a table of metrics for all models."""
    data = []
    for method in retrieval_metrics:
        metrics = retrieval_metrics[method]
        data.append({
            "Model": method,
            "Type": "Retrieval",
            "Accuracy": metrics["accuracy"],
            "Precision": metrics["precision"],
            "Recall": metrics["recall"],
            "F1-Score": metrics["f1_score"]
        })
    for key, metrics in prediction_metrics.items():
        method, strategy = key.split("_")
        data.append({
            "Model": f"{method}_{strategy}",
            "Type": "Prediction",
            "Accuracy": metrics["accuracy"],
            "Precision": metrics["precision"],
            "Recall": metrics["recall"],
            "F1-Score": metrics["f1_score"]
        })
    return pd.DataFrame(data)

def plot_metrics(metrics_df: pd.DataFrame):
    """Plot bar chart of F1-scores for retrieval and prediction."""
    plt.figure(figsize=(10, 6))
    models = metrics_df["Model"]
    f1_scores = metrics_df["F1-Score"]
    colors = ["blue" if t == "Retrieval" else "green" for t in metrics_df["Type"]]

    plt.bar(models, f1_scores, color=colors)
    plt.xlabel("Model")
    plt.ylabel("F1-Score")
    plt.title("F1-Score Comparison for Retrieval and Prediction")
    plt.xticks(rotation=45, ha="right")
    plt.tight_layout()

    os.makedirs("data/eval", exist_ok=True)
    plt.savefig("data/eval/f1_score_comparison.png")
    plt.close()

In [15]:
# === 13. MAIN EVALUATION ===
# Prepare test queries and ground-truth
queries = text_test.tolist()
pasal_ground_truth = y_test.tolist()
solution_ground_truth = pidana_test.tolist()

# Evaluate retrieval for each method
retrieval_methods = ["nb", "tfidf", "bert", "bm25"]
k = 5
retrieval_metrics = {}
for method in retrieval_methods:
    metrics = eval_retrieval(queries, pasal_ground_truth, k=k, method=method)
    retrieval_metrics[method] = metrics
    print(f"Retrieval Metrics for {method}:")
    print(f"Accuracy: {metrics['accuracy']:.4f}")
    print(f"Precision: {metrics['precision']:.4f}")
    print(f"Recall: {metrics['recall']:.4f}")
    print(f"F1-Score: {metrics['f1_score']:.4f}")
    print("---")

# Evaluate prediction for each method and strategy
prediction_configs = [(method, strategy) for method in retrieval_methods for strategy in ["weighted", "majority"]]
prediction_metrics = {}
for method, strategy in prediction_configs:
    metrics = eval_prediction(queries, solution_ground_truth, k=k, method=method, strategy=strategy)
    prediction_metrics[f"{method}_{strategy}"] = metrics
    print(f"Prediction Metrics for {method} ({strategy}):")
    print(f"Accuracy: {metrics['accuracy']:.4f}")
    print(f"Precision: {metrics['precision']:.4f}")
    print(f"Recall: {metrics['recall']:.4f}")
    print(f"F1-Score: {metrics['f1_score']:.4f}")
    print("---")

# Create and save metrics table
metrics_df = create_metrics_table(retrieval_metrics, prediction_metrics)
os.makedirs("data/eval", exist_ok=True)
metrics_df.to_csv("data/eval/retrieval_metrics.csv" if metrics_df["Type"].str.contains("Retrieval").any() else "data/eval/prediction_metrics.csv", index=False)
print("Metrics saved to data/eval/retrieval_metrics.csv and data/eval/prediction_metrics.csv")

# Plot metrics
plot_metrics(metrics_df)
print("F1-Score comparison plot saved to data/eval/f1_score_comparison.png")

Retrieval Metrics for nb:
Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000
F1-Score: 1.0000
---
Retrieval Metrics for tfidf:
Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000
F1-Score: 1.0000
---
Retrieval Metrics for bert:
Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000
F1-Score: 1.0000
---
Retrieval Metrics for bm25:
Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000
F1-Score: 1.0000
---
Prediction Metrics for nb (weighted):
Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000
F1-Score: 1.0000
---
Prediction Metrics for nb (majority):
Accuracy: 0.6667
Precision: 1.0000
Recall: 0.6667
F1-Score: 0.8000
---
Prediction Metrics for tfidf (weighted):
Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000
F1-Score: 1.0000
---
Prediction Metrics for tfidf (majority):
Accuracy: 0.6667
Precision: 1.0000
Recall: 0.6667
F1-Score: 0.8000
---
Prediction Metrics for bert (weighted):
Accuracy: 0.8889
Precision: 1.0000
Recall: 0.8889
F1-Score: 0.9412
---
Prediction Metrics for bert (majority):
Accuracy: 0.