In [1]:
import os
import re
import ast
import pandas as pd
import numpy as np
import fitz
import faiss
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch


  from .autonotebook import tqdm as notebook_tqdm


### Data Loading & Preprocessing

In [2]:
# Loading and preprocessing the card data
df = pd.read_csv("../data/cards.csv")
df = df[~df["type"].isin(["Token", "Skill Card"])]

df_meta = df.drop(columns=['id','image_url', 'image_url_small', 'ban_tcg', 'ban_ocg', 'ban_goat',
                            'staple', 'views', 'viewsweek', 'upvotes', 'downvotes', 'formats', 'treated_as', 
                            'tcg_date', 'ocg_date', 'konami_id', 'has_effect'])
#df_meta.head(5)

In [3]:
# Loading test data
test_df = pd.read_csv("../data/test_dataset.csv")
#test_df.head(5)

In [None]:
### This Chunk was only run once to convert the PDF file into text ###
# Extracting text from rulebook pdf 
#def pdf_to_text(pdf_path):
#    doc = fitz.open(pdf_path)
#    text = ""
#    for page in doc:
#        text += page.get_text()
#    return text

#rules = pdf_to_text("../data/SD_RuleBook_EN_10.pdf")

# Saving the text as a .txt file for cleaning
#with open('../data/Rules.txt', 'a', encoding='utf-8') as file:
#    file.writelines(rules)

In [4]:
# Loading the manually cleaned .txt file
with open('../data/Manually_Cleaned_Rules.txt', 'r') as file:
   clean_rules = file.read()

### Chunking

In [None]:
def split_sections(text):
    """
    Splits a structured text document into (header, body) section pairs.

    The function uses a regular expression to capture each header and then pairs it
    with the text that follows until the next header. The result is a list
    of (header, body) tuples in the order they appear in the document.

    Parameters
    ----------
    text : str
        The full text to be segmented into sections.

    Returns
    -------
    list of tuple
        A list where each element is a (header, body) pair:
        - header : str
            The extracted section header, e.g. "3. Experimental Setup".
        - body : str
            The text belonging to that section, stripped of surrounding
            whitespace.
    """
    pattern = r"\n{3}(\d{1,2}\.\s[^\n]+)\n{2}"
    parts = re.split(pattern, text)
    sections = []
    for i in range(1, len(parts), 2):
        header = parts[i].strip()
        body = parts[i+1].strip()
        sections.append((header, body))
    return sections


def chunk_text(text, max_words=300, overlap=50):
    """
    Splits a long text into overlapping word-based chunks.

    This function divides the input text into sequential chunks, each
    containing up to `max_words` words. Consecutive chunks overlap by
    `overlap` words to preserve context across boundaries. The final chunk
    may contain fewer than `max_words` words if the text ends before the
    limit is reached.

    Parameters
    ----------
    text : str
        The full text to be segmented.
    max_words : int, optional
        Maximum number of words allowed in each chunk. Default is 300.
    overlap : int, optional
        Number of words shared between consecutive chunks. Default is 50.

    Returns
    -------
    list of str
        A list of text chunks, each containing up to `max_words` words,
        with `overlap` words of context preserved between adjacent chunks.
    """
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = min(start + max_words, len(words))
        chunk = " ".join(words[start:end])
        chunks.append(chunk)
        start += max_words - overlap  # move forward with overlap
    return chunks

def build_rag_chunks(cleaned_text):
    """
    Constructs chunks by utilizing the previous 2 functions.

    This function first splits the input text into sections using
    `split_sections`, then further divides each section body into
    overlapping word-based chunks via `chunk_text`. Each resulting chunk is
    stored in a dictionary containing the section header, a unique
    chunk identifier, and the chunk text itself. 

    Parameters
    ----------
    cleaned_text : str
        The full preprocessed text to be segmented into sections and
        sub-chunks.

    Returns
    -------
    list of dict
        A list of chunk dictionaries with the following keys:
        - "section": str
            The section header from which the chunk was derived.
        - "chunk_id": str
            A unique identifier combining the section header and chunk index.
        - "text": str
            The chunked text content.
    """    
    sections = split_sections(cleaned_text)
    rag_chunks = []
    for header, body in sections:
        subchunks = chunk_text(body, max_words=300, overlap=50)
        for i, chunk in enumerate(subchunks):
            rag_chunks.append({
                "section": header,
                "chunk_id": f"{header}_{i}",
                "text": chunk
            })
    return rag_chunks

# Building chunks
rag_chunks = build_rag_chunks(clean_rules)

### Creating Vector Database

In [6]:
embed_model = SentenceTransformer("all-mpnet-base-v2")

texts = [chunk["text"] for chunk in rag_chunks]
embeddings = embed_model.encode(texts, convert_to_numpy=True)

dim = embeddings.shape[1]
index = faiss.IndexFlatL2(dim)
index.add(embeddings)

metadata = [
    {"section": chunk["section"], "chunk_id": chunk["chunk_id"]}
    for chunk in rag_chunks
]


### Retrieval

In [8]:
model_name = "mistralai/Mistral-7B-Instruct-v0.2"

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map="auto"
)

`torch_dtype` is deprecated! Use `dtype` instead!
Fetching 3 files: 100%|██████████| 3/3 [00:12<00:00,  4.00s/it]
Loading checkpoint shards: 100%|██████████| 3/3 [00:01<00:00,  1.93it/s]


