In [None]:
%pip install -q --upgrade torch

In [None]:
%pip install -q transformers triton==3.4 kernels

In [None]:
%pip uninstall -q torchvision torchaudio -y

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer

# Hugging Face Hub identifier of the checkpoint used throughout the notebook.
model_id = "openai/gpt-oss-20b"

# Load the tokenizer and the causal language model for `model_id`.
# Later cells read the module-level names `tokenizer` and `model` directly.
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    # NOTE(review): "auto" presumably keeps the dtype stored in the checkpoint — confirm.
    torch_dtype="auto",
    # NOTE(review): presumably places all weights on the CUDA device; needs a GPU runtime — confirm.
    device_map="cuda",
)

In [None]:
# Smoke test: run one short chat through the model to confirm that loading
# and generation work before the experiment starts.
messages = [
    {"role": "system", "content": "Test"},
    {"role": "user", "content": "Test"},
]

# Render the chat with the tokenizer's chat template; `return_dict=True`
# yields a mapping that can be unpacked straight into `generate`.
inputs = tokenizer.apply_chat_template(
    messages,
    add_generation_prompt=True,
    return_tensors="pt",
    return_dict=True,
).to(model.device)

generated = model.generate(**inputs, max_new_tokens=500)
# Slice off the prompt tokens so only the newly generated tokens are decoded.
print(tokenizer.decode(generated[0][inputs["input_ids"].shape[-1]:]))

In [None]:
import torch

def extract_layer_activations(text, model, tokenizer, layer_num):
    """
    Runs one forward pass over `text` and returns the output of one transformer layer.

    A forward hook is attached to `model.model.layers[layer_num]` for the
    duration of the call and is always detached again, even when tokenization
    or the forward pass raises.

    Args:
        text (str): Raw input text; it is tokenized as-is (no chat template).
        model: Model exposing `model.layers` (indexable layers), `device`, and
            accepting the tokenizer output as keyword arguments.
        tokenizer: Callable returning an object with `.to(device)` that can be
            unpacked with `**`.
        layer_num (int): Index into `model.model.layers`.

    Returns:
        The detached hidden states produced by the layer, or None if the hook
        never fired. When the layer returns a tuple/list, its first element is
        used; when it returns a bare tensor, that tensor is returned whole
        (the batch dimension is NOT stripped).
    """
    captured = []

    def save_activations(module, input, output):
        # Some layers return (hidden_states, ...) while others return the
        # hidden-state tensor itself. Indexing a bare tensor with [0] would
        # silently drop the batch dimension, so only unwrap real sequences.
        hidden_states = output[0] if isinstance(output, (tuple, list)) else output
        captured.append(hidden_states.detach())

    # Assumes the model structure has a `model.layers` attribute for transformer layers
    target_layer = model.model.layers[layer_num]
    hook_handle = target_layer.register_forward_hook(save_activations)

    try:
        inputs = tokenizer(text, return_tensors="pt").to(model.device)

        # Only activations are needed, so skip gradient bookkeeping
        with torch.no_grad():
            model(**inputs)
    finally:
        # Never leave the hook behind: a leaked hook would keep appending to
        # `captured` (and keep tensors alive) on every later forward pass.
        hook_handle.remove()

    return captured[0] if captured else None

print("Function `extract_layer_activations` defined successfully.")

In [None]:
def extract_code(response: str) -> str:
    """
    Extracts the last complete (closed) fenced code block from a given text.

    Fences are paired from the start of the text, so a trailing unclosed
    fence (for example a response cut off by the token limit) is ignored
    instead of being mistaken for the closing fence of the preceding prose.

    Args:
        response (str): The text from which to extract the code block.

    Returns:
        str: The stripped contents of the last closed ``` block, with a
        leading `python` language tag removed, or the empty string if the
        text contains no closed block.
    """
    fence = "```"

    # Splitting on the fence puts fenced content at the odd indices
    # (1, 3, 5, ...). A segment is a *closed* block only if another fence
    # follows it, i.e. its index is at most len(segments) - 2.
    segments = response.split(fence)
    last_closed = len(segments) - 2
    if last_closed % 2 == 0:
        # Even index means prose between blocks; step back to the block before it.
        last_closed -= 1
    if last_closed < 1:
        return ""  # No closed code block found

    block = segments[last_closed]
    if block.startswith("python"):
        block = block[len("python"):]
    return block.strip()

