# Initialise

In [1]:
# Install Pathway and heavy dependencies
# We use 'udiff' to enable the computation engine
%pip install pathway -q
%pip install sentence-transformers -q
%pip install openai -q
%pip install python-dotenv -q

# Step-by-Step Cells

In [2]:
!git clone https://github.com/AbitathaRoy/Kharagpur-Data-Science-Hackathon-2026.git
# Then adjust your paths like: CHUNKS_FILE = "Kharagpur-Data-Science-Hackathon-2026/data/chunks.jsonl"

fatal: destination path 'Kharagpur-Data-Science-Hackathon-2026' already exists and is not an empty directory.


In [3]:
import os
import pathway as pw
from dotenv import load_dotenv

# Intended to stop huggingface_hub from implicitly using a stored HF token.
# NOTE(review): the recorded output of the pipeline cell below still shows the
# Colab "HF_TOKEN secret" warning, so this setting does not silence it.
os.environ["HF_HUB_DISABLE_IMPLICIT_TOKEN"] = "1"

# Load GROQ_API_KEY (and any other settings) from the environment or a .env file.
# `load_dotenv` is imported at the top of this cell; no second import is needed.
load_dotenv()


# Configuration — every path is relative to the repository cloned above.
REPO_DIR = "Kharagpur-Data-Science-Hackathon-2026"
DATA_DIR = f"{REPO_DIR}/data"
CHUNKS_FILE = f"{DATA_DIR}/chunks.jsonl"
# NOTE(review): the all-in-one cell reads claims_output.json with json.load and
# iterates it as a list of claim objects, i.e. it is a JSON array rather than
# JSON Lines, so pw.io.jsonlines.read on it is likely to fail. The all-in-one
# cell converts it to claims_cleaned.jsonl before handing it to Pathway.
CLAIMS_FILE = f"{DATA_DIR}/claims_output.json"
OUTPUT_FILE = f"{DATA_DIR}/final_predictions_pathway.jsonl"
EMBEDDING_MODEL = "all-MiniLM-L6-v2"

# Detection Prompt (the "Detective" persona).
# str.format fills {claim} and {context}; the doubled braces {{ }} render as
# literal JSON braces in the final prompt.
JUDGE_PROMPT = """
You are a Detective verifying a witness statement against the official case files.

WITNESS CLAIM: "{claim}"

OFFICIAL CASE FILES (NOVEL TEXT):
{context}

INSTRUCTIONS:
1. Search the Case Files for the specific events/characters mentioned in the Claim.
2. **Compare Details:** Look closely at names, dates, causes of death, and relationships.
3. **Verdict Logic:**
   - **CONTRADICT**: If the text explicitly tells a *different* story (e.g., Claim says "Shot", Text says "Stabbed").
   - **CONTRADICT**: If the Claim says X happened, but the Text says Y happened *instead*.
   - **CONSISTENT**: If the Claim is supported by the text.
   - **CONSISTENT**: If the Claim adds extra details that do *not* conflict with the text (Silence is not a lie).

Return JSON: {{ "verdict": "consistent" | "contradict", "reason": "Citing specific passage [x]" }}
"""

In [2]:
import json
from openai import OpenAI

@pw.udf
def call_judge_model(claim: str, context: str) -> str:
    """
    Ask the Groq-hosted LLM (OpenAI-compatible API) to judge `claim` against `context`.

    Args:
        claim: The claim text to verify.
        context: Retrieved novel passages, already formatted for the prompt.

    Returns:
        The model's raw JSON reply as a string. On a missing GROQ_API_KEY, an
        empty reply, or a failed request, returns a JSON string with
        ``"verdict": "error"`` and the failure in ``"reason"``.
    """
    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        # Without this guard OpenAI(api_key=None) would fall back to
        # OPENAI_API_KEY (sending that key to Groq) or raise outside the try
        # below, aborting the whole pipeline.
        return json.dumps({"verdict": "error", "reason": "API Key Missing"})

    formatted_prompt = JUDGE_PROMPT.format(claim=claim, context=context)

    try:
        # Client is created per call inside the UDF.
        # NOTE(review): the earlier comment attributed this to pickling in
        # distributed mode — unverified.
        client = OpenAI(
            base_url="https://api.groq.com/openai/v1",
            api_key=api_key
        )
        response = client.chat.completions.create(
            # Override via GROQ_MODEL, e.g. "qwen/qwen3-32b".
            model=os.environ.get("GROQ_MODEL", "llama-3.3-70b-versatile"),
            messages=[
                {"role": "system", "content": "You are a ruthless logic engine. Output strict JSON."},
                {"role": "user", "content": formatted_prompt}
            ],
            temperature=0.0,
            response_format={"type": "json_object"}
        )
        content = response.choices[0].message.content
        if content is None:
            # Keep the column a str; an empty reply is reported as an error.
            return json.dumps({"verdict": "error", "reason": "Empty model response"})
        return content
    except Exception as e:
        return json.dumps({"verdict": "error", "reason": str(e)})

