# Imports and Setup
---

In [4]:
import json
import gc
import logging
import os
import sys
import time
from pathlib import Path
from typing import Tuple, Optional, Dict, Any, List
import shutil # For file operations

import torch
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
)
from rich.console import Console
from rich.syntax import Syntax
from funcy import select_values # Used in generation function

notebook_dir = Path().resolve()
project_root = notebook_dir.parent # Should be 'aide-ds' root
sys.path.insert(0, str(project_root))
print(f"Project Root added to sys.path: {project_root}")


  from .autonotebook import tqdm as notebook_tqdm


Project Root added to sys.path: /home/asim_aims_ac_za/Home/Aide-playground


In [5]:

# --- AIDE Imports ---
try:
    from aide.interpreter import Interpreter, ExecutionResult
    from aide.utils.response import extract_code, format_code
    from aide.utils import serialize
except ImportError as e:
    print(f"Error importing AIDE modules: {e}")
    print("Ensure AIDE is installed (e.g., `pip install -e .` from aide-ds root)")
    raise # Stop execution if imports fail

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - [%(levelname)s] - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)] # Log to notebook output
)
logger = logging.getLogger("PromptPlaygroundNotebook")
# Optional: Add file handler if you still want a separate log file
log_file_path = notebook_dir / "playground_notebook.log"
file_handler = logging.FileHandler(log_file_path)
file_handler.setFormatter(logging.Formatter('%(asctime)s - [%(levelname)s] - %(message)s'))
logger.addHandler(file_handler)

# --- Rich Console ---
console = Console()

print("Setup Complete.")


Setup Complete.


#  Helper Functions (Model Loading, Prompt Parsing, Generation)
---

In [6]:

MODEL_CACHE: Dict[str, Tuple[Any, Any]] = {}

def load_model_and_tokenizer(model_id: str, load_in_4bit: bool = True) -> Tuple[AutoModelForCausalLM, AutoTokenizer]:
    """Loads model and tokenizer, caching them."""
    global MODEL_CACHE # Declare modification of global cache
    if model_id in MODEL_CACHE:
        logger.info(f"Using cached model/tokenizer for {model_id}")
        return MODEL_CACHE[model_id]

    logger.info(f"Loading model and tokenizer: {model_id} (4-bit: {load_in_4bit})")
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
            logger.info(f"Set tokenizer pad_token to eos_token: {tokenizer.eos_token}")
        tokenizer.padding_side = "left"

        quantization_config = None
        compute_dtype = torch.float16 # Default compute type
        if load_in_4bit and torch.cuda.is_available():
            if torch.cuda.is_bf16_supported():
                compute_dtype = torch.bfloat16
                logger.info("BF16 supported, using for compute.")
            else:
                logger.info("BF16 not supported, using float16 for compute.")

            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=compute_dtype,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4",
            )
            logger.info("Using 4-bit quantization (BitsAndBytesConfig).")
        elif load_in_4bit:
            logger.warning("CUDA not available, cannot load in 4-bit. Loading in default precision.")
        else:
             logger.info("4-bit quantization disabled. Loading in default precision.")


        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            quantization_config=quantization_config,
            device_map="auto",
            torch_dtype=compute_dtype if quantization_config else None, # Match compute type if quantized
            trust_remote_code=True,
        )
        logger.info(f"Model '{model_id}' loaded successfully to device: {model.device}")
        MODEL_CACHE[model_id] = (model, tokenizer) # Cache it
        return model, tokenizer
    except Exception as e:
        logger.exception(f"Failed to load model {model_id}")
        raise


In [7]:

def parse_prompt_file(filepath: Path) -> Tuple[Optional[str], str]:
    """Parses prompt file into system and user parts."""
    try:
        content = filepath.read_text()
        separator = "---USER---"
        if separator in content:
            system_prompt, user_prompt = content.split(separator, 1)
            return system_prompt.strip(), user_prompt.strip()
        else:
            logger.warning(f"Separator '{separator}' not found in {filepath}. Using entire content as user prompt.")
            return None, content.strip()
    except FileNotFoundError:
        logger.error(f"Prompt file not found: {filepath}")
        raise
    except Exception as e:
        logger.error(f"Error reading prompt file {filepath}: {e}")
        raise