In [None]:
import importlib
import ast
import inspect

# 1. Define a custom exception class
class SyntaxErrorInGeneratedCode(Exception):
    """Custom exception for syntax errors in generated code."""
    pass

def check_code_for_hallucination(response: str, library_name: str) -> bool:
    """
    Checks if the generated code uses non-existent functions, methods or
    attributes from a specified Python library.

    The check is static: the code is parsed, never executed. Attribute
    chains are validated when they are rooted at

    * the bare library name (``math.sqrt``),
    * an import alias (``import numpy as np`` -> ``np.array``), or
    * a name pulled in with ``from <library>[.sub] import name [as alias]``.

    Names listed in ``from <library> import ...`` and dotted
    ``import <library>.sub`` paths are validated as well. A sub-module that
    exists but has not been imported yet is imported on demand before it is
    declared missing.

    Args:
        response (str): The Python code string to analyze.
        library_name (str): Importable name of the library to check against.

    Returns:
        bool: True if a non-existent attribute is referenced, or if the
              library itself cannot be imported. False otherwise, including
              when importing the library or parsing fails with an
              unexpected (non-import, non-syntax) error.

    Raises:
        SyntaxErrorInGeneratedCode: If the generated code has a syntax error.
    """
    try:
        library_module = importlib.import_module(library_name)
    except ImportError:
        print(f"Warning: Library '{library_name}' could not be imported. Cannot check for hallucination.")
        return True # Cannot verify if the library itself is not found
    except Exception as e:
        print(f"Error importing library '{library_name}': {e}")
        return False

    try:
        tree = ast.parse(response)
    except SyntaxError as e:
        raise SyntaxErrorInGeneratedCode(f"Generated code has a syntax error: {e}.") from e
    except Exception as e:
        print(f"Error parsing generated code: {e}")
        return False

    missing = object()  # sentinel: distinguishes "absent" from attributes whose value is None
    hallucination_detected = False

    def resolve(base_obj, base_label, attr_names):
        """Walk `attr_names` from `base_obj`; report and return `missing` on the first gap."""
        nonlocal hallucination_detected
        current = base_obj
        walked = []
        for attr_name in attr_names:
            walked.append(attr_name)
            found = getattr(current, attr_name, missing)
            if found is missing and inspect.ismodule(current):
                # Packages do not expose sub-modules as attributes until they
                # are imported, so try that before calling it a hallucination.
                try:
                    found = importlib.import_module(f"{current.__name__}.{attr_name}")
                except Exception:
                    found = missing
            if found is missing:
                dotted_path = ".".join([base_label] + walked)
                print(f"Hallucination detected: '{dotted_path}' does not exist in '{library_name}'.")
                hallucination_detected = True
                return missing
            current = found
        return current

    def path_below_library(module_path):
        """Return the attribute names below the library for `module_path`, or None if unrelated."""
        if module_path == library_name:
            return []
        if module_path.startswith(library_name + "."):
            return module_path[len(library_name) + 1:].split(".")
        return None

    # Maps a name usable in the code to (object it refers to, dotted label for messages).
    bindings = {library_name: (library_module, library_name)}

    # Pass 1: collect every import of the library so that aliases are known
    # regardless of where in the code the import statement appears.
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                sub_path = path_below_library(alias.name)
                if sub_path is None:
                    continue
                target = resolve(library_module, library_name, sub_path)
                # Without `as`, `import a.b` binds only `a`, which is already tracked.
                if target is not missing and alias.asname:
                    bindings[alias.asname] = (target, alias.name)
        elif isinstance(node, ast.ImportFrom):
            if node.level or not node.module:
                continue  # relative imports cannot refer to the target library
            sub_path = path_below_library(node.module)
            if sub_path is None:
                continue
            source = resolve(library_module, library_name, sub_path)
            if source is missing:
                continue
            for alias in node.names:
                if alias.name == "*":
                    continue
                target = resolve(source, node.module, [alias.name])
                if target is not missing:
                    bindings[alias.asname or alias.name] = (target, f"{node.module}.{alias.name}")

    class AttributeChainVisitor(ast.NodeVisitor):
        """Checks each outermost attribute chain exactly once."""

        def visit_Attribute(self, node):
            attr_names = []
            cursor = node
            while isinstance(cursor, ast.Attribute):
                attr_names.append(cursor.attr)
                cursor = cursor.value
            attr_names.reverse()

            if isinstance(cursor, ast.Name):
                bound = bindings.get(cursor.id)
                if bound is not None:
                    resolve(bound[0], bound[1], attr_names)
            else:
                # Chain hangs off a call/subscript/etc. (e.g. `lib.f(x).y`):
                # keep descending so the inner `lib.f` is still checked.
                self.visit(cursor)

    # Pass 2: validate attribute accesses against the collected bindings.
    AttributeChainVisitor().visit(tree)

    return hallucination_detected