@pw.udf
def parse_verdict(llm_response: str) -> str:
    """
    Extract the ``verdict`` field from the judge model's JSON reply.

    The verdict is returned stripped and lower-cased; the prompt asks for
    "consistent" | "contradict", so this tolerates "Contradict" or padding.
    Falls back to "consistent" when the reply is not valid JSON, is not a
    JSON object, or has a missing, empty or non-string verdict.
    """
    try:
        data = json.loads(llm_response)
    except (TypeError, ValueError):  # None / non-str input, or malformed JSON
        return "consistent"
    if not isinstance(data, dict):
        return "consistent"
    verdict = data.get("verdict", "consistent")
    if not isinstance(verdict, str):
        # e.g. {"verdict": null} — never leak a non-str into a str column
        return "consistent"
    return verdict.strip().lower() or "consistent"

@pw.udf
def parse_reason(llm_response: str) -> str:
    """
    Extract the ``reason`` field from the judge model's JSON reply.

    Returns "Parse Error" when the reply is not valid JSON, is not a JSON
    object, or has no (or a null) reason. A non-string reason, such as a list
    of citations, is serialised with json.dumps so the column stays a str.
    """
    try:
        data = json.loads(llm_response)
    except (TypeError, ValueError):  # None / non-str input, or malformed JSON
        return "Parse Error"
    if not isinstance(data, dict):
        return "Parse Error"
    reason = data.get("reason")
    if reason is None:
        return "Parse Error"
    return reason if isinstance(reason, str) else json.dumps(reason)

In [3]:
from sentence_transformers import SentenceTransformer

# 1. Ingest Data
# Read the chunk and claim files into Pathway tables.
# NOTE(review): the all-in-one cell reads the same CHUNKS_FILE with a top-level
# "book_name" string column (plus "diff"/"time") instead of metadata.book_name —
# confirm which layout chunks.jsonl actually has.
documents = pw.io.jsonlines.read(
    CHUNKS_FILE,
    schema=pw.schema_from_dict({
        "chunk_text": str,
        "metadata": dict  # expected to carry metadata["book_name"] (see note above)
    }),
    mode="static" # Static mode for batch processing
)

# NOTE(review): this cell's recorded output ends with
# "ValueError: Can't use 'id' as a column name" — presumably raised by the
# "id" column declared below. The all-in-one cell works around it by renaming
# `id` -> `claim_id` in a pre-processing step and renaming it back after the run.
claims = pw.io.jsonlines.read(
    CLAIMS_FILE,
    schema=pw.schema_from_dict({
        "id": str,
        "input_text": str,
        "metadata": dict # used for metadata["book_name"] in the scoped retrieval
    }),
    mode="static"
)

# 2. Embedding UDF
# Load the model once at module level so get_embedding does not reload it per row.
embedder = SentenceTransformer(EMBEDDING_MODEL)

@pw.udf
def get_embedding(text: str) -> list[float]:
    """Embed `text` with the module-level SentenceTransformer and return it as a plain list of floats."""
    vector = embedder.encode(text)
    return vector.tolist()

# 3. Vectorization
# Compute embeddings for both documents and claims, and expose the book name
# as its own column for the scoped search.
# NOTE(review): indexing a `dict` column with ["book_name"] may yield a JSON
# value rather than a plain str; the all-in-one cell converts it with a UDF
# ("FIX: ... convert to string") before comparing book names.
docs_with_vectors = documents.with_columns(
    vector=get_embedding(pw.this.chunk_text),
    book_name=pw.this.metadata["book_name"]
)

claims_with_vectors = claims.with_columns(
    vector=get_embedding(pw.this.input_text),
    book_name=pw.this.metadata["book_name"]
)

