### **Tahap 3 Disusun Oleh** 👨‍🎓✒️
>**Nama: Fitra Romeo Winky**

>**NIM: 202210370311056**

>**Nama: Muhammad Aunul Hakim**

>**NIM: 202210370311073**

>**Kelas: C**

>**Mata Kuliah: Penalaran Komputer**

>**Jurusan: Informatika**

In [None]:
# Mount Google Drive at /content/drive (Colab-only helper); the dataset and
# evaluation paths used by main() live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Tahap 3
import pandas as pd
import numpy as np
import json
import os
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import accuracy_score, classification_report
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

class CaseRetrievalSystem:
    """Retrieve similar court cases from a CSV dataset.

    Two representations of each case are supported: a TF-IDF matrix and
    BERT [CLS] embeddings. Both are ranked against a query with cosine
    similarity. The class can also train a simple classifier on the TF-IDF
    features (target: ``jenis_perkara``) and evaluate retrieval quality with
    self-generated test queries.

    Typical call order: ``load_data`` -> ``create_tfidf_representation`` /
    ``create_bert_representation`` -> ``split_data`` -> ``train_ml_model`` ->
    ``retrieve`` / ``evaluate_retrieval``.
    """

    # Columns copied into every result's ``case_info`` dict, in output order.
    # A column that is absent from the dataset is reported as 'N/A'.
    _INFO_COLUMNS = (
        'case_id', 'no_perkara', 'tanggal', 'jenis_perkara', 'pasal', 'pihak',
        'ringkasan_fakta', 'argumen_hukum', 'length_kata', 'jumlah_pasal',
        'mengandung_pidana', 'mengandung_penganiayaan',
    )

    def __init__(self, dataset_path):
        """
        Initialize Case Retrieval System

        Args:
            dataset_path (str): Path to the cases.csv dataset
        """
        self.dataset_path = dataset_path
        self.df = None
        self.tfidf_vectorizer = None
        self.tfidf_matrix = None
        self.bert_tokenizer = None
        self.bert_model = None
        self.bert_embeddings = None
        self.ml_model = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _clean_text(value):
        """Return ``str(value)`` if it is non-missing and non-blank, else None."""
        if pd.notna(value) and str(value).strip():
            return str(value)
        return None

    @staticmethod
    def _label_or_unknown(value):
        """Return ``str(value)``, or 'unknown' when the value is missing."""
        return str(value) if pd.notna(value) else 'unknown'

    @staticmethod
    def _reciprocal_rank(retrieved_ids, ground_truth_id):
        """Return 1/rank of ``ground_truth_id`` in ``retrieved_ids`` (0.0 if absent).

        IDs are compared as strings: the ground truth is stored as ``str`` in
        the test queries while the dataframe may hold integer case IDs.
        """
        truth = str(ground_truth_id)
        for rank, case_id in enumerate(retrieved_ids, start=1):
            if str(case_id) == truth:
                return 1.0 / rank
        return 0.0

    def _rank_cases(self, similarities, top_k):
        """Turn a similarity vector into the ``top_k`` best result tuples.

        Args:
            similarities: 1-D array with one score per row of ``self.df``.
            top_k (int): Number of results wanted.

        Returns:
            list: ``(case_id, similarity_score, case_info)`` tuples, best first.
            Empty when ``top_k <= 0`` or there are no cases.
        """
        similarities = np.asarray(similarities).ravel()
        # Guard explicitly: ``arr[-0:]`` is the WHOLE array, so top_k == 0
        # would otherwise return every case instead of none.
        if top_k <= 0 or similarities.size == 0:
            return []

        top_indices = similarities.argsort()[-top_k:][::-1]

        results = []
        for idx in top_indices:
            row = self.df.iloc[idx]
            case_info = {col: row.get(col, 'N/A') for col in self._INFO_COLUMNS}
            case_info['similarity_score'] = float(similarities[idx])
            results.append((case_info['case_id'], similarities[idx], case_info))
        return results

    def _load_bert(self, model_name):
        """Load tokenizer and model for ``model_name`` and switch to eval mode."""
        self.bert_tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.bert_model = AutoModel.from_pretrained(model_name)
        self.bert_model.eval()

    def _embed_text(self, text):
        """Return the [CLS] embedding of ``text`` as a 1-D numpy array.

        Length is limited by the tokenizer (``truncation=True``,
        ``max_length=512`` tokens). The text is deliberately NOT pre-sliced to
        512 characters: that would leave most of the model's token window
        unused.
        """
        inputs = self.bert_tokenizer(
            str(text),
            return_tensors='pt',
            truncation=True,
            padding=True,
            max_length=512
        )
        with torch.no_grad():
            outputs = self.bert_model(**inputs)
        # Single-text batch: drop the batch axis, keep the hidden dimension.
        return outputs.last_hidden_state[:, 0, :].squeeze(0).numpy()

    # ------------------------------------------------------------------
    # Data loading and representations
    # ------------------------------------------------------------------
    def load_data(self):
        """Load the CSV and derive the text columns used for retrieval.

        Adds ``combined_text`` (space-joined ``ringkasan_fakta``,
        ``argumen_hukum`` and ``pihak``, whichever exist) and
        ``retrieval_text`` (``text_full`` when present and non-blank,
        otherwise ``combined_text``). A ``case_id`` column is generated if the
        dataset has none.

        Returns:
            pandas.DataFrame: The loaded and augmented dataframe.
        """
        print("Loading dataset...")
        self.df = pd.read_csv(self.dataset_path)
        print(f"Dataset loaded: {len(self.df)} cases")

        # Updated text columns based on new attributes
        text_columns = ['ringkasan_fakta', 'argumen_hukum', 'pihak']
        parts = [
            self.df[col].fillna('').astype(str)
            for col in text_columns
            if col in self.df.columns
        ]

        if parts:
            combined = parts[0]
            for part in parts[1:]:
                combined = combined + ' ' + part
            self.df['combined_text'] = combined
        else:
            self.df['combined_text'] = ''

        # Prefer text_full; fall back to combined_text where text_full is
        # missing OR blank, and always end up with strings for the vectorizer.
        if 'text_full' in self.df.columns:
            text_full = self.df['text_full']
            has_text = text_full.notna() & (text_full.astype(str).str.strip() != '')
            self.df['retrieval_text'] = text_full.where(
                has_text, self.df['combined_text']
            ).astype(str)
        else:
            self.df['retrieval_text'] = self.df['combined_text']

        # Ensure case_id exists (it's already in the new attributes)
        if 'case_id' not in self.df.columns:
            self.df['case_id'] = ['case_' + str(i).zfill(4) for i in range(len(self.df))]

        print("Data preprocessing completed")
        print(f"Available columns: {list(self.df.columns)}")
        return self.df

    def create_tfidf_representation(self, max_features=5000):
        """
        Create TF-IDF representation of the cases

        Fits a unigram+bigram ``TfidfVectorizer`` on ``retrieval_text`` and
        stores both the vectorizer and the resulting matrix on the instance.

        Args:
            max_features (int): Maximum number of features for TF-IDF

        Returns:
            The fitted TF-IDF matrix (one row per case).
        """
        print("Creating TF-IDF representation...")

        self.tfidf_vectorizer = TfidfVectorizer(
            max_features=max_features,
            stop_words=None,  # You can add Indonesian stop words here
            ngram_range=(1, 2),
            min_df=2,
            max_df=0.8
        )

        self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.df['retrieval_text'])
        print(f"TF-IDF matrix shape: {self.tfidf_matrix.shape}")
        return self.tfidf_matrix

    def create_bert_representation(self, model_name='indobenchmark/indobert-base-p1'):
        """
        Create BERT embeddings for the cases

        Falls back to ``bert-base-multilingual-cased`` when ``model_name``
        cannot be loaded. Embeddings are stored in ``self.bert_embeddings``.

        Args:
            model_name (str): Pre-trained BERT model name

        Returns:
            numpy.ndarray: One [CLS] embedding per case.
        """
        print(f"Loading BERT model: {model_name}")

        try:
            self._load_bert(model_name)
        except Exception as exc:  # not a bare except: let Ctrl-C / SystemExit through
            print("Failed to load IndoBERT, using multilingual BERT instead...")
            print(f"Reason: {exc}")
            self._load_bert('bert-base-multilingual-cased')

        print("Creating BERT embeddings...")
        embeddings = []
        total = len(self.df)

        for idx, text in enumerate(self.df['retrieval_text']):
            if idx % 10 == 0:
                print(f"Processing {idx+1}/{total} cases...")
            embeddings.append(self._embed_text(text))

        self.bert_embeddings = np.array(embeddings)
        print(f"BERT embeddings shape: {self.bert_embeddings.shape}")
        return self.bert_embeddings

    # ------------------------------------------------------------------
    # Classification on TF-IDF features
    # ------------------------------------------------------------------
    def split_data(self, test_size=0.3, random_state=42):
        """
        Split data for training and testing

        Features are the dense TF-IDF matrix; the target is ``jenis_perkara``
        (missing values become 'unknown'). A stratified split is used when
        every class has at least two samples; if the stratified split is
        still infeasible, a random split is used instead of raising.

        Args:
            test_size (float): Proportion of test data
            random_state (int): Random seed

        Returns:
            tuple: ``(X_train, X_test, y_train, y_test)``
        """
        print(f"Splitting data with ratio {1-test_size:.1f}:{test_size:.1f}")

        # Use jenis_perkara as target for classification
        y = self.df['jenis_perkara'].fillna('unknown')

        # Check class distribution
        class_counts = y.value_counts()
        print(f"Class distribution:\n{class_counts}")

        # Stratification needs >1 class and at least 2 samples in each class.
        can_stratify = len(class_counts) > 1 and bool((class_counts >= 2).all())
        if can_stratify:
            print("Using stratified split")
            stratify_param = y
        else:
            print("Using random split (stratification not possible due to class imbalance)")
            stratify_param = None

        features = self.tfidf_matrix.toarray()
        try:
            split = train_test_split(
                features,
                y,
                test_size=test_size,
                random_state=random_state,
                stratify=stratify_param
            )
        except ValueError as exc:
            if stratify_param is None:
                raise
            # e.g. the test set would be smaller than the number of classes.
            print(f"Stratified split failed ({exc}); falling back to random split")
            split = train_test_split(
                features,
                y,
                test_size=test_size,
                random_state=random_state,
                stratify=None
            )

        self.X_train, self.X_test, self.y_train, self.y_test = split

        print(f"Training set: {self.X_train.shape[0]} cases")
        print(f"Test set: {self.X_test.shape[0]} cases")
        print(f"Training set class distribution:\n{pd.Series(self.y_train).value_counts()}")
        print(f"Test set class distribution:\n{pd.Series(self.y_test).value_counts()}")

        return self.X_train, self.X_test, self.y_train, self.y_test

    def train_ml_model(self, model_type='svm'):
        """
        Train machine learning model for retrieval

        Training is best-effort: if fitting or evaluation fails, the error is
        printed and ``self.ml_model`` is reset to None.

        Args:
            model_type (str): 'svm' or 'naive_bayes'

        Returns:
            The fitted model, or None when training was skipped or failed.

        Raises:
            ValueError: If ``model_type`` is not 'svm' or 'naive_bayes'.
        """
        print(f"Training {model_type.upper()} model...")

        # A classifier needs at least two classes in the training labels.
        if len(set(self.y_train)) < 2:
            print("Warning: Only one class in training data. Skipping ML model training.")
            return None

        kind = model_type.lower()
        if kind == 'svm':
            # Use different kernel for small datasets
            if len(self.y_train) < 100:
                self.ml_model = SVC(kernel='rbf', probability=True, random_state=42, C=1.0)
            else:
                self.ml_model = SVC(kernel='linear', probability=True, random_state=42)
        elif kind == 'naive_bayes':
            self.ml_model = MultinomialNB(alpha=1.0)
        else:
            raise ValueError("model_type must be 'svm' or 'naive_bayes'")

        try:
            self.ml_model.fit(self.X_train, self.y_train)

            # Evaluate model
            train_pred = self.ml_model.predict(self.X_train)
            test_pred = self.ml_model.predict(self.X_test)

            print(f"Training Accuracy: {accuracy_score(self.y_train, train_pred):.4f}")
            print(f"Test Accuracy: {accuracy_score(self.y_test, test_pred):.4f}")

            # Show detailed classification report for test set
            print("\nDetailed Classification Report:")
            print(classification_report(self.y_test, test_pred, zero_division=0))

        except Exception as e:
            print(f"Error training model: {e}")
            print("Continuing without ML model...")
            self.ml_model = None

        return self.ml_model

    # ------------------------------------------------------------------
    # Retrieval
    # ------------------------------------------------------------------
    def retrieve_tfidf(self, query_text, top_k=5):
        """
        Retrieve similar cases using TF-IDF and cosine similarity

        Args:
            query_text (str): Query text
            top_k (int): Number of top similar cases to return

        Returns:
            list: List of tuples (case_id, similarity_score, case_info)
        """
        query_vector = self.tfidf_vectorizer.transform([query_text])
        similarities = cosine_similarity(query_vector, self.tfidf_matrix).flatten()
        return self._rank_cases(similarities, top_k)

    def retrieve_bert(self, query_text, top_k=5):
        """
        Retrieve similar cases using BERT embeddings

        Args:
            query_text (str): Query text
            top_k (int): Number of top similar cases to return

        Returns:
            list: List of tuples (case_id, similarity_score, case_info)
        """
        query_embedding = self._embed_text(query_text)
        similarities = cosine_similarity([query_embedding], self.bert_embeddings).flatten()
        return self._rank_cases(similarities, top_k)

    def retrieve(self, query_text, method='tfidf', top_k=5):
        """
        Main retrieval function

        Args:
            query_text (str): Query text
            method (str): 'tfidf' or 'bert'
            top_k (int): Number of top similar cases to return

        Returns:
            list: List of retrieved cases

        Raises:
            ValueError: If ``method`` is neither 'tfidf' nor 'bert'.
        """
        chosen = method.lower()
        if chosen == 'tfidf':
            return self.retrieve_tfidf(query_text, top_k)
        if chosen == 'bert':
            return self.retrieve_bert(query_text, top_k)
        raise ValueError("method must be 'tfidf' or 'bert'")

    # ------------------------------------------------------------------
    # Evaluation
    # ------------------------------------------------------------------
    def create_test_queries(self, num_queries=10, random_state=None):
        """
        Create test queries from existing cases

        Each query text is the case's ``ringkasan_fakta``; if that is blank,
        the first 200 characters of ``argumen_hukum``; otherwise the first
        200 characters of ``retrieval_text``.

        Args:
            num_queries (int): Number of test queries to create
            random_state (int | None): Seed for a reproducible sample. With
                None (default) the global numpy random state is used.

        Returns:
            list: List of test queries with ground truth
        """
        print(f"Creating {num_queries} test queries...")

        # Ensure we don't sample more queries than we have cases
        actual_num_queries = min(num_queries, len(self.df))
        if actual_num_queries <= 0:
            print("Created 0 test queries")
            return []

        if random_state is None:
            sample_indices = np.random.choice(len(self.df), actual_num_queries, replace=False)
        else:
            rng = np.random.RandomState(random_state)
            sample_indices = rng.choice(len(self.df), actual_num_queries, replace=False)

        queries = []
        for idx in sample_indices:
            case = self.df.iloc[idx]

            fact_summary = self._clean_text(case.get('ringkasan_fakta'))
            legal_argument = self._clean_text(case.get('argumen_hukum'))
            if fact_summary is not None:
                query_text = fact_summary
            elif legal_argument is not None:
                query_text = legal_argument[:200]
            else:
                query_text = str(case.get('retrieval_text', ''))[:200]

            queries.append({
                'query_id': f'query_{len(queries)+1:03d}',
                'query_text': query_text.strip(),
                'ground_truth_case_id': str(case['case_id']),
                'expected_jenis_perkara': self._label_or_unknown(case.get('jenis_perkara')),
                'expected_pasal': self._label_or_unknown(case.get('pasal')),
                'expected_no_perkara': self._label_or_unknown(case.get('no_perkara')),
                'source_index': int(idx)
            })

        print(f"Created {len(queries)} test queries")
        return queries

    def evaluate_retrieval(self, queries, method='tfidf', top_k=5):
        """
        Evaluate retrieval performance

        Args:
            queries (list): List of test queries
            method (str): Retrieval method
            top_k (int): Number of top results to consider

        Returns:
            dict: ``hits_at_1``, ``hits_at_<top_k>``, ``mrr`` and
            ``total_queries``. All rates are 0.0 for an empty query list.
        """
        print(f"Evaluating retrieval with {method} method...")

        if not queries:
            return {
                'hits_at_1': 0.0,
                f'hits_at_{top_k}': 0.0,
                'mrr': 0.0,
                'total_queries': 0
            }

        hits_at_1 = 0
        hits_at_k = 0
        mrr_scores = []

        for query in queries:
            results = self.retrieve(query['query_text'], method=method, top_k=top_k)
            retrieved_ids = [result[0] for result in results]
            reciprocal_rank = self._reciprocal_rank(
                retrieved_ids, query['ground_truth_case_id']
            )

            if reciprocal_rank == 1.0:  # ground truth ranked first
                hits_at_1 += 1
            if reciprocal_rank > 0.0:   # ground truth anywhere in the top-k
                hits_at_k += 1
            mrr_scores.append(reciprocal_rank)

        total = len(queries)
        return {
            'hits_at_1': hits_at_1 / total,
            f'hits_at_{top_k}': hits_at_k / total,
            'mrr': float(np.mean(mrr_scores)),
            'total_queries': total
        }