In [8]:

def generate_code_batch(
    model: Any,
    tokenizer: Any,
    system_prompt: Optional[str],
    user_prompt: str,
    num_responses: int,
    generation_params: Dict[str, Any]
) -> List[str]:
    """Generates a batch of code responses using the loaded model."""
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": user_prompt})

    try:
        prompt_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        logger.info("Applied chat template to prompt for batch generation.")
    except Exception as e:
        logger.warning(f"Could not apply chat template for batch generation (error: {e}). Using basic concatenation.")
        prompt_text = (system_prompt + "\n\n" if system_prompt else "") + user_prompt

    logger.info(f"Input prompt (start) for batch generation:\n{prompt_text[:300]}...")

    inputs = tokenizer(prompt_text, return_tensors="pt", padding=True, truncation=False)
    input_ids = inputs["input_ids"].to(model.device)
    attention_mask = inputs["attention_mask"].to(model.device)
    prompt_token_length = input_ids.shape[1]

    do_sample = generation_params.get("temperature", 0.2) > 0.0 or generation_params.get("top_p", None) is not None or generation_params.get("top_k", None) is not None
    if num_responses > 1 and not do_sample:
         logger.warning("Forcing temperature to 0.2 for multiple distinct responses.")
         generation_params["temperature"] = 0.2
         do_sample = True

    gen_kwargs = {
        "temperature": generation_params.get("temperature", 0.2),
        "max_new_tokens": generation_params.get("max_new_tokens", 2048),
        "top_p": generation_params.get("top_p"),
        "top_k": generation_params.get("top_k"),
        "repetition_penalty": generation_params.get("repetition_penalty"),
        "pad_token_id": tokenizer.eos_token_id,
        "eos_token_id": tokenizer.eos_token_id,
        "do_sample": do_sample,
        "num_return_sequences": num_responses,
    }
    gen_kwargs = {k: v for k, v in gen_kwargs.items() if v is not None}

    logger.info(f"Generating {num_responses} responses with config: {gen_kwargs}")
    t0 = time.time()
    outputs = []
    try:
        with torch.no_grad():
            generated_outputs = model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                **gen_kwargs
            )
        for i in range(num_responses):
            output_ids = generated_outputs[i][prompt_token_length:]
            output_text = tokenizer.decode(output_ids, skip_special_tokens=True)
            outputs.append(output_text.strip())
    except Exception as e:
        logger.exception("Error during batch generation")
        return [f"Error during generation: {e}"] * num_responses
    t1 = time.time()
    logger.info(f"Batch generation of {num_responses} responses took {t1-t0:.2f} seconds.")
    return outputs

print("Helper functions defined.")



Helper functions defined.


In [9]:
# Cell 3.5: Unload Model Function
import gc # Make sure gc is imported

def unload_model(model_id_to_unload: Optional[str]):
    """Attempts to unload a model and tokenizer from memory and VRAM."""
    global MODEL_CACHE, model, tokenizer # Need global to modify/delete notebook-level vars

    if model_id_to_unload and model_id_to_unload in MODEL_CACHE:
        logger.info(f"Attempting to unload model: {model_id_to_unload}")
        try:
            # Remove from cache first
            m, t = MODEL_CACHE.pop(model_id_to_unload)

            # Clear global references if they point to the unloaded model
            if 'model' in globals() and model is m:
                del model
            if 'tokenizer' in globals() and tokenizer is t:
                del tokenizer

            # Delete the specific references we retrieved
            del m
            del t

            # Aggressive cleanup
            gc.collect() # Run Python garbage collection

            if torch.cuda.is_available():
                logger.info("Clearing CUDA cache...")
                torch.cuda.empty_cache() # Ask PyTorch to release unused cached memory
                logger.info("CUDA cache cleared.")
            else:
                 logger.info("CUDA not available, skipping cache clear.")

            logger.info(f"Successfully initiated unloading for {model_id_to_unload}.")
            return True

        except Exception as e:
            logger.error(f"Error during unloading of {model_id_to_unload}: {e}", exc_info=True)
            # Put it back in cache if deletion failed? Maybe not, keep it simple.
            return False
    else:
        logger.info(f"Model {model_id_to_unload} not found in cache or not specified, nothing to unload.")
        return True # Nothing to do is still success in a way