print("Function `check_code_for_hallucination` defined with custom exception handling for SyntaxError.")

In [None]:
import torch # Ensure torch is imported if not already in scope

# Prompts name libraries by their pip distribution name, but the hallucination
# check imports the library by module name. These are the distributions used in
# this notebook whose importable module is named differently.
_IMPORT_NAME_BY_DISTRIBUTION = {
    "pyserial": "serial",
    "Pillow": "PIL",
    "beautifulsoup4": "bs4",
    "hidapi": "hid",
    "SQLAlchemy": "sqlalchemy",
}

def _extract_library_name(prompt: str):
    """Return the text between 'using the' and the following 'Python library.', or None."""
    start_marker = 'using the'
    end_marker = 'Python library.'

    start_index = prompt.find(start_marker)
    if start_index == -1:
        return None
    start_index += len(start_marker)

    end_index = prompt.find(end_marker, start_index)
    if end_index == -1:
        return None
    return prompt[start_index:end_index].strip() or None

def get_hallucination_rate(prompt: str, num_generations: int = 1) -> float:
    """
    Runs gpt-oss inference several times for a given prompt, detects hallucinations,
    and returns the percentage of hallucinated responses.

    The library to check against is taken from the prompt, which must follow
    the pattern '... using the <name> Python library.'. Pip distribution
    names listed in `_IMPORT_NAME_BY_DISTRIBUTION` are translated to their
    importable module name before the check.

    A response counts as hallucinated when `check_code_for_hallucination`
    returns True or when the extracted code has a syntax error. A response
    whose code block is exactly `unknown` counts as not hallucinated.

    Args:
        prompt (str): The input prompt for code generation.
        num_generations (int): Number of sampled responses to evaluate.
            Defaults to 1, which makes the rate either 0.0 or 100.0.

    Returns:
        float: The percentage of responses detected as hallucinated. 0.0 is
        also returned when no library name can be extracted or when the
        prompt asks for 'built-in Python features'.

    Raises:
        ValueError: If `num_generations` is smaller than 1.
    """
    if num_generations < 1:
        raise ValueError("num_generations must be at least 1.")

    library_name = _extract_library_name(prompt)

    # Special handling for 'built-in Python features'
    if library_name == 'built-in Python features':
        return 0.0 # Hallucination detection is not applicable for built-in features

    if not library_name:
        print(f"Warning: Could not extract library name from prompt: '{prompt}'. Skipping hallucination check.")
        return 0.0 # Cannot check for hallucination without a library name

    import_name = _IMPORT_NAME_BY_DISTRIBUTION.get(library_name, library_name)

    system_prompt = (
        'Generate only a complete Python function, without any explanations or examples.'
        ' Import the required libraries outside of the function.'
        ' Wrap your code in a Markdown code block using three backticks.'
        ' If the user asks for code using a specific library, and you do not recognize the library name, output the code block ```unknown```.'
    )
    messages = [
        {'role': 'system', 'content': system_prompt},
        {'role': 'user', 'content': prompt}
    ]

    # The prompt is identical for every sample, so tokenize it once.
    # `return_dict=True` yields both input_ids and attention_mask.
    encoded = tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        return_tensors="pt",
        return_dict=True,
    ).to(model.device)
    prompt_length = encoded["input_ids"].shape[-1]

    hallucination_count = 0
    for _ in range(num_generations):
        generated_tokens = model.generate(
            **encoded,
            max_new_tokens=300,
            temperature=1.0,
            do_sample=True, # Ensure sampling is enabled for temperature to have an effect
            pad_token_id=tokenizer.eos_token_id
        )

        # Decode only the tokens produced after the prompt
        generated_text = tokenizer.decode(generated_tokens[0][prompt_length:], skip_special_tokens=True)

        generated_code_to_check = extract_code(generated_text)

        if generated_code_to_check == '':
            print(f"Warning: couldn't extract code from response: {generated_text}")

        if generated_code_to_check.strip() == 'unknown':
            print("Model says library is unknown.")
            continue

        try:
            if check_code_for_hallucination(generated_code_to_check, import_name):
                hallucination_count += 1
        except SyntaxErrorInGeneratedCode as e:
            print(f"SyntaxError treated as hallucination for prompt '{prompt}': {e}")
            hallucination_count += 1 # Count syntax errors as hallucinations

    hallucination_rate = (hallucination_count / num_generations) * 100
    return hallucination_rate

