In [33]:
!pip install -q torch torchvision
!pip install -q langchain langchain-text-splitters sentence-transformers
!pip install -q pandas numpy chromadb
!pip install -q transformers
!pip install -q azure-ai-inference
!pip install -q ragas

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Av

In [44]:
import torch
import chromadb
import json
import re
import os 
import shutil
import time
import google.generativeai as genai
import ipywidgets as widgets
import pandas as pd

from pydantic import Field
from typing import Any, List, Mapping, Optional
from ragas.metrics import faithfulness, context_precision, answer_relevancy, context_recall
from ragas import evaluate
from ragas.llms import LangchainLLMWrapper
from ragas.embeddings import LangchainEmbeddingsWrapper
from datasets import Dataset
from azure.ai.inference import ChatCompletionsClient
from azure.ai.inference.models import SystemMessage, UserMessage
from azure.core.credentials import AzureKeyCredential
from IPython.display import display
from transformers import AutoTokenizer
from kaggle_secrets import UserSecretsClient
from chromadb.utils import embedding_functions
from typing import List, Dict, Any
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_text_splitters import MarkdownHeaderTextSplitter
from langchain_core.language_models.llms import LLM
from langchain_core.callbacks.manager import CallbackManagerForLLMRun
from langchain.embeddings import HuggingFaceEmbeddings
from sentence_transformers import SentenceTransformer

In [54]:
# moving chroma_db into a writeable spaces
src = "/kaggle/input/tax-dataset-indonesia/sdsn_chromadb2"
dst = "/kaggle/working/sdsn_chroma_db"
if not os.path.exists(dst):
    shutil.copytree(src, dst)

In [55]:
user_secrets = UserSecretsClient()
github_key = user_secrets.get_secret("GITHUB_KEY")

In [56]:
e5_tokenizer = AutoTokenizer.from_pretrained("intfloat/multilingual-e5-large")

In [57]:
embedding_model = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large")
wrapped_embeddings = LangchainEmbeddingsWrapper(embedding_model)

In [58]:
def count_tokens(text, tokenizer=e5_tokenizer):
    return len(tokenizer.encode(text))

In [59]:
def clean_context(text):
    text = re.sub(r'^\|[-\s|]+\|$', '', text, flags=re.MULTILINE)
    text = re.sub(r'!\[.*?\]\(.*?\)', '', text)
    text = re.sub(r'\$.*?\$', '', text)
    lines = [line for line in text.split('\n') if len(line.strip()) > 20]
    return '\n'.join(lines)