# Configuration
---

In [10]:

# --- Model ---
model_id = "deepseek-ai/DeepSeek-Coder-V2-Lite-Instruct"
# model_id = "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B"

# model_id = "HuggingFaceTB/SmolLM2-135M-Instruct" # smaller model for faster testing if needed
load_in_4bit = True # Set based on your VRAM

# --- Prompt ---
# Option 1: Load from file
prompt_file = notebook_dir / "real_prompt.txt"
system_prompt, user_prompt = parse_prompt_file(prompt_file)
# Option 2: Define directly
# system_prompt = "You are a Python coding assistant."
# user_prompt = "Write Python code to read './input/train.csv' and print the shape."

# --- Data Path  ---
REAL_DATA_DIR = project_root / "prompt_playground/example_tasks/house_prices" # Example using AIDE's sample task




In [11]:

# --- Generation ---
num_responses_to_generate = 2 # How many code variations to generate
generation_params = {
    "temperature": 0.6,
    "max_new_tokens": 2048,
    "top_p": 0.95,
    "top_k": None,
    "repetition_penalty": 1.1,
}

# --- Execution ---
execution_timeout = 120 # Seconds

# --- Output ---
output_base_dir = notebook_dir / "playground_notebook_results" # Directory for results

# --- Display Config ---
print(f"Using Model: {model_id} (4-bit: {load_in_4bit})")
print(f"Prompt File: {prompt_file}")
print(f"Real Data Dir: {REAL_DATA_DIR}")
print(f"Output Dir: {output_base_dir}")
print(f"Num Responses: {num_responses_to_generate}")
print(f"Generation Params: {generation_params}")
print(f"Execution Timeout: {execution_timeout}s")

Using Model: deepseek-ai/DeepSeek-Coder-V2-Lite-Instruct (4-bit: True)
Prompt File: /home/asim_aims_ac_za/Home/Aide-playground/prompt_playground/real_prompt.txt
Real Data Dir: /home/asim_aims_ac_za/Home/Aide-playground/prompt_playground/example_tasks/house_prices
Output Dir: /home/asim_aims_ac_za/Home/Aide-playground/prompt_playground/playground_notebook_results
Num Responses: 2
Generation Params: {'temperature': 0.6, 'max_new_tokens': 2048, 'top_p': 0.95, 'top_k': None, 'repetition_penalty': 1.1}
Execution Timeout: 120s


# Load Model (Re-run this cell to change models)
---

In [None]:

target_model_id = "deepseek-ai/DeepSeek-Coder-V2-Lite-Instruct"
load_in_4bit = True 


# --- Logic to handle model switching ---
# Define these globally if they don't exist yet
if 'current_model_id' not in globals():
    current_model_id = None
if 'model' not in globals():
    model = None
if 'tokenizer' not in globals():
    tokenizer = None

# Check if we need to switch models
if current_model_id != target_model_id:
    if current_model_id is not None:
        console.print(f"[bold yellow]Switching model: Unloading '{current_model_id}'...[/bold yellow]")
        if unload_model(current_model_id):
            # Reset global variables after successful unload attempt
            current_model_id = None
            model = None
            tokenizer = None
            console.print(f"[green]Unload initiated for '{current_model_id}'.[/green]")
        else:
             console.print(f"[bold red]Error unloading '{current_model_id}'. Check logs. Cannot load new model.[/bold red]")
             # Optional: raise an error here or exit cell execution
             raise RuntimeError(f"Failed to unload model {current_model_id}")

    # Load the new model if different from current or if nothing is loaded
    console.print(f"[bold blue]Loading new model: '{target_model_id}'...[/bold blue]")
    model, tokenizer = load_model_and_tokenizer(target_model_id, load_in_4bit=load_in_4bit)
    current_model_id = target_model_id # Update the tracker
    console.print(f"[bold green]Model '{target_model_id}' and tokenizer loaded.[/bold green]")

else:
    console.print(f"[bold yellow]Model '{target_model_id}' is already loaded. Skipping reload.[/bold yellow]")

# --- Sanity check ---
if model is None or tokenizer is None:
     console.print("[bold red]Error: Model or Tokenizer failed to load correctly.[/bold red]")