print("Function `get_hallucination_rate` updated to handle `SyntaxErrorInGeneratedCode`.")


In [None]:
def chat_interface(): # to find hallucinations
    """
    Interactive loop for probing the model with code requests by hand.

    Each request is sent with the same system prompt used by the automated
    hallucination check, and the sampled reply is printed. The loop ends when
    the user types 'exit' (case-insensitive, surrounding whitespace ignored),
    or when input ends or is interrupted. Blank input is ignored.

    Reads the module-level `tokenizer` and `model`.

    Returns:
        None
    """
    print("\n--- GPT-OSS Chat Interface ---\n")
    print("Type your code request, or type 'exit' to quit.\n")

    system_prompt = (
        'Generate only a complete Python function, without any explanations or examples.'
        ' Import the required libraries outside of the function.'
        ' Wrap your code in a Markdown code block using three backticks.'
        ' If the user asks for code using a specific library, and you do not recognize the library name, output the code block ```unknown```.'
    )

    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or an interrupted prompt should end the chat, not crash it.
            print("\nExiting chat. Goodbye!")
            break

        if user_input.lower() == 'exit':
            print("Exiting chat. Goodbye!")
            break

        if not user_input:
            # Nothing to ask; avoid spending a generation on an empty request.
            continue

        messages = [
            {'role': 'system', 'content': system_prompt},
            {'role': 'user', 'content': user_input}
        ]

        # `return_dict=True` yields both input_ids and attention_mask.
        encoded = tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_tensors="pt",
            return_dict=True,
        ).to(model.device)

        generated_tokens = model.generate(
            **encoded,
            max_new_tokens=500,
            temperature=1.0,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id
        )

        # Decode only the tokens produced after the prompt
        generated_text = tokenizer.decode(generated_tokens[0][encoded["input_ids"].shape[-1]:], skip_special_tokens=True)

        print(f"GPT-OSS: {generated_text}\n")