# Default locations on the mounted Google Drive.
_DEFAULT_DATASET_PATH = '/content/drive/MyDrive/SEMESTER 6/Penalaran Komputer/UAS_Penalaran Komputer/data/processed/cases.csv'
_DEFAULT_EVAL_DIR = '/content/drive/MyDrive/SEMESTER 6/Penalaran Komputer/UAS_Penalaran Komputer/data/eval'


def _json_default(value):
    """``json.dump`` fallback: convert numpy scalars/arrays to native types."""
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, np.ndarray):
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def _print_retrieval_results(retrieval_system, query_text, method, label, header, top_k=3):
    """Run one retrieval and print its results; errors are reported, not raised."""
    try:
        print(header)
        results = retrieval_system.retrieve(query_text, method=method, top_k=top_k)
        for i, (case_id, score, info) in enumerate(results, 1):
            print(f"{i}. Case ID: {case_id}, Score: {score:.4f}")
            print(f"   No Perkara: {info['no_perkara']}")
            print(f"   Tanggal: {info['tanggal']}")
            print(f"   Jenis Perkara: {info['jenis_perkara']}")
            print(f"   Pasal: {info['pasal']}")
            print(f"   Mengandung Pidana: {info['mengandung_pidana']}")
            print(f"   Mengandung Penganiayaan: {info['mengandung_penganiayaan']}")
            print(f"   Ringkasan: {str(info['ringkasan_fakta'])[:100]}...")
            print()
    except Exception as e:
        print(f"Error in {label} retrieval: {e}")