else:
     console.print(f"Currently loaded model: [bold cyan]{current_model_id}[/bold cyan] on device: [cyan]{model.device}[/cyan]")

2025-04-13 17:44:54,833 - [INFO] - Loading model and tokenizer: deepseek-ai/DeepSeek-Coder-V2-Lite-Instruct (4-bit: True)
2025-04-13 17:44:55,678 - [INFO] - BF16 supported, using for compute.
2025-04-13 17:44:55,681 - [INFO] - Using 4-bit quantization (BitsAndBytesConfig).


A new version of the following files was downloaded from https://huggingface.co/deepseek-ai/DeepSeek-Coder-V2-Lite-Instruct:
- configuration_deepseek.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.
A new version of the following files was downloaded from https://huggingface.co/deepseek-ai/DeepSeek-Coder-V2-Lite-Instruct:
- modeling_deepseek.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.
Fetching 4 files:   0%|          | 0/4 [01:12<?, ?it/s]


# Generate Responses (Re-run this cell to test generation)
---

In [2]:
console.rule(f"[bold blue]Generating {num_responses_to_generate} Responses")

# Make sure model/tokenizer are loaded
if 'model' not in locals() or 'tokenizer' not in locals():
     raise NameError("Model and tokenizer not loaded. Run Cell 4 first.")
if 'system_prompt' not in locals() or 'user_prompt' not in locals():
     raise NameError("Prompts not defined. Run Cell 3 first.")

all_raw_responses = generate_code_batch(
    model, tokenizer, system_prompt, user_prompt,
    num_responses_to_generate, generation_params
)

console.print(f"[bold green]Generated {len(all_raw_responses)} responses.[/bold green]")
# Optionally print previews
for i, r in enumerate(all_raw_responses):
   print(f"\n--- Response {i+1} Preview ---")
   print(r[:300] + "..."+"\n")
   print("___________________________")


NameError: name 'console' is not defined

# Process & Execute Responses (Re-run this cell after generating)
---

In [3]:

if 'all_raw_responses' not in locals() or not all_raw_responses:
    print("No responses generated yet. Run Cell 5 first.")