In [60]:
## llm wrappper
class LLMWrapper(LLM):
    model_name: str = Field(default="openai/gpt-4.1-mini", description="Model name to use")
    api_key: str = Field(description="GitHub API token")
    endpoint: str = Field(default="https://models.github.ai/inference")
    request_delay: float = Field(default=3.0)
    temperature: float = Field(default=0.2)
    top_p: float = Field(default=0.9)
    max_tokens: int = Field(default=1024)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._client = None
        self._last_request_time = 0

    @property
    def client(self):
        if self._client is None:
            self._client = ChatCompletionsClient(
                endpoint=self.endpoint,
                credential=AzureKeyCredential(self.api_key),
            )
        return self._client

    @property
    def _llm_type(self) -> str:
        return "gpt4.1mini_github"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager=None,
        **kwargs: Any,
    ) -> str:
        now = time.time()
        if now - self._last_request_time < self.request_delay:
            time.sleep(self.request_delay - (now - self._last_request_time))
        self._last_request_time = time.time()

        try:
            response = self.client.complete(
                messages=[
                    SystemMessage("You are a tax law expert assistant."),
                    UserMessage(prompt),
                ],
                temperature=self.temperature,
                top_p=self.top_p,
                model=self.model_name,
                max_tokens=self.max_tokens
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error: {str(e)}"

    @property
    def _identifying_params(self):
        return {"model_name": self.model_name, "endpoint": self.endpoint}


In [61]:
class LoadTaxDocumentVectorDB:
    def __init__(self, collection_name: str = "tax_documents", persist_directory: str = "/kaggle/working/sdsn_chroma_db"):
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.collection_name = collection_name
        try:
            self.collection = self.client.get_collection(collection_name)
        except Exception as e:
            raise RuntimeError(f"Collection '{collection_name}' not found: {e}")

    def get_all_documents(self):
        return self.collection.get(include=["documents", "metadatas", "ids"])

    def query(self, query_embedding, n_results=5):
        return self.collection.query(query_embeddings=[query_embedding], n_results=n_results)

In [62]:
class TaxQueryEnhancer:
    def __init__(self):
        self.synonyms = {
            r"\bpph\s?21\b": "pajak penghasilan pasal 21",
            r"\bpph\s?22\b": "pajak penghasilan pasal 22",
            r"\bpph\s?23\b": "pajak penghasilan pasal 23",
            r"\bpph\b": "pajak penghasilan",
            r"\bppn\b": "pajak pertambahan nilai",
            r"\bpbb\b": "pajak bumi dan bangunan",
            r"\bnpwp\b": "nomor pokok wajib pajak",
            r"\bspt\b": "surat pemberitahuan",
            r"\bpkp\b": "pengusaha kena pajak",
            r"\bwp\b": "wajib pajak",
            r"\bbea materai\b|\bmaterai\b": "bea meterai",
            r"\bskp\b": "surat ketetapan pajak",
            r"\bskpkb\b": "surat ketetapan pajak kurang bayar",
            r"\bskplb\b": "surat ketetapan pajak lebih bayar",
            r"\bskpn\b": "surat ketetapan pajak nihil",
            r"\bsurat tagihan\b": "surat tagihan pajak",
            r"\bsurat paksa\b": "surat paksa",
            r"\bdenda\b": "sanksi administrasi",
            r"\bsanksi\b": "sanksi administrasi",
            r"\bbanding\b": "putusan banding",
            r"\bkeberatan\b": "surat keputusan keberatan",
            r"\bself assessment\b": "sistem self assessment"
        }
        self.typo_corrections = {
            "pajak penghaslan": "pajak penghasilan",
            "pajak penghasillan": "pajak penghasilan",
            "pajak pertambahan nilau": "pajak pertambahan nilai",
            "npw": "npwp"
        }

    def enhance(self, query: str) -> str:
        query = query.lower().strip()
        query = re.sub(r'\s+', ' ', query)
        for typo, correction in self.typo_corrections.items():
            query = re.sub(r'\b' + re.escape(typo) + r'\b', correction, query)
        for pattern, replacement in self.synonyms.items():
            query = re.sub(pattern, replacement, query)
        query = re.sub(r'\s+', ' ', query).strip()
        return query


In [63]:
class TaxQueryProcessor:
    def __init__(self, embedding_model):
        self.embedding_model = embedding_model
        self.tax_query_enhancer = TaxQueryEnhancer()
    
    def process_query(self, user_query: str) -> dict:
        cleaned_query = self._clean_query(user_query)
        
        enhanced_query = self.tax_query_enhancer.enhance(cleaned_query)

        query_embedding = self.embedding_model.encode([f"query: {enhanced_query}"])[0]
        
        return {
            'original_query': user_query,
            'processed_query': enhanced_query,
            'query_embedding': query_embedding,
            'query_type': self._classify_query_type(enhanced_query)
        }
    
    def _clean_query(self, query: str) -> str:
        query = re.sub(r'[^\w\s\-\(\)]', ' ', query)
        return query.strip()
    
    def _classify_query_type(self, query: str) -> str:
        if any(word in query.lower() for word in ['tarif', 'rate', 'berapa']):
            return 'tariff_inquiry'
        elif any(word in query.lower() for word in ['cara', 'bagaimana', 'prosedur']):
            return 'procedure_inquiry'
        elif any(word in query.lower() for word in ['sanksi', 'denda', 'penalty']):
            return 'penalty_inquiry'
        else:
            return 'general_inquiry'


In [64]:
class TaxRetrievalEngine:
    def __init__(self, vector_db: LoadTaxDocumentVectorDB, tokenizer, max_context_tokens=6000):
        self.vector_db = vector_db
        self.reranker = TaxReranker()
        self.tokenizer = tokenizer
        self.max_context_tokens = max_context_tokens

    
    def retrieve_documents(self, processed_query: dict, k: int = 10) -> list:
        initial_results = self.vector_db.query(
            processed_query['query_embedding'], 
            n_results=k*2 
        )

        docs = initial_results['documents'][0]
        metadatas = initial_results['metadatas'][0]
        distances = initial_results['distances'][0]

        results = [
            {'content': doc, 'metadata': meta, 'score': 1 - dist}
            for doc, meta, dist in zip(docs, metadatas, distances)
        ]
        
        reranked_results = self.reranker.rerank(
            results, 
            processed_query['query_type']
        )

        total_tokens = 0
        selected_chunks = []
        for result in reranked_results:
            chunk_tokens = count_tokens(result['content'], self.tokenizer)
            if total_tokens + chunk_tokens > self.max_context_tokens:
                break
            selected_chunks.append(result)
            total_tokens += chunk_tokens
            
        return self._format_results(selected_chunks)
        
    def _format_results(self, results: list) -> list:
        formatted_results = []
        for result in results:
            formatted_results.append({
                'content': result['content'],
                'metadata': result['metadata'],
                'relevance_score': result['score'],
            })
        return formatted_results

class TaxReranker:
    def __init__(self):
        self.query_type_weights = {
            'tariff_inquiry': {'pasal': 1.2, 'ayat': 1.1, 'tabel': 1.5},
            'procedure_inquiry': {'pasal': 1.3, 'ayat': 1.2, 'contoh': 1.4},
            'penalty_inquiry': {'sanksi': 1.5, 'denda': 1.4, 'pelanggaran': 1.3}
        }
    
    def rerank(self, results: list, query_type: str) -> list:
        weights = self.query_type_weights.get(query_type, {})
        
        for result in results:
            base_score = result['score']
            boost_factor = 1.0
            
            metadata = result.get('metadata', {})
            for key, weight in weights.items():
                if key in str(metadata).lower():
                    boost_factor *= weight
            
            result['final_score'] = base_score * boost_factor
        
        return sorted(results, key=lambda x: x['final_score'], reverse=True)


In [65]:
class TaxResponseGenerator:
    def __init__(self, llm_model_name: str = "deepseek/DeepSeek-R1-0528"):
        self.endpoint = "https://models.github.ai/inference"
        self.model = llm_model_name
        self.token = github_key
        self.client = ChatCompletionsClient(
            endpoint=self.endpoint,
            credential=AzureKeyCredential(self.token),
        )
        self.prompt_templates = TaxPromptTemplates()
    
    def generate_response(self, query_data: dict, retrieved_docs: list) -> dict:
        context = self._assemble_context(retrieved_docs, tokenizer=e5_tokenizer, max_tokens=5600)
        
        prompt = self.prompt_templates.get_template(query_data["query_type"]).format(
            query=query_data["original_query"],
            context=context,
            instructions=self._get_instructions(query_data["query_type"]),
        )

        try:
            response = self.client.complete(
                messages=[
                    SystemMessage(content="Anda adalah asisten pajak yang menjawab berdasarkan dokumen yang diberikan. Jika jawaban tidak ada pada dokumen maka beritau."),
                    UserMessage(content=prompt),
                ],
                model=self.model,
                temperature=0.2,
                top_p=0.9,
                max_tokens=1024
            )
            answer_text = response.choices[0].message.content
        except Exception as e:
            print(f"Error: {e}")
            answer_text = "Maaf, terjadi error dalam memproses permintaan."
        return {"answer": answer_text}

    def _assemble_context(self, retrieved_docs, tokenizer, max_tokens):
        context_parts = []
        total_tokens = 0
        
        for i, doc in enumerate(retrieved_docs, 1):
            cleaned = clean_context(doc['content'])
            part = f"Dokumen {i}:\n{cleaned}\n"
            part_tokens = count_tokens(part, tokenizer)
            if total_tokens + part_tokens > max_tokens:
                break
            context_parts.append(part)
            total_tokens += part_tokens
        return "\n---\n".join(context_parts)
        
    def _get_instructions(self, query_type):
        mapping = {
            "tariff_inquiry": "Berikan angka tarif dan dasar hukumnya.",
            "procedure_inquiry": "Jelaskan langkah-langkah prosedur secara urut.",
            "penalty_inquiry": "Sebutkan jenis sanksi dan pasal rujukan.",
            "general_inquiry": "Jawab ringkas namun komprehensif.",
        }
        return mapping.get(query_type, mapping["general_inquiry"])

class TaxPromptTemplates:
    def __init__(self):
        self.templates = {
                'tariff_inquiry': """
            Berdasarkan konteks peraturan perpajakan berikut:
            
            {context}
            
            Pertanyaan: {query}
            
            Instruksi: {instructions}
            
            Jawab pertanyaan dengan mengacu pada dokumen yang diberikan. Berikan informasi tarif pajak yang spesifik, jelas, dan akurat. Jika ada tabel tarif, jelaskan dengan detail.
            
            Jawaban:""",
                        
                        'procedure_inquiry': """
            Berdasarkan konteks peraturan perpajakan berikut:
            
            {context}
            
            Pertanyaan: {query}
            
            Instruksi: {instructions}
            
            Jelaskan prosedur dengan langkah-langkah yang jelas dan mudah dipahami. Rujuk pasal dan ayat yang relevan.
            
            Jawaban:""",
                        
                        'general_inquiry': """
            Sebagai asisten perpajakan yang ahli, jawab pertanyaan berikut berdasarkan konteks peraturan yang diberikan:
            
            Konteks: {context}
            
            Pertanyaan: {query}
            
            Instruksi: {instructions}
            
            Berikan jawaban yang akurat, lengkap, dan mudah dipahami. Selalu rujuk ke pasal dan ayat yang relevan.
            
            Jawaban:"""
        }
    
    def get_template(self, query_type: str) -> str:
        return self.templates.get(query_type, self.templates['general_inquiry'])

In [66]:
class TaxRAGSystem:
    def __init__(self):
        self.embedding_model = SentenceTransformer('intfloat/multilingual-e5-large', device='cuda')
        self.vector_db = LoadTaxDocumentVectorDB()
        self.query_processor = TaxQueryProcessor(self.embedding_model)
        self.retrieval_engine = TaxRetrievalEngine(self.vector_db, e5_tokenizer)
        self.response_generator = TaxResponseGenerator()
        github_llm = LLMWrapper(
            api_key=github_key,
            request_delay=7
        )
        self.ragas_llm = LangchainLLMWrapper(github_llm)
        
    def answer_query(self, user_query: str) -> dict:
        processed_query = self.query_processor.process_query(user_query)
        
        retrieved_docs = self.retrieval_engine.retrieve_documents(processed_query)
        
        response_data = self.response_generator.generate_response(
            processed_query, retrieved_docs
        )
        
        return response_data, retrieved_docs

    def evaluate_batch(self, questions, ground_truths, delay_per_question=7):
        eval_data = {
            "question": [],
            "contexts": [],
            "answer": [],
            "ground_truth": []
        }

        all_answers = []
        
        for idx, (q, gt) in enumerate(zip(questions, ground_truths)):
            if idx > 0:
                print(f"Menunggu {delay_per_question} detik sebelum pertanyaan berikutnya...")
                time.sleep(delay_per_question)
            answer_dict, contexts = self.answer_query(q)
            eval_data["question"].append(q)
            eval_data["contexts"].append([c['content'] for c in contexts])
            eval_data["answer"].append(answer_dict["answer"])
            eval_data["ground_truth"].append(gt)
            all_answers.append(answer_dict["answer"])
        eval_dataset = Dataset.from_dict(eval_data)

        result = evaluate(
            eval_dataset,
            metrics=[context_recall],
            llm=self.ragas_llm, 
            embeddings=wrapped_embeddings,
            raise_exceptions=False
        )
        df_eval = result.to_pandas()
        df_eval["llm_answer"] = all_answers
        return df_eval

In [None]:
questions = [
    "Apa definisi Pajak menurut Undang-Undang Ketentuan Umum dan Tata Cara Perpajakan?",
    "Apa yang dimaksud dengan Nomor Pokok Wajib Pajak (NPWP)?",
    "Siapa yang termasuk sebagai Wajib Pajak?",
]
ground_truths = [
    "Pajak adalah kontribusi wajib kepada negara yang terutang oleh orang pribadi atau badan yang bersifat memaksa berdasarkan Undang-Undang, dengan tidak mendapatkan imbalan secara langsung dan digunakan untuk keperluan negara bagi sebesar-besarnya kemakmuran rakyat.",
    "Nomor Pokok Wajib Pajak adalah nomor yang diberikan kepada Wajib Pajak sebagai sarana dalam administrasi perpajakan yang dipergunakan sebagai tanda pengenal diri atau identitas Wajib Pajak dalam melaksanakan hak dan kewajiban perpajakannya.",
    "Wajib Pajak adalah orang pribadi atau badan, meliputi pembayar pajak, pemotong pajak, dan pemungut pajak, yang mempunyai hak dan kewajiban perpajakan sesuai dengan ketentuan peraturan perundang-undangan perpajakan.",
]

tax_rag = TaxRAGSystem()
df_result = tax_rag.evaluate_batch(questions, ground_truths)
print(df_result)

In [84]:
pd.DataFrame(df_result).drop('context_precision', axis=1)

Unnamed: 0,user_input,retrieved_contexts,response,reference,context_recall,llm_answer
0,Apa definisi Pajak menurut Undang-Undang Keten...,[. Pajak adalah kontribusi wajib kepada negara...,"<think>\nHmm, user meminta definisi pajak berd...",Pajak adalah kontribusi wajib kepada negara ya...,1.0,"<think>\nHmm, user meminta definisi pajak berd..."
1,Apa yang dimaksud dengan Nomor Pokok Wajib Paj...,"[. Oleh karena itu, kepada setiap wajib pajak ...","<think>\nHmm, user bertanya tentang definisi N...",Nomor Pokok Wajib Pajak adalah nomor yang dibe...,1.0,"<think>\nHmm, user bertanya tentang definisi N..."
2,Siapa yang termasuk sebagai Wajib Pajak?,[. Pajak adalah kontribusi wajib kepada negara...,"<think>\nHmm, user bertanya tentang siapa yang...","Wajib Pajak adalah orang pribadi atau badan, m...",1.0,"<think>\nHmm, user bertanya tentang siapa yang..."


In [87]:
print(f'Pertanyaan: {questions[0]}')
print("")
print(df_result['llm_answer'][0])

Pertanyaan: Apa definisi Pajak menurut Undang-Undang Ketentuan Umum dan Tata Cara Perpajakan?

<think>
Hmm, user meminta definisi pajak berdasarkan UU KUP beserta prosedur penjelasannya. Aku harus memastikan jawabanku akurat dan sesuai dokumen yang diberikan.

Pertama, aku ingat dokumen 1 jelas menyebutkan definisi pajak di bagian pembuka. Kutipan persisnya: "Pajak adalah kontribusi wajib... bagi sebesar-besarnya kemakmuran raya." Ini pasti dasar utamanya. 

Aku perhatikan user juga minta prosedur langkah demi langkah. Meski definisi pajak bersifat konseptual bukan prosedural, aku akan struktur jawaban secara sistematis: (1) sebut sumber dokumen, (2) kutip definisi lengkap, (3) jelaskan komponen krusialnya. 

Dari 20 dokumen, hanya dokumen 1 yang memuat definisi formal. Dokumen lain membahas aspek teknis seperti NPWP atau penagihan, jadi tak relevan di sini. Aku perlu tegas bahwa ini satu-satunya definisi resmi dalam kumpulan dokumen.

Untuk komponen definisi, aku break down poin wajib

In [88]:
print(f'Pertanyaan: {questions[1]}')
print("")
print(df_result['llm_answer'][1])

Pertanyaan: Apa yang dimaksud dengan Nomor Pokok Wajib Pajak (NPWP)?

<think>
Hmm, user bertanya tentang definisi NPWP berdasarkan dokumen perpajakan yang diberikan. Aku perlu mencari definisi resmi dari dokumen-dokumen tersebut.

Dari dokumen 2 ayat (6) jelas disebutkan NPWP adalah nomor yang diberikan sebagai sarana administrasi perpajakan yang berfungsi sebagai identitas wajib pajak. Ini definisi intinya. 

Dokumen 4 dan 5 juga menguatkan bahwa NPWP merupakan tanda pengenal diri dalam pelaksanaan hak dan kewajiban perpajakan. 

Yang menarik, dokumen 13 menyebut pengecualian untuk orang dengan penghasilan di bawah PTKP tidak wajib memiliki NPWP. Tapi ini bukan bagian dari definisi, jadi tak perlu dimasukkan dalam jawaban utama.

Dokumen 1 dan 7 menjelaskan fungsi tambahan NPWP untuk menjaga ketertiban pembayaran pajak dan sebagai syarat administratif dalam dokumen perpajakan. 

Aku perhatikan dokumen 8 tentang sanksi bagi yang tidak punya NPWP, tapi itu di luar pertanyaan user. Fokus

In [89]:
print(f'Pertanyaan: {questions[2]}')
print("")
print(df_result['llm_answer'][2])

Pertanyaan: Siapa yang termasuk sebagai Wajib Pajak?

<think>
Hmm, user bertanya tentang siapa yang termasuk sebagai Wajib Pajak. Pertanyaan ini cukup mendasar dalam perpajakan. Mari aku telusuri dokumen yang tersedia.

Dari Dokumen 1 Ayat (2) definisi dasar Wajib Pajak adalah orang pribadi atau badan yang punya hak dan kewajiban perpajakan. Dokumen 2 memberikan penjelasan lebih rinci tentang kapan subjek pajak menjadi Wajib Pajak: untuk OP dalam negeri ketika penghasilannya melebihi PTKP, badan dalam negeri sejak berdiri, dan WPLN otomatis menjadi WP karena punya penghasilan dari Indonesia.

Aku juga menemukan penjelasan penting di Dokumen 12 tentang konsep "memenuhi persyaratan subjektif dan objektif". Ini diperkuat oleh Dokumen 4 yang menjelaskan persyaratan subjektif terkait status subjek pajak dan objektif terkait penerimaan penghasilan.

Untuk kasus khusus, Dokumen 14 memberi contoh ketika Direktur Jenderal Pajak bisa menetapkan WP, seperti pada sengketa kepemilikan atau pengguna

In [67]:
tax_rag = TaxRAGSystem()
button = widgets.Button(description="Tanya")
output = widgets.Output()

def on_button_clicked(b):
    with output:
        output.clear_output()
        res, ret = tax_rag.answer_query(query_box.value)
        print("=== Jawaban Assistant ===")
        print(res['answer'])
        # for i, doc in enumerate(ret, 1):
        #     print(f"\Document {i}:")
        #     print(doc['content'][:500])
        #     print("-" * 40)
query_box = widgets.Text(
    value='',
    placeholder='Tulis pertanyaan Anda di sini...',
    description='Pertanyaan:',
    disabled=False
)
display(query_box)

button.on_click(on_button_clicked)
display(button, output)

Text(value='', description='Pertanyaan:', placeholder='Tulis pertanyaan Anda di sini...')

Button(description='Tanya', style=ButtonStyle())

Output()