In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel
import torch
import json
from typing import List, Dict
import re
import os

# Load and preprocess data
def load_and_preprocess_data(file_path: str) -> pd.DataFrame:
    df = pd.read_csv(file_path)
    df['combined_text'] = df['case_summary'].fillna('') + ' ' + df['full_text'].fillna('')
    def clean_text(text: str) -> str:
        # Preserve legal terms by limiting punctuation removal
        text = re.sub(r'[\n\t]', ' ', text.lower())  # Remove newlines and tabs only
        text = re.sub(r'\s+', ' ', text).strip()
        return text
    df['combined_text'] = df['combined_text'].apply(clean_text)
    df['solution'] = df['legal_basis'].fillna(df['case_summary'])
    return df

# TF-IDF Vectorization
def get_tfidf_vectors(texts: List[str]) -> np.ndarray:
    vectorizer = TfidfVectorizer(max_features=5000, stop_words=None)
    tfidf_matrix = vectorizer.fit_transform(texts)
    return vectorizer, tfidf_matrix

# BERT Embeddings using IndoBERT
def get_bert_embeddings(texts: List[str], model_name: str = "indobenchmark/indobert-base-p1") -> tuple:
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    embeddings = []
    for text in texts:
        inputs = tokenizer(text, return_tensors="pt", max_length=512, truncation=True, padding=True)
        with torch.no_grad():
            outputs = model(**inputs)
        embedding = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
        embeddings.append(embedding)
    return np.array(embeddings), tokenizer, model

# Retrieval function
def retrieve(query: str, df: pd.DataFrame, tfidf_vectorizer, tfidf_matrix: np.ndarray,
             bert_embeddings: np.ndarray, bert_tokenizer, bert_model, k: int = 5, use_bert: bool = True) -> List[int]:
    query_clean = re.sub(r'[\n\t]', ' ', query.lower()).strip()
    query_clean = re.sub(r'\s+', ' ', query_clean)
    if use_bert:
        inputs = bert_tokenizer(query_clean, return_tensors="pt", max_length=512, truncation=True, padding=True)
        with torch.no_grad():
            query_vector = bert_model(**inputs).last_hidden_state.mean(dim=1).squeeze().numpy()
        query_vector = query_vector.reshape(1, -1)
        similarities = cosine_similarity(query_vector, bert_embeddings)[0]
    else:
        query_vector = tfidf_vectorizer.transform([query_clean]).toarray()
        similarities = cosine_similarity(query_vector, tfidf_matrix)[0]
    top_k_indices = np.argsort(similarities)[-k:][::-1]
    top_k_case_ids = df.iloc[top_k_indices]['case_id'].tolist()
    top_k_similarities = similarities[top_k_indices]
    return top_k_case_ids, top_k_similarities

# Predict outcome function
def predict_outcome(query: str, df: pd.DataFrame, tfidf_vectorizer, tfidf_matrix: np.ndarray,
                    bert_embeddings: np.ndarray, bert_tokenizer, bert_model, k: int = 5) -> tuple:
    top_k_case_ids, top_k_similarities = retrieve(query, df, tfidf_vectorizer, tfidf_matrix,
                                                 bert_embeddings, bert_tokenizer, bert_model, k)
    solutions = [df[df['case_id'] == cid]['solution'].iloc[0] for cid in top_k_case_ids]
    solution_scores = {}
    for sol, sim in zip(solutions, top_k_similarities):
        if sol in solution_scores:
            solution_scores[sol] += sim
        else:
            solution_scores[sol] = sim
    predicted_solution = max(solution_scores, key=solution_scores.get)
    return predicted_solution, top_k_case_ids