# 4. SCOPED RAG
# Intended: nearest-neighbour retrieval of document chunks for each claim,
# restricted to chunks from the claim's own book, keeping the top 30 so the
# relevant passage is less likely to be missed.
# NOTE(review): confirm that `pw.ml.index.knn_query`, its `vector_col`/`filter`
# keywords and `pw.arg` exist in the installed Pathway (0.28.0, per `pip show`
# at the end of the notebook); the all-in-one cell replaces this call with an
# explicit book-name join plus cosine similarity.
retrieved_context = pw.ml.index.knn_query(
    docs_with_vectors,
    claims_with_vectors,
    k=30, # number of chunks retrieved per claim
    vector_col=pw.this.vector,
    filter=pw.this.book_name == pw.arg.book_name # only chunks from the claim's own book
)

# 5. Format Context
# Collapse the 30 retrieved chunks into a single string
@pw.udf
def format_context_list(texts: list[str]) -> str:
    """Render each retrieved chunk as "[n] text" (1-based), each followed by a blank line."""
    return "".join(f"[{rank}] {chunk}\n\n" for rank, chunk in enumerate(texts, start=1))

# Attach the formatted retrieval context to each claim.
# NOTE(review): `retrieved_context.chunk_text` is a column of a different table
# than `claims_with_vectors`; Pathway generally expects columns used in
# with_columns to belong to the same table/universe — confirm this is valid,
# or join the retrieval result back to the claims first.
enriched_claims = claims_with_vectors.with_columns(
    context=format_context_list(retrieved_context.chunk_text)
)

# 6. Final Judgment
# Pass the claim + scoped context to the LLM, then parse its JSON reply. The
# second with_columns is separate because verdict/reason read the llm_raw
# column created by the first.
final_results = enriched_claims.with_columns(
    llm_raw=call_judge_model(pw.this.input_text, pw.this.context)
).with_columns(
    verdict=parse_verdict(pw.this.llm_raw),
    reason=parse_reason(pw.this.llm_raw)
)

# Select clean columns for output.
# NOTE(review): `id` is the name Pathway refused as a column in this cell's
# recorded output; `pw.this.id` here presumably refers to Pathway's row id, not
# the claim's id — confirm.
output_table = final_results.select(
    pw.this.id,
    pw.this.input_text,
    pw.this.verdict,
    pw.this.reason,
    pw.this.metadata
)