else:
    output_base_dir.mkdir(parents=True, exist_ok=True)
    results_summary = []

    # Ensure REAL_DATA_DIR exists before starting the loop
    if not REAL_DATA_DIR.exists():
        raise FileNotFoundError(f"Real data directory not found: {REAL_DATA_DIR}")

    for i, raw_response in enumerate(all_raw_responses):
        console.rule(f"[bold blue]Processing Response {i+1}/{len(all_raw_responses)}")
        response_dir = output_base_dir / f"response_{i}"
        response_dir.mkdir(exist_ok=True)

        # --- Setup Isolated Workspace ---
        workspace_dir = response_dir / "workspace"
        if workspace_dir.exists():
            shutil.rmtree(workspace_dir)
        workspace_dir.mkdir(exist_ok=True)
        input_dir = workspace_dir / "input"
        input_dir.mkdir(exist_ok=True) # Create input dir first
        submission_dir = workspace_dir / "submission"; submission_dir.mkdir(exist_ok=True)
        working_dir = workspace_dir / "working"; working_dir.mkdir(exist_ok=True)

        # --- Copy REAL Data ---
        logger.info(f"Copying data from {REAL_DATA_DIR} to {input_dir}")
        try:
            # Use copytree for robustness with directories
            shutil.copytree(REAL_DATA_DIR, input_dir, dirs_exist_ok=True)
            # Optional: Preprocess if needed (like unzipping in AIDE)
            # from aide.utils import preproc_data
            # preproc_data(input_dir)
        except Exception as e:
            logger.error(f"Failed to copy data for response {i+1}: {e}", exc_info=True)
            console.print(f"[bold red]Response {i+1}: FAILED to copy data. Skipping execution.[/bold red]")
            continue

        # --- Save Raw Response ---
        raw_response_path = response_dir / "raw_response.txt"
        raw_response_path.write_text(raw_response, errors='ignore') # Ignore encoding errors just in case
        logger.info(f"Raw response {i+1} saved to: {raw_response_path}")
        console.print(f"[bold cyan]Raw Response {i+1}:[/bold cyan]\n{raw_response[:500]}...")
        print("-" * 10)

        # --- Extract Code ---
        extracted_code = extract_code(raw_response)
        exec_result = None
        summary = {"index": i, "success": False, "error": None, "output_preview": "", "code_path": None, "log_path": None}

        if not extracted_code:
            logger.error(f"Response {i+1}: Code extraction FAILED.")
            console.print("[bold red]Code extraction FAILED.[/bold red]")
            summary["error"] = "Code Extraction Failed"
        else:
            formatted_extracted_code = format_code(extracted_code)
            code_path = response_dir / "extracted_code.py"
            code_path.write_text(formatted_extracted_code)
            summary["code_path"] = str(code_path)
            logger.info(f"Extracted code {i+1} saved to: {code_path}")
            console.print(f"[bold green]Extracted Code {i+1}:[/bold green]")
            console.print(Syntax(formatted_extracted_code, "python", theme="default", line_numbers=True, word_wrap=True))
            print("-" * 10)

            # --- Execute Code ---
            logger.info(f"Executing code for response {i+1}...")
            # Use the workspace directory specific to this response run
            interpreter = Interpreter(working_dir=workspace_dir, timeout=execution_timeout)
            try:
                exec_result = interpreter.run(formatted_extracted_code, reset_session=True)
                summary["success"] = exec_result.exc_type is None
                summary["error"] = exec_result.exc_type
                summary["output_preview"] = "\n".join(exec_result.term_out[:10]) + ("..." if len(exec_result.term_out) > 10 else "")
            except Exception as e:
                 logger.error(f"Interpreter failed for response {i+1}: {e}", exc_info=True)
                 summary["error"] = f"Interpreter Error: {e}"
                 exec_result = ExecutionResult(term_out=[f"Interpreter Error: {e}"], exec_time=0, exc_type=type(e).__name__)
            finally:
                try:
                    interpreter.cleanup_session()
                except Exception as e_clean:
                     logger.error(f"Error cleaning up interpreter for response {i+1}: {e_clean}")

            # --- Save Execution Log ---
            exec_log_path = response_dir / "execution_log.json"
            summary["log_path"] = str(exec_log_path)
            try:
                 # Ensure exec_result exists before saving
                 if exec_result:
                     serialize.dump_json(exec_result, exec_log_path)
                     logger.info(f"Execution log saved to: {exec_log_path}")
                 else:
                     logger.warning(f"No execution result to save for response {i+1}")
            except Exception as e:
                 logger.error(f"Failed to save execution log: {e}")
                 if exec_result:
                      exec_log_path.with_suffix(".txt").write_text(str(exec_result))

        results_summary.append(summary)
        console.print(f"[bold magenta]Execution Result {i+1}:[/bold magenta]")
        if exec_result:
             console.print(f"  Success: {exec_result.exc_type is None}")
             console.print(f"  Execution Time: {exec_result.exec_time:.2f}s")
             if exec_result.exc_type:
                 console.print(f"  Error Type: [bold red]{exec_result.exc_type}[/bold red]")
             term_out_str = "\n".join(exec_result.term_out) if isinstance(exec_result.term_out, list) else str(exec_result.term_out)
             console.print(f"  Terminal Output (preview):")
             console.print("[dim]" + term_out_str[:500] + ("\n..." if len(term_out_str) > 500 else "") + "[/dim]")
        else:
             console.print("  Execution skipped or failed before result generation.")
        console.print(f"  Full output/logs saved in: {response_dir}")
        print("-" * 40)


No responses generated yet. Run Cell 5 first.


# Final Summary 
---


In [None]:

# --- Final Summary ---
console.rule("[bold green]Run Summary")
for res in results_summary:
    status = "[green]Success[/green]" if res["success"] else f"[red]Failed ({res['error']})[/red]"
    console.print(f"Response {res['index']}: Status={status}")
    if res["code_path"]: console.print(f"  Code: {res['code_path']}")
    if res["log_path"]: console.print(f"  Log:  {res['log_path']}")
    # Only show preview if failed and there is output
    if not res["success"] and res["output_preview"]: console.print(f"  Output Preview: [dim]{res['output_preview']}[/dim]")

console.print("\nPlayground iteration finished.")