# Evaluate retrieval performance
def eval_retrieval(queries: List[Dict], df: pd.DataFrame, tfidf_vectorizer, tfidf_matrix: np.ndarray,
                   bert_embeddings: np.ndarray, bert_tokenizer, bert_model, k: int = 5) -> Dict:
    metrics = {'tfidf': {'accuracy': [], 'precision': [], 'recall': [], 'f1': []},
               'bert': {'accuracy': [], 'precision': [], 'recall': [], 'f1': []}}
    error_cases = []

    for query_info in queries:
        query = query_info['query']
        ground_truth_id = query_info['ground_truth']

        # Validate ground truth ID exists in dataset
        if ground_truth_id not in df['case_id'].values:
            print(f"Warning: Ground truth ID {ground_truth_id} not found in dataset for query: {query}")
            continue

        # TF-IDF Retrieval
        tfidf_case_ids, tfidf_similarities = retrieve(query, df, tfidf_vectorizer, tfidf_matrix,
                                                     bert_embeddings, bert_tokenizer, bert_model, k, use_bert=False)
        tfidf_y_true = [1 if ground_truth_id == cid else 0 for cid in tfidf_case_ids]
        tfidf_y_pred = [1 if cid in tfidf_case_ids else 0 for cid in [ground_truth_id] * k]
        metrics['tfidf']['accuracy'].append(1 if ground_truth_id in tfidf_case_ids else 0)
        metrics['tfidf']['precision'].append(precision_score([1], [1 if ground_truth_id in tfidf_case_ids else 0], zero_division=0))
        metrics['tfidf']['recall'].append(recall_score([1], [1 if ground_truth_id in tfidf_case_ids else 0], zero_division=0))
        metrics['tfidf']['f1'].append(f1_score([1], [1 if ground_truth_id in tfidf_case_ids else 0], zero_division=0))

        # BERT Retrieval
        bert_case_ids, bert_similarities = retrieve(query, df, tfidf_vectorizer, tfidf_matrix,
                                                   bert_embeddings, bert_tokenizer, bert_model, k, use_bert=True)
        bert_y_true = [1 if ground_truth_id == cid else 0 for cid in bert_case_ids]
        bert_y_pred = [1 if cid in bert_case_ids else 0 for cid in [ground_truth_id] * k]
        metrics['bert']['accuracy'].append(1 if ground_truth_id in bert_case_ids else 0)
        metrics['bert']['precision'].append(precision_score([1], [1 if ground_truth_id in bert_case_ids else 0], zero_division=0))
        metrics['bert']['recall'].append(recall_score([1], [1 if ground_truth_id in bert_case_ids else 0], zero_division=0))
        metrics['bert']['f1'].append(f1_score([1], [1 if ground_truth_id in bert_case_ids else 0], zero_division=0))

        # Error analysis with similarity scores
        if ground_truth_id not in tfidf_case_ids or ground_truth_id not in bert_case_ids:
            error_cases.append({
                'query': query,
                'ground_truth_id': ground_truth_id,
                'tfidf_case_ids': tfidf_case_ids,
                'tfidf_similarities': tfidf_similarities.tolist(),
                'bert_case_ids': bert_case_ids,
                'bert_similarities': bert_similarities.tolist()
            })

    # Average metrics
    for model in metrics:
        for metric in metrics[model]:
            metrics[model][metric] = np.mean(metrics[model][metric]) if metrics[model][metric] else 0.0

    return metrics, error_cases

# Evaluate prediction performance
def eval_prediction(queries: List[Dict], df: pd.DataFrame, tfidf_vectorizer, tfidf_matrix: np.ndarray,
                    bert_embeddings: np.ndarray, bert_tokenizer, bert_model, k: int = 5) -> Dict:
    prediction_metrics = []
    prediction_errors = []

    for query_info in queries:
        query = query_info['query']
        ground_truth_solution = query_info['ground_truth_solution']
        predicted_solution, top_k_case_ids = predict_outcome(query, df, tfidf_vectorizer, tfidf_matrix,
                                                            bert_embeddings, bert_tokenizer, bert_model, k)

        # Case-insensitive exact match for prediction
        is_correct = 1 if predicted_solution.strip().lower() == ground_truth_solution.strip().lower() else 0
        prediction_metrics.append({
            'query_id': query_info['query_id'],
            'accuracy': is_correct,
            'predicted_solution': predicted_solution,
            'ground_truth_solution': ground_truth_solution,
            'top_5_case_ids': top_k_case_ids
        })

        if not is_correct:
            prediction_errors.append({
                'query': query,
                'predicted_solution': predicted_solution,
                'ground_truth_solution': ground_truth_solution,
                'top_5_case_ids': top_k_case_ids
            })

    avg_accuracy = np.mean([m['accuracy'] for m in prediction_metrics])
    return prediction_metrics, avg_accuracy, prediction_errors