Error while fetching `HF_TOKEN` secret value from your vault: 'Requesting secret HF_TOKEN timed out. Secrets can only be fetched when running from the Colab UI.'.
You are not authenticated with the Hugging Face Hub in this notebook.
If the error persists, please let us know by opening an issue on GitHub (https://github.com/huggingface/huggingface_hub/issues/new).


ValueError: Can't use 'id' as a column name

Execute

In [4]:
# Write to JSONL
# NOTE(review): the recorded output is "NameError: name 'output_table' is not
# defined" — this cell depends on `output_table` from the pipeline cell above,
# which failed with the `id` ValueError. It only works once that cell succeeds.
pw.io.jsonlines.write(output_table, OUTPUT_FILE)

# Run the pipeline
print("üöÄ Starting Pathway Pipeline... (This handles ingestion, indexing, and judging)")
pw.run()
print("‚úÖ Pipeline Finished. Check", OUTPUT_FILE)

NameError: name 'output_table' is not defined

# All-In-One Cell

In [2]:
!git clone https://github.com/AbitathaRoy/Kharagpur-Data-Science-Hackathon-2026.git
# Then adjust your paths like: CHUNKS_FILE = "Kharagpur-Data-Science-Hackathon-2026/data/chunks.jsonl"

fatal: destination path 'Kharagpur-Data-Science-Hackathon-2026' already exists and is not an empty directory.


In [3]:
import os
import json
import pathway as pw
from sentence_transformers import SentenceTransformer
from openai import OpenAI

# --- 1. CONFIGURATION ---
# Load secrets such as GROQ_API_KEY from the environment or a .env file;
# no credentials are written into this notebook.
from dotenv import load_dotenv
load_dotenv()

# Paths — all relative to the repository cloned above.
REPO_DIR = "Kharagpur-Data-Science-Hackathon-2026"
DATA_DIR = f"{REPO_DIR}/data"
# Claims as a JSON array of objects (loaded with json.load in the next step).
SOURCE_CLAIMS_FILE = f"{DATA_DIR}/claims_output.json"
# JSON Lines copy of the claims, written by the next step, with `id` renamed to `claim_id`.
CLEANED_CLAIMS_FILE = f"{DATA_DIR}/claims_cleaned.jsonl"
CHUNKS_FILE = f"{DATA_DIR}/chunks.jsonl"
OUTPUT_FILE = f"{DATA_DIR}/final_predictions_pathway.jsonl"
EMBEDDING_MODEL = "all-MiniLM-L6-v2"

print("üßπ Pre-processing: Preparing Claims File...")
# Fail fast: previously a missing source only printed an error, so the
# pipeline below went on to read a stale CLEANED_CLAIMS_FILE left over from
# an earlier run (or failed later with a less obvious error).
if not os.path.exists(SOURCE_CLAIMS_FILE):
    print(f"‚ùå Error: {SOURCE_CLAIMS_FILE} not found.")
    raise FileNotFoundError(SOURCE_CLAIMS_FILE)

with open(SOURCE_CLAIMS_FILE, 'r', encoding='utf-8') as f:
    raw_data = json.load(f)
if not isinstance(raw_data, list):
    raise ValueError(f"{SOURCE_CLAIMS_FILE}: expected a JSON array of claim objects")

# Convert to JSON Lines for pw.io.jsonlines.read. `id` is renamed to `claim_id`
# because Pathway rejects `id` as a column name; it is restored after the run.
with open(CLEANED_CLAIMS_FILE, 'w', encoding='utf-8') as f:
    for item in raw_data:
        if 'id' in item:
            item['claim_id'] = item.pop('id')
        f.write(json.dumps(item) + "\n")
print(f"‚úÖ Data prepared: {CLEANED_CLAIMS_FILE}")

# --- 2. DEFINE PIPELINE UDFs ---

print(f"üì• Loading Embedding Model ({EMBEDDING_MODEL})...")
embedder = SentenceTransformer(EMBEDDING_MODEL)
# Vector width of `embedder`, taken from the model instead of hard-coding 384
# (the all-MiniLM-L6-v2 size). This keeps the empty-text fallback below the same
# length as real embeddings if EMBEDDING_MODEL changes. If the model does not
# report its dimension, one encoding is measured instead.
EMBEDDING_DIM = embedder.get_sentence_embedding_dimension() or len(embedder.encode(" "))

@pw.udf
def get_embedding(text: str) -> list[float]:
    """
    Embed `text` with the shared SentenceTransformer as a list of floats.

    Empty or missing text yields an all-zero vector of length EMBEDDING_DIM.
    """
    if not text:
        return [0.0] * EMBEDDING_DIM
    return embedder.encode(text).tolist()

@pw.udf
def clean_book_name(raw_name: str) -> str:
    """
    Reduce a book path to its bare title: 'data/novels/Book Name.txt' -> 'Book Name'.

    Empty or missing names map to "".
    """
    if raw_name:
        stem, _extension = os.path.splitext(os.path.basename(raw_name))
        return stem
    return ""

@pw.udf
def extract_book_name_from_json(metadata: dict) -> str:
    """Return metadata["book_name"] coerced to str, or "" when metadata is empty or lacks that key."""
    if not metadata or "book_name" not in metadata:
        return ""
    return str(metadata["book_name"])

@pw.udf
def cosine_similarity(vec1: list[float], vec2: list[float]) -> float:
    """
    Cosine similarity of vec1 and vec2, or 0.0 if either vector has zero norm.

    The dot product pairs elements with zip(), so it stops at the shorter
    vector, while each norm is taken over its whole vector.
    """
    import math

    def _norm(vec):
        # Euclidean length of one vector
        return math.sqrt(sum(component * component for component in vec))

    length1, length2 = _norm(vec1), _norm(vec2)
    if length1 == 0 or length2 == 0:
        return 0.0
    return sum(a * b for a, b in zip(vec1, vec2)) / (length1 * length2)

# Prompt for the judge model. str.format fills {claim} and {context}; the
# doubled braces {{ }} render as literal JSON braces in the final prompt.
# NOTE(review): near-duplicate of JUDGE_PROMPT in the step-by-step section —
# keep the two in sync or share a single definition.
JUDGE_PROMPT_TEMPLATE = """
You are a Detective verifying a witness statement against the official case files.

WITNESS CLAIM: "{claim}"

OFFICIAL CASE FILES (NOVEL TEXT):
{context}

INSTRUCTIONS:
1. Search the Case Files for the specific events/characters mentioned in the Claim.
2. **Compare Details:** Look closely at names, dates, causes of death, and relationships.
3. **Verdict Logic:**
   - **CONTRADICT**: If the text explicitly tells a *different* story.
   - **CONTRADICT**: If the Claim says X happened, but the Text says Y happened *instead*.
   - **CONSISTENT**: If the Claim is supported by the text.
   - **CONSISTENT**: If the Claim adds extra details that do *not* conflict with the text (Silence != Lie).

Return JSON: {{ "verdict": "consistent" | "contradict", "reason": "Citing specific passage [x]" }}
"""

@pw.udf
def call_judge_model(claim: str, context: str) -> str:
    """
    Ask the Groq-hosted judge model whether `claim` is consistent with `context`.

    Args:
        claim: The claim text to verify.
        context: Top-ranked chunks, already formatted as "[n] text" blocks.

    Returns:
        The model's raw JSON reply. If `context` is missing or shorter than
        10 characters, no request is made and the claim is reported
        "consistent". Returns a JSON string with "verdict": "error" when
        GROQ_API_KEY is unset, the reply is empty, or the request fails.
    """
    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        return json.dumps({"verdict": "error", "reason": "API Key Missing"})

    # Cheap guard first so rows without context never construct a client.
    if not context or len(context) < 10:
        return json.dumps({"verdict": "consistent", "reason": "No context found."})

    # NOTE(review): the previous hard-coded "qwen/qwq-32b-preview" does not look
    # like a Groq model ID, so every call likely returned verdict "error". The
    # step-by-step cell uses "llama-3.3-70b-versatile" (and mentions
    # "qwen/qwen3-32b"); set GROQ_MODEL to choose another model.
    model = os.environ.get("GROQ_MODEL", "llama-3.3-70b-versatile")

    try:
        client = OpenAI(base_url="https://api.groq.com/openai/v1", api_key=api_key)
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "You are a logic engine. Output strict JSON."},
                {"role": "user", "content": JUDGE_PROMPT_TEMPLATE.format(claim=claim, context=context)}
            ],
            temperature=0.0,
            response_format={"type": "json_object"}
        )
        content = response.choices[0].message.content
        if content is None:
            # Keep the column a str; an empty reply is reported as an error.
            return json.dumps({"verdict": "error", "reason": "Empty model response"})
        return content
    except Exception as e:
        return json.dumps({"verdict": "error", "reason": str(e)})