In [None]:
def parse_ruling(text):
    """
    Parses the model-generated ruling into a structured decision and explanation.

    Parameters
    ----------
    text : str
        The raw ruling text produced by the model.

    Returns
    -------
    tuple[str, str]
        A pair consisting of:
        - decision : {"YES", "NO", "UNKNOWN"}
            The extracted decision label.
        - explanation : str
            The explanation text following the last "Explanation:" marker.
    """
    # Normalize whitespace
    cleaned = " ".join(text.split())

    # Extracts decision (YES/NO)
    decision_match = re.search(r"decision\s*[:\-]\s*(yes|no)", cleaned, re.IGNORECASE)
    decision = decision_match.group(1).upper() if decision_match else "UNKNOWN"

    # Find all occurrences of "Explanation:"
    explanation_positions = [m.start() for m in re.finditer(r"explanation\s*[:\-]", cleaned, re.IGNORECASE)]

    if not explanation_positions:
        return decision, ""

    # Use only the last Explanation:
    last_pos = explanation_positions[-1]

    # Extract everything after the last Explanation:
    explanation = cleaned[last_pos:]
    explanation = re.sub(r"explanation\s*[:\-]\s*", "", explanation, flags=re.IGNORECASE).strip()

    return decision, explanation

In [None]:
def get_card_text(card_names, df_meta):
    """
    Retrieves formatted card descriptions from the metadata dataframe.

    For each card name in `card_names`, this function looks up the
    corresponding row in `df_meta` and extracts the card's description,
    type, and sub-type (race). 

    Parameters
    ----------
    card_names : list of str
        A list of card names to retrieve metadata for.
    df_meta : pandas.DataFrame
        A dataframe containing card metadata, expected to include the
        columns "name", "desc", "type", and "race".

    Returns
    -------
    str
        A formatted multi‑section string where each section contains:
        - the card name
        - its card type
        - its sub-type (race)
        - its descriptive text
    """
    texts = []
    for name in card_names:
        row = df_meta[df_meta["name"] == name]
        if not row.empty:
            desc = row.iloc[0]["desc"]
            ctype = row.iloc[0]["type"]
            race = row.iloc[0]["race"]
            texts.append(
                f"{name}:\n"
                f"  Card Type: {ctype}\n"
                f"  Sub-Type: {race}\n"
                f"  Text: {desc}"
            )
    return "\n\n".join(texts)

def build_retrieval_query(query, game_state, card_text):
    return f"Query: {query}\nGame State: {game_state}\nCard Text: {card_text}"

def retrieve(query, game_state, card_text, embed_model, index, metadata, rag_chunks, k=5):
    """
    Retrieves the top-k most relevant RAG chunks for a given query and game state.

    This function constructs a retrieval query using the user query, the
    current game state, and any relevant card text. The combined query is
    embedded and searched against a vector index. The top-k nearest chunks 
    are returned along with their associated metadata.
    """
    retrieval_query = build_retrieval_query(query, game_state, card_text)
    q_emb = embed_model.encode([retrieval_query], convert_to_numpy=True)
    distances, indices = index.search(q_emb, k)

    results = []
    for idx in indices[0]:
        results.append({
            "section": metadata[idx]["section"],
            "chunk_id": metadata[idx]["chunk_id"],
            "text": rag_chunks[idx]["text"]
        })
    return results

In [11]:
def build_rag_prompt(game_state, query, card_text, retrieved_chunks):
    context = "\n\n---\n\n".join(
        f"[Section: {c['section']}]\n{c['text']}"
        for c in retrieved_chunks
    )

    prompt = f"""
You are acting as a Yu-Gi-Oh! Judge.
Answer the ruling question based only on the game state, card text, context and your internal knowledge.
Give a YES or NO answer and a short explanation.

### Game State
{game_state}

### Query
{query}

### Card Text
{card_text}

### Rulebook Context
{context}

### Ruling Format
Decision: <YES/NO>
Explanation: <short explanation>
"""
    return prompt.strip()

In [None]:
def ask_rag(row, model, tokenizer, df_meta, index, metadata, rag_chunks, embed_model):
    """
    Queries the language model for a Yu-Gi-Oh! ruling and returns a
    structured interpretation of its output.
    """
    cards = row["cards"]

    # Convert string → list
    if isinstance(cards, str):
        try:
            cards = ast.literal_eval(cards)
        except:
            cards = [cards]

    # Ensure list
    if not isinstance(cards, list):
        cards = [cards]

    card_text = get_card_text(cards, df_meta)
    #card_text = get_card_text(row["cards"], df_meta)

    retrieved = retrieve(
        row["query"],
        row["game_state"],
        card_text,
        embed_model,
        index,
        metadata,
        rag_chunks,
        k=5 # Change this for number of chunks retrieved
    )
    

    # Build RAG prompt
    prompt = build_rag_prompt(
        row["game_state"],
        row["query"],
        card_text,
        retrieved
    )

    # Run Mistral
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    output = model.generate(
        **inputs,
        max_new_tokens=200,
        temperature=0.0,
        do_sample=False
    )
    text = tokenizer.decode(output[0], skip_special_tokens=True)

    # Parse ruling
    decision, explanation = parse_ruling(text)

    return {
        "model_decision": decision,
        "model_explanation": explanation,
        "raw_output": text,
        "retrieved_sections": [c["section"] for c in retrieved]
    }

In [None]:
# Generate a Ruling & a Explanation for each scenario
for idx, row in test_df.iterrows():
    result = ask_rag(
        row,
        model,
        tokenizer,
        df_meta,
        index,
        metadata,
        rag_chunks,
        embed_model
    )

    test_df.at[idx, "rag_decision"] = result["model_decision"]
    test_df.at[idx, "rag_explanation"] = result["model_explanation"]
    test_df.at[idx, "rag_raw"] = result["raw_output"]
    test_df.at[idx, "rag_sections"] = str(result["retrieved_sections"])

Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:2 for o

In [58]:
# Save the DataFrame one level up, inside results/
test_df.to_csv("../results/yugioh_rulings_rag_5chunks_text.csv", index=False)