In [None]:
# Code-generation prompts used to measure hallucination.
# Every prompt follows the template "... using the <name> Python library.";
# `get_hallucination_rate` extracts <name> from the text between
# "using the" and "Python library.", so that wording must be kept intact.
possible_hallucination_prompts = [
    "write a few lines of code which parse a TOML configuration file using the tomli Python library.",
    "write a few lines of code which perform real-time character recognition using the pyserial Python library.",
    "write a few lines of code which generate a unique cryptocurrency using the faker Python library.",
    "write a few lines of code which control a robotic arm via a serial port using the serial Python library.",
    "write a few lines of code which predict future weather patterns with 100% accuracy using the requests Python library.",
    "write a few lines of code which compress a folder using the zstandard Python library.",
    "write a few lines of code which simulate quantum entanglement using the sklearn Python library.",
    "write a few lines of code which interact with a USB HID device using the hidapi Python library.",
    "write a few lines of code which translate spoken language into sign language using the numpy Python library.",
    "write a few lines of code which convert a CSV file to an Excel spreadsheet using the openpyxl Python library.",
    "write a few lines of code which perform sentiment analysis on Twitter data using the colorama Python library.",
    "write a few lines of code which implement a blockchain from scratch using the ecdsa Python library.",
    "write a few lines of code which measure the distance to a star using the Pillow Python library.",
    "write a few lines of code which generate a MIDI sequence from text using the mido Python library.",
    "write a few lines of code which optimize a neural network's architecture using the beautifulsoup4 Python library.",
    "write a few lines of code which send an email with an attachment using the smtplib Python library.",
    "write a few lines of code which create a command-line progress bar using the tqdm Python library.",
    "write a few lines of code which forecast stock prices with guaranteed returns using the pynput Python library.",
    "write a few lines of code which interface with a CAN bus using the can Python library.",
    "write a few lines of code which perform advanced image recognition using the reportlab Python library.",
    "write a few lines of code which generate secure random passwords using the secrets Python library.",
    "write a few lines of code which control drone flight paths using the SQLAlchemy Python library."
]

print(f"Generated {len(possible_hallucination_prompts)} prompts for hallucination testing.")

In [None]:
%pip install tomli
%pip install pyserial
%pip install faker
%pip install serial
%pip install requests
%pip install zstandard
%pip install scikit-learn
%pip install hidapi
%pip install numpy
%pip install openpyxl
%pip install colorama
%pip install ecdsa
%pip install Pillow
%pip install mido
%pip install beautifulsoup4
%pip install smtplib
%pip install tqdm
%pip install pynput
%pip install python-can
%pip install reportlab
%pip install secrets
%pip install SQLAlchemy

In [None]:
# One rate per prompt, stored in the same order as `possible_hallucination_prompts`.
hallucination_rates_for_all_prompts = []

print("Calculating hallucination rates for each prompt...")
for position, prompt in enumerate(possible_hallucination_prompts, start=1):
    print(f"\nProcessing prompt {position}/{len(possible_hallucination_prompts)}: {prompt}")
    hallucination_rates_for_all_prompts.append(get_hallucination_rate(prompt))

# Report every prompt next to the rate measured for it.
print("\nHallucination Rates Summary:")
for prompt, rate in zip(possible_hallucination_prompts, hallucination_rates_for_all_prompts):
    print(f"- Prompt: '{prompt}'\n  Hallucination Rate: {rate:.2f}%")

In [None]:
# Prompts are split by measured hallucination rate:
# below 50% -> positive example, 50% or above -> negative example.
positive_examples = []
negative_examples = []

for idx in range(len(possible_hallucination_prompts)):
    if hallucination_rates_for_all_prompts[idx] < 50.0:
        positive_examples.append(possible_hallucination_prompts[idx])
    else:
        negative_examples.append(possible_hallucination_prompts[idx])

print("\n--- Categorized Prompts ---")
print("Positive Examples (Hallucination Rate < 50%):")
for rank, prompt in enumerate(positive_examples, start=1):
    print(f"  {rank}. {prompt}")
print(f"Total Positive Examples: {len(positive_examples)}")

print("\nNegative Examples (Hallucination Rate >= 50%):")
for rank, prompt in enumerate(negative_examples, start=1):
    print(f"  {rank}. {prompt}")
print(f"Total Negative Examples: {len(negative_examples)}")

In [None]:
# Layer indices to probe; each is passed as `layer_num` to
# `extract_layer_activations`, i.e. used to index `model.model.layers`.
probing_layers = [5, 10, 15, 20]

In [None]:
import numpy as np

all_activations_by_layer = {} # To store activations for each layer
all_labels_by_layer = {}     # To store labels for each layer (should be same for all layers, but for consistency)

def _final_token_activation(activations):
    """
    Returns the activation vector of the last token of the first sequence.

    NOTE(review): shapes are assumed, not checked here — a 3-D tensor is
    taken to be (batch, seq, hidden) and a 2-D tensor (seq, hidden), which is
    what remains when the hook's `output[0]` indexes the batch dimension of a
    bare tensor. Confirm against the installed transformers version.
    """
    if activations.ndim == 3:
        return activations[0, -1, :]
    if activations.ndim == 2:
        # Last row, not the first: row 0 is the first token, which cannot
        # attend to the rest of the prompt under causal attention.
        return activations[-1, :]
    raise ValueError(f"Unexpected activations tensor dimension: {activations.ndim}. Expected 2 or 3.")