@pw.udf
def parse_verdict(llm_response: str) -> str:
    """
    Extract the ``verdict`` field from the judge model's JSON reply.

    The verdict is returned stripped and lower-cased; the prompt asks for
    "consistent" | "contradict", so this tolerates "Contradict" or padding.
    Falls back to "consistent" when the reply is not valid JSON, is not a
    JSON object, or has a missing, empty or non-string verdict.
    """
    try:
        data = json.loads(llm_response)
    except (TypeError, ValueError):  # None / non-str input, or malformed JSON
        return "consistent"
    if not isinstance(data, dict):
        return "consistent"
    verdict = data.get("verdict", "consistent")
    if not isinstance(verdict, str):
        # e.g. {"verdict": null} — never leak a non-str into a str column
        return "consistent"
    return verdict.strip().lower() or "consistent"

@pw.udf
def parse_reason(llm_response: str) -> str:
    """
    Extract the ``reason`` field from the judge model's JSON reply.

    Returns "Parse Error" when the reply is not valid JSON, is not a JSON
    object, or has no (or a null) reason. A non-string reason, such as a list
    of citations, is serialised with json.dumps so the column stays a str.
    """
    try:
        data = json.loads(llm_response)
    except (TypeError, ValueError):  # None / non-str input, or malformed JSON
        return "Parse Error"
    if not isinstance(data, dict):
        return "Parse Error"
    reason = data.get("reason")
    if reason is None:
        return "Parse Error"
    return reason if isinstance(reason, str) else json.dumps(reason)

@pw.udf
def prepare_search_query(text: str, claims_dict: dict) -> str:
    """
    Build the retrieval query: the claim text followed by the description of
    every entry in claims_dict["events"], space-separated, with outer
    whitespace stripped.
    """
    pieces = [text]
    if claims_dict and "events" in claims_dict:
        pieces.extend(event.get("description", "") for event in claims_dict["events"])
    return " ".join(pieces).strip()