def main(dataset_path=_DEFAULT_DATASET_PATH, eval_dir=_DEFAULT_EVAL_DIR):
    """Run the whole pipeline: load, represent, split, train, evaluate, report.

    Writes ``queries.json`` (the generated test queries) and
    ``system_summary.json`` (dataset/split/model/evaluation summary) into
    ``eval_dir``.

    Args:
        dataset_path (str): Path to the cases CSV file.
        eval_dir (str): Directory for the JSON outputs; created if missing.

    Returns:
        CaseRetrievalSystem | None: The configured system, or None if any
        step raised (the traceback is printed).
    """
    # Initialize system
    retrieval_system = CaseRetrievalSystem(dataset_path)

    try:
        # Load and preprocess data
        df = retrieval_system.load_data()
        print(f"Data shape: {df.shape}")
        print(f"Columns: {list(df.columns)}")

        # Show sample of jenis_perkara distribution
        print("\nJenis Perkara distribution:")
        print(df['jenis_perkara'].value_counts().head(10))

        # Show additional statistics for new columns
        if 'mengandung_pidana' in df.columns:
            print("\nMengandung Pidana distribution:")
            print(df['mengandung_pidana'].value_counts())

        if 'mengandung_penganiayaan' in df.columns:
            print("\nMengandung Penganiayaan distribution:")
            print(df['mengandung_penganiayaan'].value_counts())

        # Create representations
        print("\n=== Creating TF-IDF Representation ===")
        tfidf_matrix = retrieval_system.create_tfidf_representation()

        print("\n=== Creating BERT Representation ===")
        bert_embeddings = retrieval_system.create_bert_representation()

        # Split data
        print("\n=== Splitting Data ===")
        X_train, X_test, y_train, y_test = retrieval_system.split_data(test_size=0.3)

        # Train ML model
        print("\n=== Training ML Model ===")
        ml_model = retrieval_system.train_ml_model(model_type='svm')

        # Create test queries
        print("\n=== Creating Test Queries ===")
        test_queries = retrieval_system.create_test_queries(num_queries=min(10, len(df)))

        # Create directory and save test queries
        os.makedirs(eval_dir, exist_ok=True)

        queries_file = os.path.join(eval_dir, 'queries.json')
        with open(queries_file, 'w', encoding='utf-8') as f:
            # _json_default converts any remaining numpy types to native ones.
            json.dump(test_queries, f, ensure_ascii=False, indent=2, default=_json_default)

        print(f"Test queries saved to {queries_file}")

        # Evaluate retrieval. Metrics stay None when there are no queries, so
        # the summary below never has to inspect locals().
        tfidf_metrics = None
        bert_metrics = None
        if len(test_queries) > 0:
            print("\n=== Evaluating TF-IDF Retrieval ===")
            tfidf_metrics = retrieval_system.evaluate_retrieval(test_queries, method='tfidf', top_k=5)
            print(f"TF-IDF Metrics: {tfidf_metrics}")

            print("\n=== Evaluating BERT Retrieval ===")
            bert_metrics = retrieval_system.evaluate_retrieval(test_queries, method='bert', top_k=5)
            print(f"BERT Metrics: {bert_metrics}")

        # Test retrieval function with sample queries
        print("\n=== Testing Retrieval Function ===")
        sample_queries = [
            "kasus perceraian dengan harta gono gini",
            "sengketa kontrak kerja",
            "gugatan wanprestasi pembayaran",
            "pembatalan perjanjian jual beli"
        ]

        for sample_query in sample_queries[:2]:  # Test first 2 queries
            print("\n" + "=" * 50)
            print(f"Query: {sample_query}")
            print("=" * 50)

            _print_retrieval_results(
                retrieval_system, sample_query,
                method='tfidf', label='TF-IDF', header="\nTF-IDF Results:",
            )
            _print_retrieval_results(
                retrieval_system, sample_query,
                method='bert', label='BERT', header="BERT Results:",
            )

        print("\n" + "=" * 60)
        print("=== Case Retrieval System Setup Complete ===")
        print("=" * 60)

        # Save system summary
        summary = {
            "dataset_info": {
                "total_cases": len(df),
                "tfidf_features": tfidf_matrix.shape[1],
                "bert_embedding_dim": bert_embeddings.shape[1],
                "unique_jenis_perkara": len(df['jenis_perkara'].unique()),
                "columns": list(df.columns)
            },
            "split_info": {
                "train_size": len(X_train),
                "test_size": len(X_test),
                "test_ratio": len(X_test) / (len(X_train) + len(X_test))
            },
            "model_info": {
                "ml_model_trained": ml_model is not None,
                "model_type": type(ml_model).__name__ if ml_model is not None else None
            },
            "evaluation": {
                "num_test_queries": len(test_queries),
                "tfidf_metrics": tfidf_metrics,
                "bert_metrics": bert_metrics
            }
        }

        summary_file = os.path.join(eval_dir, 'system_summary.json')
        with open(summary_file, 'w', encoding='utf-8') as f:
            # default=str: stringify anything json cannot encode natively.
            json.dump(summary, f, ensure_ascii=False, indent=2, default=str)

        print(f"System summary saved to {summary_file}")

        return retrieval_system

    except Exception as e:
        # Top-level boundary of the notebook cell: report and return None.
        print(f"Error in main execution: {e}")
        import traceback
        traceback.print_exc()
        return None