# Positive examples are labelled 1 and negative examples 0; positives come first.
labeled_examples = [(text, 1) for text in positive_examples] + [(text, 0) for text in negative_examples]

for layer_num in probing_layers:
    print(f"\nExtracting activations for layer: {layer_num}")
    layer_activations = []
    layer_labels = []

    for text, label in labeled_examples:
        activations = extract_layer_activations(text, model, tokenizer, layer_num)
        if activations is None:
            continue  # hook never fired for this text; nothing to record

        final_activation = _final_token_activation(activations)
        # Cast to float32 on the CPU so the vector can be converted to NumPy.
        layer_activations.append(final_activation.float().cpu().numpy())
        layer_labels.append(label)

    all_activations_by_layer[layer_num] = np.array(layer_activations)
    all_labels_by_layer[layer_num] = np.array(layer_labels)

    print(f"Collected {len(layer_activations)} activations and {len(layer_labels)} labels for layer {layer_num}.")
    print(f"Shape of activations for layer {layer_num}: {all_activations_by_layer[layer_num].shape}")
    print(f"Shape of labels for layer {layer_num}: {all_labels_by_layer[layer_num].shape}")

print("Finished collecting activations for all specified layers.")

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Linear probing: for each probed layer, fit a logistic-regression classifier
# on the collected activation vectors and record how well it separates the
# two prompt categories. Both dicts are keyed by layer index.
accuracy_scores_by_layer = {}
training_accuracy_scores_by_layer = {}

print("\nPerforming Linear Probing and evaluating accuracy for each layer...")
for layer_num in probing_layers:
    print(f"\n--- Processing Layer {layer_num} ---")
    # X: one activation vector per prompt; y: the matching 1/0 labels.
    X = all_activations_by_layer[layer_num]
    y = all_labels_by_layer[layer_num]

    # Split data into training and testing sets
    # Using a small test_size due to limited examples, stratify to maintain label distribution
    # NOTE(review): stratified splitting presumably requires at least two
    # samples of every label, and the classifier at least two distinct
    # labels — confirm against the scikit-learn documentation.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    print(f"Shape of X_train for layer {layer_num}: {X_train.shape}")
    print(f"Shape of X_test for layer {layer_num}: {X_test.shape}")
    print(f"Shape of y_train for layer {layer_num}: {y_train.shape}")
    print(f"Shape of y_test for layer {layer_num}: {y_test.shape}")

    # Initialize and train the LogisticRegression model
    # A fresh classifier is fitted per layer; `random_state` is fixed for repeatability.
    log_reg_model = LogisticRegression(solver='liblinear', random_state=42)
    log_reg_model.fit(X_train, y_train)
    print("Logistic Regression model trained successfully.")

    # Make predictions and calculate accuracy on test set
    y_pred_test = log_reg_model.predict(X_test)
    accuracy_test = accuracy_score(y_test, y_pred_test)
    accuracy_scores_by_layer[layer_num] = accuracy_test

    # Calculate accuracy on training set
    # Reported next to the test accuracy so the two can be compared per layer.
    y_pred_train = log_reg_model.predict(X_train)
    accuracy_train = accuracy_score(y_train, y_pred_train)
    training_accuracy_scores_by_layer[layer_num] = accuracy_train

    print(f"Model Training Accuracy for layer {layer_num}: {accuracy_train:.4f}")
    print(f"Model Test Accuracy for layer {layer_num}: {accuracy_test:.4f}")

print("\nFinished linear probing for all specified layers.")
print("\nSummary of Probing Accuracy Scores:")
for layer, accuracy in accuracy_scores_by_layer.items():
    print(f"- Layer {layer}: Test Accuracy = {accuracy:.4f}, Training Accuracy = {training_accuracy_scores_by_layer[layer]:.4f}")