# --- 3. BUILD PIPELINE ---

print("üìÇ Ingesting Data...")

# Read chunks with correct schema.
# NOTE(review): "diff" and "time" look like the bookkeeping columns Pathway's
# jsonlines writer adds, suggesting chunks.jsonl was produced by a Pathway
# job — confirm; they are not used further in this cell.
documents = pw.io.jsonlines.read(
    CHUNKS_FILE,
    schema=pw.schema_from_dict({
        "chunk_text": str,
        "book_name": str,
        "diff": int,
        "time": int
    }),
    mode="static"
)

# Read the pre-processed claims (JSON Lines, with `id` renamed to `claim_id`
# because Pathway rejects `id` as a column name).
claims = pw.io.jsonlines.read(
    CLEANED_CLAIMS_FILE,
    schema=pw.schema_from_dict({
        "claim_id": str,
        "input_text": str,
        "metadata": dict,
        "claims": dict
    }),
    mode="static"
)

print("üßÆ Vectorizing & Normalizing...")
# Embed each chunk and reduce its book_name path to a bare title so it can be
# compared with the claim's book name.
docs_with_vectors = documents.with_columns(
    vector=get_embedding(pw.this.chunk_text),
    clean_book_name=clean_book_name(pw.this.book_name)
)

# FIX: Extract book_name from metadata dict and convert to string.
# The claim is embedded on search_text (claim text + event descriptions)
# rather than the bare claim; the second with_columns is needed because
# `vector` reads the search_text column created by the first.
claims_with_vectors = claims.with_columns(
    search_text=prepare_search_query(pw.this.input_text, pw.this.claims),
    book_name_str=extract_book_name_from_json(pw.this.metadata)
).with_columns(
    vector=get_embedding(pw.this.search_text)
)

print("üîç Performing Book-Scoped Similarity Search...")

# Step 1: Join on book name to filter documents (both are now strings).
# Every claim is paired with every chunk of the same book, so the work below
# grows with (claims x chunks per book).
# NOTE(review): join() is presumably an inner join (Pathway's default), so a
# claim whose book name matches no chunk produces no output row at all.
# NOTE(review): `id=claims_with_vectors.id` gives every (claim, chunk) pair the
# claim's row id; when a claim matches several chunks those ids collide —
# confirm Pathway accepts this here, otherwise drop the `id=` argument.
joined = claims_with_vectors.join(
    docs_with_vectors,
    claims_with_vectors.book_name_str == docs_with_vectors.clean_book_name,
    id=claims_with_vectors.id
).select(
    claim_id=claims_with_vectors.claim_id,
    input_text=claims_with_vectors.input_text,
    metadata=claims_with_vectors.metadata,
    book_name_str=claims_with_vectors.book_name_str,
    claim_vector=claims_with_vectors.vector,
    chunk_text=docs_with_vectors.chunk_text,
    chunk_vector=docs_with_vectors.vector
)

# Step 2: Calculate similarity scores (claim vector vs chunk vector)
joined_with_scores = joined.with_columns(
    similarity=cosine_similarity(pw.this.claim_vector, pw.this.chunk_vector)
)

# Step 3: Group by claim and collect chunks with scores.
# all_chunks[i] must line up with all_scores[i] for the ranking UDF.
# NOTE(review): this relies on two separate pw.reducers.tuple calls producing
# the same element order — confirm against the pw.reducers.tuple docs.
grouped = joined_with_scores.groupby(
    pw.this.claim_id,
    pw.this.input_text,
    pw.this.metadata,
    pw.this.book_name_str
).reduce(
    claim_id=pw.this.claim_id,
    input_text=pw.this.input_text,
    metadata=pw.this.metadata,
    book_name_str=pw.this.book_name_str,
    # Collect all chunks with their similarity scores
    all_chunks=pw.reducers.tuple(pw.this.chunk_text),
    all_scores=pw.reducers.tuple(pw.this.similarity)
)

