In [None]:
import pandas as pd
import ast

import nltk
from tqdm import tqdm
import re
import math
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer  # You can keep PorterStemmer for English parts of text

!pip install -q sastrawi
from Sastrawi.Stemmer.StemmerFactory import StemmerFactory # For Indonesian Stemmer
from Sastrawi.StopWordRemover.StopWordRemoverFactory import StopWordRemoverFactory
nltk.download('punkt_tab', quiet=True)

!pip install rank_bm25

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/209.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━[0m [32m163.8/209.7 kB[0m [31m4.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m209.7/209.7 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting rank_bm25
  Downloading rank_bm25-0.2.2-py3-none-any.whl.metadata (3.2 kB)
Downloading rank_bm25-0.2.2-py3-none-any.whl (8.6 kB)
Installing collected packages: rank_bm25
Successfully installed rank_bm25-0.2.2


In [None]:
# Load the document corpus (its first CSV column becomes the index, i.e. the doc ids used
# as keys of d_data) and the query table (default RangeIndex; rows carry "question" and "context").
d_df = pd.read_csv(
    "/content/drive/MyDrive/SKRIPSI/lb_v1_5474.csv",
    index_col=0,
)
q_df = pd.read_csv(
    "/content/drive/MyDrive/SKRIPSI/main_v1_1118.csv",
)

In [None]:

class BM25:
    """Okapi BM25 ranker for Indonesian text.

    Documents and queries go through the same ``_tokenize`` pipeline: lower-casing,
    punctuation stripping, NLTK word tokenization, Sastrawi stop-word removal and
    Sastrawi stemming.

    Results are reported per document as ``{"doc_id": position, "score": float}`` where
    ``position`` is the 1-based index of the document in ``documents``.

    Args:
        documents: list of raw document strings.
        k1: term-frequency saturation parameter.
        b: document-length normalisation strength (0 = none, 1 = full).
    """

    def __init__(self, documents, k1=1.2, b=0.75):
        self.documents = documents
        self.k1 = k1
        self.b = b
        self.N = len(documents)
        self.stemmer_factory = StemmerFactory()  # Sastrawi stemmer factory
        self.stemmer_id = self.stemmer_factory.create_stemmer()  # Indonesian stemmer
        self.stop_word_remover_factory = StopWordRemoverFactory()  # Sastrawi stop-word factory
        self.stop_words_id = self.stop_word_remover_factory.create_stop_word_remover()
        self.stemmer_en = PorterStemmer()  # kept as an attribute; _tokenize does not use it
        # Tokenize (and stem) every document exactly once; all statistics below are
        # derived from these token lists, so scoring never re-tokenizes the corpus.
        self._index_tokens([self._tokenize(d) for d in tqdm(documents)])

    def _len_docs(self):
        """Number of indexed documents."""
        return len(self.documents)

    def _tokenize(self, text):
        """Normalize ``text`` into a list of stemmed, stop-word-free tokens."""
        # 1. Case folding
        text = text.lower()

        # 2. Drop every character that is neither a word character nor whitespace
        text = re.sub(r'[^\w\s]', '', text)

        # 3. Tokenization
        tokens = word_tokenize(text)

        # 4. Stop-word removal (Indonesian). Sastrawi's remover returns its input with the
        #    stop words removed, so a token is kept exactly when the result is non-empty.
        tokens = [w for w in tokens if self.stop_words_id.remove(w)]

        # 5. Stemming (Indonesian)
        return [self.stemmer_id.stem(token) for token in tokens]

    def _index_tokens(self, doc_tokens):
        """Build term frequencies, lengths, ``avgdl`` and IDFs from pre-tokenized documents.

        Args:
            doc_tokens: one token list per document, in corpus order.
        """
        from collections import Counter

        self.N = len(doc_tokens)
        self.doc_term_freqs = [Counter(tokens) for tokens in doc_tokens]
        self.doc_lens = [len(tokens) for tokens in doc_tokens]
        # Guard against an empty corpus instead of dividing by zero.
        self.avgdl = sum(self.doc_lens) / self.N if self.N else 0.0
        self._initialize_idfs()

    def _initialize_idfs(self):
        """Compute ``self.idfs`` (term -> IDF) from the indexed document frequencies.

        Uses ``log(1 + (N - n + 0.5) / (n + 0.5))`` (the form used by Lucene), which is
        always positive. The classic ``log((N - n + 0.5) / (n + 0.5))`` becomes negative
        for terms present in more than half of the documents; on a tiny corpus (such as a
        handful of re-ranking candidates) that turns shared terms into penalties.
        """
        from collections import Counter

        doc_freqs = Counter()
        for term_freqs in self.doc_term_freqs:
            doc_freqs.update(term_freqs.keys())  # each document counts a term once
        self.idfs = {
            term: math.log(1.0 + (self.N - n_qi + 0.5) / (n_qi + 0.5))
            for term, n_qi in doc_freqs.items()
        }

    def get_scores(self, query):
        """Score every document against ``query``.

        Returns:
            list of ``{"doc_id": int, "score": float}`` in corpus order, where ``doc_id``
            is the document's 1-based position in ``self.documents``.
        """
        # Terms unseen in the corpus contribute nothing; repeated query terms count each time.
        query_terms = [t for t in self._tokenize(query) if t in self.idfs]
        scores = []
        for position, (term_freqs, doc_len) in enumerate(
            zip(self.doc_term_freqs, self.doc_lens), start=1
        ):
            # Length normalisation sits in the denominator so longer-than-average
            # documents are damped rather than boosted.
            length_ratio = doc_len / self.avgdl if self.avgdl else 0.0
            norm = self.k1 * ((1 - self.b) + self.b * length_ratio)
            score = 0.0
            for term in query_terms:
                tf = term_freqs.get(term, 0)
                if tf:
                    score += self.idfs[term] * tf * (self.k1 + 1) / (tf + norm)
            scores.append({"doc_id": position, "score": score})
        return scores

    def get_ranked_documents(self, query):
        """Return ``get_scores(query)`` sorted by descending score (ties keep corpus order)."""
        scores = self.get_scores(query)
        ranked_documents = sorted(scores, key=lambda x: x['score'], reverse=True)
        return ranked_documents

In [None]:
import json
import os
import signal
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from typing import Dict, List, Any, Tuple
import numpy as np

class Miner:
    """
    Hard-negative miner with checkpointing and progress tracking.

    For each query: rank the whole corpus with a global BM25 ranker, keep the
    ``num_neg_candidates`` best-ranked documents that are not listed as relevant, then,
    for every relevant (positive) document, re-rank those candidates against the
    positive's text and keep the best one as that positive's hard negative.

    Attributes:
        q_data: dictionary of q_id: q_text
        q_rel: dictionary of q_id: [doc_ids]
        d_data: dictionary of doc_id: doc_text
        checkpoint_file: base path; the checkpoint itself is written to f"{checkpoint_file}.npy"
        checkpoint_frequency: save progress about every N processed queries
        batch_size: number of queries processed concurrently (also the thread-pool size)
        cache_size: maximum number of BM25 rankers kept in this instance's LRU cache
        num_neg_candidates: how many top-ranked non-positive documents compete per query (>= 1)
    """
    def __init__(
        self,
        q_data: Dict[str, str],
        q_rel: Dict[str, List[str]],
        d_data: Dict[str, str],
        checkpoint_file: str = "mining_checkpoint.json",
        checkpoint_frequency: int = 100,
        batch_size: int = 4,
        cache_size: int = 1000,
        num_neg_candidates: int = 5,
    ):
        self.q_data = q_data
        self.q_rel = q_rel
        self.d_data = d_data
        self.checkpoint_file = checkpoint_file
        self.checkpoint_frequency = checkpoint_frequency
        self.batch_size = batch_size
        self.cache_size = cache_size
        self.num_neg_candidates = num_neg_candidates

        self.output: List[Dict[str, str]] = []
        self.interrupted = False
        self.last_processed_qid = None

        # Corpus order is significant: BM25 reports 1-based positions into this order.
        # Plain lists avoid numpy's fixed-width unicode copy of every full text.
        self.doc_ids = list(self.d_data.keys())
        self.doc_texts = list(self.d_data.values())

        self.executor = ThreadPoolExecutor(max_workers=self.batch_size)

        # Per-instance LRU cache keyed only by the document tuple, sized by cache_size.
        self._cached_ranker = lru_cache(maxsize=self.cache_size)(self._build_ranker)

        # Load previous checkpoint if exists
        self._load_checkpoint()

        # Build the corpus-wide ranker once
        print("Initializing global ranker...")
        docs_tuple = tuple(self.d_data.values())
        self.global_ranker = self._initiate_ranker(docs_tuple)
        print("Global ranker initialized successfully")

    def _handle_interrupt(self, signum, frame):
        """Signal handler: request a graceful stop.

        Only sets a flag; ``mine`` finishes the current batch, saves a consistent
        checkpoint and returns.
        """
        print("\nInterrupt received. Saving checkpoint and stopping gracefully...")
        self.interrupted = True

    def _install_signal_handlers(self) -> Dict[int, Any]:
        """Route SIGINT/SIGTERM to ``_handle_interrupt``; return the handlers they replaced."""
        previous = {}
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                previous[sig] = signal.signal(sig, self._handle_interrupt)
            except (ValueError, OSError):
                # signal.signal only works in the main thread; keep default behaviour there.
                pass
        return previous

    def _restore_signal_handlers(self, previous: Dict[int, Any]) -> None:
        """Reinstall the handlers returned by ``_install_signal_handlers``."""
        for sig, handler in previous.items():
            if handler is not None:  # None means the old handler was not set from Python
                signal.signal(sig, handler)

    def _save_checkpoint(self):
        """Atomically save ``output`` and ``last_processed_qid`` to f"{checkpoint_file}.npy".

        The dict is stored as a pickled numpy object array; it is written to a temporary
        file first and then moved into place, so a crash mid-write cannot corrupt the
        previous checkpoint.
        """
        checkpoint_data = {
            'output': self.output,
            'last_processed_qid': self.last_processed_qid
        }
        final_path = f"{self.checkpoint_file}.npy"
        tmp_path = f"{self.checkpoint_file}.tmp.npy"  # ends in .npy so np.save keeps the name
        np.save(tmp_path, checkpoint_data)
        os.replace(tmp_path, final_path)
        print(f"\nCheckpoint saved to {final_path}")

    def _load_checkpoint(self):
        """Load progress from f"{checkpoint_file}.npy" if it exists (best effort).

        Note: ``allow_pickle=True`` unpickles the file, so only load checkpoints you wrote.
        """
        checkpoint_path = f"{self.checkpoint_file}.npy"
        if os.path.exists(checkpoint_path):
            try:
                checkpoint_data = np.load(checkpoint_path, allow_pickle=True).item()
                self.output = checkpoint_data.get('output', [])
                self.last_processed_qid = checkpoint_data.get('last_processed_qid')
                print(f"Loaded checkpoint with {len(self.output)} processed items")
            except Exception as e:
                # Deliberate fallback: an unreadable checkpoint means starting from scratch.
                print(f"Error loading checkpoint: {e}")
                self.output = []
                self.last_processed_qid = None

    def _build_ranker(self, docs: Tuple[str, ...]) -> Any:
        """Construct a BM25 ranker over ``docs`` (uncached)."""
        return BM25(list(docs))

    def _initiate_ranker(self, docs: Tuple[str, ...]) -> Any:
        """Return a cached BM25 ranker over ``docs``.

        Positions in the ranker's results are 1-based indices into ``docs``.
        """
        return self._cached_ranker(tuple(docs))

    def _process_query(self, qid: str, pos_pbar: "tqdm" = None) -> List[Dict[str, str]]:
        """Mine one hard negative for each positive document of query ``qid``.

        Returns an empty list when fewer than ``num_neg_candidates`` non-positive
        documents exist. Positives are handled sequentially in the calling worker:
        submitting them to ``self.executor`` from a task already running on that
        executor deadlocks as soon as every worker is blocked waiting.
        """
        results: List[Dict[str, str]] = []
        pos_ids = self.q_rel[qid]
        n_candidates = self.num_neg_candidates

        # The global ranker reports 1-based corpus positions; translate them to doc ids
        # before comparing with the query's positive ids.
        ranked = self.global_ranker.get_ranked_documents(self.q_data[qid])
        ranked_ids = (self.doc_ids[doc["doc_id"] - 1] for doc in ranked)
        neg_candidates = [doc_id for doc_id in ranked_ids if doc_id not in pos_ids]

        if len(neg_candidates) < n_candidates:
            if pos_pbar is not None:
                pos_pbar.update(len(pos_ids))
            return results

        # Build the re-ranker over the candidates' texts in rank order, so re-ranker
        # position p corresponds to top_candidates[p - 1].
        top_candidates = neg_candidates[:n_candidates]
        neg_ranker = self._initiate_ranker(tuple(self.d_data[c] for c in top_candidates))

        for pos in pos_ids:
            result = self._process_positive(
                pos=pos,
                pos_text=self.d_data[pos],
                neg_ranker=neg_ranker,
                neg_candidates=top_candidates,
                qid=qid
            )
            if result:
                results.append(result)
            if pos_pbar is not None:
                pos_pbar.update(1)

        return results

    def _process_positive(self, pos: str, pos_text: str, neg_ranker: Any,
                          neg_candidates: List[Any], qid: str) -> Dict[str, str]:
        """Pick the candidate most similar to one positive document.

        ``neg_ranker`` must have been built over the texts of ``neg_candidates`` in the
        same order; its 1-based ``doc_id`` is converted to a 0-based index.
        """
        ranked_negs = neg_ranker.get_ranked_documents(pos_text)
        best_position = ranked_negs[0]["doc_id"]
        real_neg_id = neg_candidates[best_position - 1]
        return {"qid": qid, "pos": pos, "neg": real_neg_id}

    def mine(self) -> List[Dict[str, str]]:
        """
        Mine hard negatives for every query in ``q_rel``, resuming after
        ``last_processed_qid`` when a checkpoint was loaded.

        Queries are processed ``batch_size`` at a time on the thread pool. Each batch is
        committed as a whole (results + ``last_processed_qid``), so checkpoints are always
        consistent. SIGINT/SIGTERM are redirected to a graceful stop only while mining and
        restored afterwards.

        Returns:
            List of dictionaries containing mining results
        """
        qids_to_process = list(self.q_rel.keys())
        if self.last_processed_qid is not None and self.last_processed_qid in qids_to_process:
            start_idx = qids_to_process.index(self.last_processed_qid) + 1
            qids_to_process = qids_to_process[start_idx:]

        total_positives = sum(len(self.q_rel[qid]) for qid in qids_to_process)

        main_pbar = tqdm(
            total=len(qids_to_process),
            desc="Processing queries",
            position=0
        )
        pos_pbar = tqdm(
            total=total_positives,
            desc="Processing positives",
            position=1,
            leave=True
        )

        save_every = max(1, self.checkpoint_frequency)
        processed = 0
        previous_handlers = self._install_signal_handlers()
        try:
            for i in range(0, len(qids_to_process), self.batch_size):
                if self.interrupted:
                    break

                batch_qids = qids_to_process[i:i + self.batch_size]
                futures = [
                    self.executor.submit(self._process_query, qid, pos_pbar)
                    for qid in batch_qids
                ]

                # Collect in submission order; commit only once the whole batch succeeded.
                batch_results: List[Dict[str, str]] = []
                for future in futures:
                    batch_results.extend(future.result())
                    main_pbar.update(1)
                self.output.extend(batch_results)
                self.last_processed_qid = batch_qids[-1]

                # Save whenever another multiple of checkpoint_frequency has been crossed.
                marks_before = processed // save_every
                processed += len(batch_qids)
                if processed // save_every > marks_before:
                    self._save_checkpoint()
        except BaseException:
            # State is consistent at batch boundaries, so keep what has been done.
            self._save_checkpoint()
            raise
        finally:
            self._restore_signal_handlers(previous_handlers)
            main_pbar.close()
            pos_pbar.close()
            self.executor.shutdown(wait=True)

        if self.interrupted:
            self._save_checkpoint()
        else:
            # Completed successfully: the checkpoint is no longer needed.
            checkpoint_path = f"{self.checkpoint_file}.npy"
            if os.path.exists(checkpoint_path):
                os.remove(checkpoint_path)

        return self.output

In [None]:
def get_rel_docs(qid):
    """Return the relevant (positive) document ids listed for query ``qid``.

    The ``context`` cell of ``q_df`` holds a Python literal (a list of ids, presumably
    matching ``d_df``'s index — confirm), parsed safely with ``ast.literal_eval``; the
    parsed iterable is returned as a new list.
    """
    parsed_context = ast.literal_eval(q_df.loc[qid, "context"])
    return list(parsed_context)

# Lookup tables for the miner: query text, relevant (positive) doc ids, and document full text.
q_data = dict((qid, q_df.loc[qid, "question"]) for qid in q_df.index)
q_rel = dict((qid, get_rel_docs(qid)) for qid in q_df.index)
d_data = dict((doc_id, d_df.loc[doc_id, "fulltext"]) for doc_id in d_df.index)

# Optional tuning knobs for the miner; everything except the three tables above.
MINER_KWARGS = dict(
    checkpoint_file="/content/drive/MyDrive/SKRIPSI/mining_checkpoint_cached.json",
    checkpoint_frequency=25,
    batch_size=4,
    cache_size=1_000_000,
)

ranker = Miner(q_data=q_data, q_rel=q_rel, d_data=d_data, **MINER_KWARGS)
q_pos_neg = ranker.mine()

Initializing global ranker...


100%|██████████| 5474/5474 [06:28<00:00, 14.10it/s] 


Global ranker initialized successfully


Processing queries:   0%|          | 0/1118 [00:00<?, ?it/s]
Processing positives:   0%|          | 0/7037 [00:00<?, ?it/s][A

  0%|          | 0/5 [00:00<?, ?it/s][A[A



  0%|          | 0/5 [00:00<?, ?it/s][A[A[A[A


  0%|          | 0/5 [00:00<?, ?it/s][A[A[A




  0%|          | 0/5 [00:00<?, ?it/s][A[A[A[A[A



 20%|██        | 1/5 [00:51<03:26, 51.67s/it][A[A[A[A


 20%|██        | 1/5 [00:52<03:28, 52.05s/it][A[A[A

 20%|██        | 1/5 [00:52<03:29, 52.26s/it][A[A




 20%|██        | 1/5 [00:51<03:27, 51.82s/it][A[A[A[A[A



 40%|████      | 2/5 [00:58<01:15, 25.32s/it][A[A[A[A



 60%|██████    | 3/5 [00:58<00:27, 13.85s/it][A[A[A[A



 80%|████████  | 4/5 [00:59<00:08,  8.48s/it][A[A[A[A



100%|██████████| 5/5 [00:59<00:00, 11.88s/it]


 40%|████      | 2/5 [00:59<01:17, 25.76s/it][A[A


 40%|████      | 2/5 [00:59<01:17, 25.69s/it][A[A[A

 60%|██████    | 3/5 [00:59<00:28, 14.07s/it][A[A


 60%|██████    | 3/5 [00:59<00:28, 

In [None]:
# Persist the mined {qid, pos, neg} records; the frame's default RangeIndex is written as the first column.
q_pos_neg_df = pd.DataFrame(data=q_pos_neg)
q_pos_neg_df.to_csv(path_or_buf="/content/drive/MyDrive/SKRIPSI/q_pos_neg.csv")