if __name__ == "__main__":
    # Run the full pipeline. `system` holds the CaseRetrievalSystem returned
    # by main(), or None when main() caught an error.
    system = main()

Loading dataset...
Dataset loaded: 31 cases
Data preprocessing completed
Available columns: ['case_id', 'no_perkara', 'tanggal', 'jenis_perkara', 'pasal', 'pihak', 'ringkasan_fakta', 'argumen_hukum', 'length_kata', 'jumlah_pasal', 'mengandung_pidana', 'mengandung_penganiayaan', 'text_full', 'combined_text', 'retrieval_text']
Data shape: (31, 15)
Columns: ['case_id', 'no_perkara', 'tanggal', 'jenis_perkara', 'pasal', 'pihak', 'ringkasan_fakta', 'argumen_hukum', 'length_kata', 'jumlah_pasal', 'mengandung_pidana', 'mengandung_penganiayaan', 'text_full', 'combined_text', 'retrieval_text']

Jenis Perkara distribution:
jenis_perkara
Pidana Umum \n Pidana Umum  Penganiayaan    24
Perdata Agama                                3
Pidana Umum                                  2
Pidana Militer                               1
Pidana Umum  Penganiayaan                    1
Name: count, dtype: int64

Mengandung Pidana distribution:
mengandung_pidana
True     27
False     4
Name: count, dtype: int64

Me