# Main execution
def main():
    # Load data
    file_path = "/content/drive/MyDrive/Tugasbesar/data/processed/cases.csv"
    df = load_and_preprocess_data(file_path)

    # Get TF-IDF and BERT vectors
    tfidf_vectorizer, tfidf_matrix = get_tfidf_vectors(df['combined_text'].tolist())
    bert_embeddings, bert_tokenizer, bert_model = get_bert_embeddings(df['combined_text'].tolist())

    # Load queries
    demo_queries = [
        {"query_id": 1, "query": "Sengketa perdata mengenai hutang piutang dengan penyelesaian damai", "ground_truth": 1, "ground_truth_solution": "Para pihak dihukum untuk mentaati akta perdamaian"},
        {"query_id": 2, "query": "Wanprestasi pembayaran atas perjanjian investasi", "ground_truth": 2, "ground_truth_solution": "Tergugat dinyatakan wanprestasi dan wajib membayar hutang"},
        {"query_id": 3, "query": "Gugatan perdata dengan bukti transfer dan pengakuan hutang", "ground_truth": 3, "ground_truth_solution": "Tergugat wajib membayar hutang pokok beserta bunga"},
        {"query_id": 4, "query": "Pencabutan gugatan setelah mediasi tidak berhasil", "ground_truth": 127, "ground_truth_solution": "Perkara dicabut dari register"},
        {"query_id": 5, "query": "Sengketa perdata dengan bukti setoran bank", "ground_truth": 3, "ground_truth_solution": "Tergugat dinyatakan wanprestasi dan wajib membayar hutang"}
    ]

    # Evaluate retrieval
    retrieval_metrics, error_cases = eval_retrieval(demo_queries, df, tfidf_vectorizer, tfidf_matrix,
                                                   bert_embeddings, bert_tokenizer, bert_model, k=5)

    # Evaluate prediction
    prediction_metrics, avg_prediction_accuracy, prediction_errors = eval_prediction(demo_queries, df, tfidf_vectorizer,
                                                                                   tfidf_matrix, bert_embeddings,
                                                                                   bert_tokenizer, bert_model, k=5)

    # Save retrieval metrics
    retrieval_metrics_df = pd.DataFrame({
        'Model': ['TF-IDF', 'BERT'],
        'Accuracy': [retrieval_metrics['tfidf']['accuracy'], retrieval_metrics['bert']['accuracy']],
        'Precision': [retrieval_metrics['tfidf']['precision'], retrieval_metrics['bert']['precision']],
        'Recall': [retrieval_metrics['tfidf']['recall'], retrieval_metrics['bert']['recall']],
        'F1': [retrieval_metrics['tfidf']['f1'], retrieval_metrics['bert']['f1']]
    })
    retrieval_metrics_path = "/content/drive/MyDrive/Tugasbesar/data/eval/retrieval_metrics.csv"
    os.makedirs(os.path.dirname(retrieval_metrics_path), exist_ok=True)
    retrieval_metrics_df.to_csv(retrieval_metrics_path, index=False)
    print(f"Retrieval metrics saved to {retrieval_metrics_path}")
    print("\nRetrieval Metrics Table:")
    print(retrieval_metrics_df)

    # Save prediction metrics
    prediction_metrics_df = pd.DataFrame(prediction_metrics)
    prediction_metrics_path = "/content/drive/MyDrive/Tugasbesar/data/eval/prediction_metrics.csv"
    prediction_metrics_df.to_csv(prediction_metrics_path, index=False)
    print(f"\nPrediction metrics saved to {prediction_metrics_path}")
    print(f"Average Prediction Accuracy: {avg_prediction_accuracy:.4f}")

    # Error analysis
    print("\nError Analysis - Retrieval Failures:")
    for error in error_cases:
        print(f"Query: {error['query']}")
        print(f"Ground Truth ID: {error['ground_truth_id']}")
        print(f"TF-IDF Retrieved: {error['tfidf_case_ids']} (Similarities: {error['tfidf_similarities']})")
        print(f"BERT Retrieved: {error['bert_case_ids']} (Similarities: {error['bert_similarities']})")
        print()

    print("\nError Analysis - Prediction Failures:")
    for error in prediction_errors:
        print(f"Query: {error['query']}")
        print(f"Predicted Solution: {error['predicted_solution']}")
        print(f"Ground Truth Solution: {error['ground_truth_solution']}")
        print(f"Top-5 Case IDs: {error['top_5_case_ids']}")
        print()
   
if __name__ == "__main__":
    main()

Retrieval Metrics Table:
/content/drive/MyDrive/Tugasbesar/data/eval/retrieval_metrics.csv

     Model   Accuracy   Precision  Recall    F1
0  TF-IDF       0.80       0.80      0.80    0.80
1    BERT       0.80       0.66      0.66    0.66

Prediction metrics saved to 
/content/drive/MyDrive/Tugasbesar/data/eval/prediction_metrics.csv
Average Prediction Accuracy: 0.0000

Error Analysis - Retrieval Failures:
Query: Sengketa perdata mengenai hutang piutang dengan penyelesaian damai
Ground Truth ID: 1
TF-IDF Retrieved: [118, 123, 93, 53, 11] (Similarities: [0.10895861031516804, 0.08362106563953889, 0.07773682547362205, 0.07576645602701834, 0.07269386733096407])
BERT Retrieved: [117, 57, 67, 78, 13] (Similarities: [0.4797317385673523, 0.4768753945827484, 0.47631460428237915, 0.46995988488197327, 0.4691508412361145])

Query: Wanprestasi pembayaran atas perjanjian investasi
Ground Truth ID: 2
TF-IDF Retrieved: [68, 24, 80, 42, 125] (Similarities: [0.25851384150252066, 0.1678201642505457, 0.1