# Step 4: Sort and format top chunks in Python
@pw.udf
def get_top_chunks_formatted(chunks: tuple, scores: tuple) -> str:
    """
    Rank chunks by similarity score (highest first) and format the best 15.

    Each kept chunk is rendered as "[rank] text" followed by a blank line, and
    equal scores keep their incoming order. Returns "" if either tuple is
    empty or missing.
    """
    if not (chunks and scores):
        return ""

    ranked = sorted(zip(chunks, scores), key=lambda pair: pair[1], reverse=True)
    return "".join(
        f"[{rank}] {chunk_text}\n\n"
        for rank, (chunk_text, _score) in enumerate(ranked[:15], start=1)
    )

print("‚öñÔ∏è  Judging...")
# Three chained with_columns because each step reads a column created by the
# previous one: context -> llm_raw -> verdict/reason.
final_results = grouped.with_columns(
    context=get_top_chunks_formatted(pw.this.all_chunks, pw.this.all_scores)
).with_columns(
    llm_raw=call_judge_model(pw.this.input_text, pw.this.context)
).with_columns(
    verdict=parse_verdict(pw.this.llm_raw),
    reason=parse_reason(pw.this.llm_raw)
)

# Columns written to OUTPUT_FILE; claim_id is renamed back to `id` in the
# post-processing step after the run.
output_table = final_results.select(
    claim_id=pw.this.claim_id,
    input_text=pw.this.input_text,
    verdict=pw.this.verdict,
    reason=pw.this.reason,
    metadata=pw.this.metadata
)

# --- 4. EXECUTE ---
# The steps above only declare the dataflow (the recorded "Judging..." log line
# prints before execution starts); pw.run() performs ingestion, embedding and
# judging, and the jsonlines sink writes output_table to OUTPUT_FILE.
print("üèÉ Starting Pipeline Execution...")
pw.io.jsonlines.write(output_table, OUTPUT_FILE)
pw.run()

# --- 5. POST-PROCESSING ---
# Restore the original `id` field name; it was renamed to `claim_id` during
# pre-processing because Pathway rejects `id` as a column name.
print("üîÑ Post-processing: Renaming 'claim_id' -> 'id'...")
temp_output = []
skipped_lines = 0
if os.path.exists(OUTPUT_FILE):
    with open(OUTPUT_FILE, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue  # blank lines carry no record
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                # Previously dropped silently by a bare `except: pass`; count it instead.
                skipped_lines += 1
                continue
            if not isinstance(data, dict):
                skipped_lines += 1
                continue
            if 'claim_id' in data:
                data['id'] = data.pop('claim_id')
            temp_output.append(data)

    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        for item in temp_output:
            f.write(json.dumps(item) + "\n")
    if skipped_lines:
        print(f"Warning: skipped {skipped_lines} malformed line(s) in {OUTPUT_FILE}")
    print(f"‚úÖ Finished. Results saved to {OUTPUT_FILE}")
else:
    print("‚ùå Error: Output file not created. Check logs for errors.")

üßπ Pre-processing: Preparing Claims File...
‚úÖ Data prepared: Kharagpur-Data-Science-Hackathon-2026/data/claims_cleaned.jsonl
üì• Loading Embedding Model (all-MiniLM-L6-v2)...
üìÇ Ingesting Data...
üßÆ Vectorizing & Normalizing...
üîç Performing Book-Scoped Similarity Search...
‚öñÔ∏è  Judging...


Output()

üèÉ Starting Pipeline Execution...


    https://beartype.readthedocs.io/en/latest/api_roar/#pep-585-deprecations
  warn(




üîÑ Post-processing: Renaming 'claim_id' -> 'id'...
‚úÖ Finished. Results saved to Kharagpur-Data-Science-Hackathon-2026/data/final_predictions_pathway.jsonl


In [1]:
# Record the installed Pathway version (shown in the output below) for reproducibility.
!pip show pathway

Name: pathway
Version: 0.28.0
Summary: Pathway is a data processing framework which takes care of streaming data updates for you.
Home-page: https://pathway.com/
Author: 
Author-email: 
License: 
Location: /usr/local/lib/python3.12/dist-packages
Requires: aiohttp, aiohttp-cors, async-lru, beartype, boto3, click, deltalake, diskcache, fs, geopy, gitpython, google-api-python-client, google-cloud-bigquery, google-cloud-pubsub, h3, jmespath, jupyter-bokeh, networkx, numpy, opentelemetry-api, opentelemetry-exporter-otlp-proto-grpc, opentelemetry-sdk, pandas, panel, pyarrow, pydantic, python-sat, requests, rich, scikit-learn, shapely, typing-extensions
Required-by: 
