## Section 1: Importing

In [72]:
# Import the 'drive' module from google.colab library.
from google.colab import drive

# Mount Google Drive at the '/content/drive' path in the Colab filesystem.
# Requires user authorization.
drive.mount('/content/drive')

# Confirm successful mounting.
print("Drive mounted successfully.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Drive mounted successfully.


In [73]:
print("Installing libraries...")
!pip install --upgrade pip

# -q ensures minimal output (quiet installation)
!pip install rank-bm25
# update pip
!pip install --upgrade pip -q

# install bm25 (worked already, but no harm)
!pip install -q rank-bm25

!pip install -q \
  "transformers" \
  "datasets" \
  "torch" \
  "accelerate" \
  "bitsandbytes" \
  "huggingface_hub" \
  "sacrebleu>=2.0.0" \
  "codebleu" \
  "tree-sitter-python"
  #"fsspec==2024.12.0" \

# 🚀 upgrade to a good combination ─────────────
!pip install -qU "datasets>=2.19.0" "fsspec>=2024.3.0"  \
                 "huggingface_hub>=0.22.2"  "aiohttp"


import os
import time # for the delay before nvidia-smi
import warnings # for non-critical warnings
import shutil
import tarfile # for .tar archives
import ast
import json
import random
import re
import textwrap # for snipper preview
import traceback
import numpy as np
import torch

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
# from typing import List, Optional # For type hinting (optional)
# from transformers import PreTrainedTokenizerBase # For type hinting (optional)

from datasets import load_dataset, DownloadMode
from huggingface_hub import hf_hub_download
from huggingface_hub.utils import EntryNotFoundError

try:
    from tqdm.auto import tqdm  # for progress bars (optional)
    USE_TQDM = True
except ImportError:
    USE_TQDM = False
    print("Library 'tqdm' not found, progress bar will not be shown.")
    print("You can install it with: !pip install -q tqdm")

from rank_bm25 import BM25Okapi

print("\nLibraries installed (or updated) successfully!")


Installing libraries...
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [datasets]
[1A[2K[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
gcsfs 2025.3.2 requires fsspec==2025.3.2, but you have fsspec 2025.3.0 which is incompatible.[0m[31m
[0m
Libraries installed (or updated) successfully!


In [74]:
# ────────────────────────────────────────────────────────────────
# Prompt templates - version 1
# ────────────────────────────────────────────────────────────────

def build_baseline_prompt(instruction: str) -> str:
    """
    Baseline: only the task, then a '### Code:' marker to start generation.
    """
    return f"""\
### Task:
{instruction.strip()}

### Code:
"""

def build_rag_prompt(instruction: str, retrieved: str) -> str:
    """
    RAG: first show retrieved examples, then the task, then '### Code:'.
    """
    return f"""\
### Retrieved Examples:
{retrieved.strip()}

### Task:
{instruction.strip()}

### Code:
"""

In [75]:
# ────────────────────────────────────────────────────────────────
# Prompt templates - version 2
# ────────────────────────────────────────────────────────────────

def build_baseline_prompt(instruction: str) -> str:
    """
    Baseline: very explicit, with sections for clarity.
    """
    return f"""\
### Library:
You will use the `seedemu` Python library to build a network emulation.

### Task Description:
{instruction.strip()}

### Requirements:
1. Import only from `seedemu` (layers, services, core, compiler).
2. Create objects in this order: Emulator → Layers → Services → Bindings → Dump.
3. Use clear variable names (e.g. `base`, `routing`, `ebgp`, `sim`).
4. Target Python 3.8+ syntax.

### Output Format:
- Provide only valid Python code.
- No comments, no extra text.
- Start at the first line of code (do not repeat the task).

### Code:
"""

def build_rag_prompt(instruction: str, retrieved: str) -> str:
    """
    RAG: include retrieved examples plus the detailed task template.
    """
    return f"""\
### Retrieved Examples:
{retrieved.strip()}

### Library:
Use the `seedemu` Python library.

### Task Description:
{instruction.strip()}

### Requirements:
1. Imports: `seedemu.layers`, `seedemu.services`, `seedemu.core`, `seedemu.compiler`.
2. Instantiate Emulator, then Base, Routing, eBGP layers in order.
3. Install the domain name caching service on specified hosts.
4. Add private eBGP peerings between ASes.
5. Finally, dump the emulator state to `base-component.bin`.

### Output Format:
- Return **only** runnable Python code.
- No comments or markdown.
- Do not echo the instructions.

### Code:
"""


In [76]:
# ────────────────────────────────────────────────────────────────
# Prompt templates - version 3
# ────────────────────────────────────────────────────────────────

def build_baseline_prompt(instruction: str) -> str:
    """
    Baseline: given only the instruction, ask the model to produce:
      1. A clear function signature with type hints
      2. A concise docstring explaining behavior, inputs, and outputs
      3. The implementation, following PEP8
      4. At least one simple unit test demonstrating correct usage
    """
    return f"""\
You are a senior Python engineer.  Fulfill the following task by writing production-ready code.

**Task**:
{instruction.strip()}

**Requirements**:
- Python 3, include type hints
- One well-formed function or class with a descriptive name
- A docstring (inputs, outputs, edge cases)
- PEP8 style (4-space indent, snake_case)
- At least one unit test using `assert` or `unittest`

**Implementation**:
```python
"""

def build_rag_prompt(instruction: str, retrieved: str) -> str:
    """
    RAG: first show retrieved examples for inspiration, then the same structured prompt:
      instruction, requirements, and a code block marker.
    """
    return f"""\
You are a senior Python engineer.  Use the retrieved examples to guide your implementation.

**Retrieved Examples**:
{retrieved.strip()}

**Task**:
{instruction.strip()}

**Requirements**:
- Python 3 with type hints
- Clean function or class design with a docstring
- Adhere to PEP8 conventions
- Include at least one unit test

**Implementation**:
```python
"""


## Section 2: LLM and Tokenizer Loading with 4-bit Quantization


In [77]:
# check that there is only one selected model

# --- Gemma Series (Google) ---
# model_name = "google/codegemma-7b"
# model_name = "google/codegemma-7b-it"

# --- Qwen Series (Alibaba) ---
# model_name = "Qwen/Qwen2.5-Coder-7B-Instruct"
# model_name = "Qwen/Qwen2.5-Coder-3B-Instruct"
# model_name = "Qwen/Qwen2.5-Coder-1.5B-Instruct"

# --- Deepseek Coder Series (Deepseek AI) ---
# model_name = "deepseek-ai/deepseek-coder-7b-instruct-v1.5"
#model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B"
# model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
model_name = "deepseek-ai/deepseek-coder-1.3b-base"

# --- Code Llama Series (Meta) ---
# model_name = "codellama/CodeLlama-7b-Instruct-hf"

# --- Phi Series (Microsoft) ---
# model_name = "microsoft/Phi-4-mini-instruct"
# model_name = "microsoft/Phi-4-multimodal-instruct"

TRUST_REMOTE_CODE_MODELS = [
    "microsoft/Phi-",
    "Qwen/",
]
trust_code = any(model_name.startswith(prefix) for prefix in TRUST_REMOTE_CODE_MODELS)

print(f"Setting trust_remote_code={trust_code} for {model_name}")
if trust_code:
    print("WARNING: trust_remote_code=True will execute Python code from the model's Hugging Face repo")


Setting trust_remote_code=False for deepseek-ai/deepseek-coder-1.3b-base


In [78]:
# 4-bit NF4 quantisation
QUANT_CFG = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
        if (torch.cuda.is_available() and torch.cuda.is_bf16_supported())
        else torch.float16,
    bnb_4bit_use_double_quant=True,
)

In [79]:
# == 5.  Where the cached copy will live on your Drive ===========
CACHE_ROOT = "/content/drive/MyDrive/llm_cache"
CACHE_DIR  = os.path.join(
    CACHE_ROOT,
    model_name.replace("/", "_") + "_4bit_nf4",
)

META_FILE  = os.path.join(CACHE_DIR, "metadata.json")

In [80]:
# == 5.  Build the 4‐bit config (for GPU only) ===================
# (use bfloat16 on bf16‐capable GPUs, else float16)
compute_dtype = (
    torch.bfloat16
    if torch.cuda.is_available() and torch.cuda.is_bf16_supported()
    else torch.float16
)

QUANT_CFG = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=compute_dtype,
    bnb_4bit_use_double_quant=True,
)

def _qcfg_to_dict(cfg):
    return {
        "load_in_4bit": cfg.load_in_4bit,
        "bnb_4bit_quant_type": cfg.bnb_4bit_quant_type,
        "bnb_4bit_compute_dtype": str(cfg.bnb_4bit_compute_dtype),
        "bnb_4bit_use_double_quant": cfg.bnb_4bit_use_double_quant,
    }

REQ_META = {
    "model_name": model_name,
    "quant_cfg":  _qcfg_to_dict(QUANT_CFG),
}


In [81]:
# == 6.  Check for existing cache & metadata ====================
use_cache = False
if os.path.isfile(META_FILE):
    try:
        saved = json.load(open(META_FILE))
        use_cache = saved == REQ_META
        print("⚡ Cache metadata match:", use_cache)
    except Exception:
        print("⚠️  Could not parse metadata.json; ignoring cache.")

# == 7.  Load tokenizer & model (fast or slow path) ==============
trust_code = model_name.startswith(("microsoft/Phi-", "Qwen/"))

try:
    if use_cache:
        print("⚡ Loading from Drive cache…")
        tokenizer = AutoTokenizer.from_pretrained(CACHE_DIR, local_files_only=True, trust_remote_code=trust_code)
        model     = AutoModelForCausalLM.from_pretrained(
            CACHE_DIR,
            device_map="auto",
            low_cpu_mem_usage=True,
            trust_remote_code=trust_code,
        )
    else:
        # decide whether we *can* do 4-bit quant:
        do_4bit = torch.cuda.is_available()
        print(f"⏳ No valid cache. CUDA available? {do_4bit}")
        print(f"⏳ {'Quantising 4-bit…' if do_4bit else 'Loading fp16…'} this will happen once")

        # ─── tokenizer ───────────────────────────────────────────
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=trust_code)
        if tokenizer.pad_token_id is None:
            tokenizer.pad_token_id = tokenizer.eos_token_id
            if tokenizer.pad_token is None:
                tokenizer.add_special_tokens({"pad_token": tokenizer.eos_token})

        # ─── model ───────────────────────────────────────────────
        if do_4bit:
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                quantization_config=QUANT_CFG,
                device_map="auto",
                trust_remote_code=trust_code,
                low_cpu_mem_usage=True,
            )
        else:
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=compute_dtype,
                device_map="auto",
                trust_remote_code=trust_code,
            )

        # ─── Save cache for next time ───────────────────────────
        print("💾 Saving to Drive cache…")
        os.makedirs(CACHE_DIR, exist_ok=True)
        tokenizer.save_pretrained(CACHE_DIR)
        model.save_pretrained(CACHE_DIR)
        with open(META_FILE, "w") as f:
            json.dump(REQ_META, f)
        print("✅ Cache written at", CACHE_DIR)

    # ensure model.pad_token_id
    if getattr(model, "config", None) and model.config.pad_token_id is None:
        model.config.pad_token_id = tokenizer.pad_token_id

    print("🎉 Model & tokenizer ready!")

except Exception as e:
    print("❌ Error loading model/tokenizer:")
    traceback.print_exc()
    raise

⚡ Cache metadata match: True
⚡ Loading from Drive cache…
🎉 Model & tokenizer ready!


## Section 3: Dataset Preparation and Validation



In [82]:
# --- 1. Configuration of Google Drive directory ---

drive_save_path = '/content/drive/MyDrive/RAG_Project/' # to store results and outputs
# check that the directory exists!

try:
    os.makedirs(drive_save_path, exist_ok=True)
    print(f"Google Drive Directory available: {drive_save_path}")
except OSError as e:
    print(f"Warning: can not create or verify the existence of the directory: {drive_save_path}. Details: {e}")


Google Drive Directory available: /content/drive/MyDrive/RAG_Project/


In [83]:
#!pip install --upgrade --quiet datasets==2.16.0 fsspec==2023.9.2

In [84]:
from datasets import load_dataset, DownloadMode

dataset_name = "JetBrains-Research/lca-library-based-code-generation"
data_split   = "test"

print(f"\n▶️  Loading dataset '{dataset_name}' (split='{data_split}')…")
print("   (uses HF cache on Colab VM — do NOT point cache_dir at Drive)")

try:
    lca_dataset_split = load_dataset(
        dataset_name,
        split=data_split,
        # download_mode=DownloadMode.FORCE_REDOWNLOAD,  # uncomment to force fresh pull
    )
    print("✅ Dataset loaded — now filtering to repo_name == 'seed-emulator'")
    lca_dataset_split = lca_dataset_split.filter(
        lambda ex: ex["repo_name"] == "seed-emulator"
    )
    print(f"✅ Filtered: {len(lca_dataset_split)} examples in 'seed-emulator'")

except Exception as e:
    print(f"❌ ERROR loading or filtering dataset: {e}")



▶️  Loading dataset 'JetBrains-Research/lca-library-based-code-generation' (split='test')…
   (uses HF cache on Colab VM — do NOT point cache_dir at Drive)
✅ Dataset loaded — now filtering to repo_name == 'seed-emulator'
✅ Filtered: 13 examples in 'seed-emulator'


## Section 4: Repository Archive Download & Preparation

In [85]:
# --- 1. Exception Imports ---
try:
    from huggingface_hub.utils import HfHubHTTPError, RepositoryNotFoundError, EntryNotFoundError
    print("Succesfully import exceptions from huggingface_hub.utils.")
except ImportError:
    print("WARNING: can not import exceptions from huggingface_hub.utils, try from .errors")
    try:
        from huggingface_hub.errors import HfHubHTTPError, RepositoryNotFoundError, EntryNotFoundError
        print("Importing exceptions from huggingface_hub.errors completed.")
    except ImportError:
        print("ERROR: can not import exceptions from huggingface_hub.")
        class HfHubHTTPError(Exception): pass
        class RepositoryNotFoundError(Exception): pass
        class EntryNotFoundError(Exception): pass

Succesfully import exceptions from huggingface_hub.utils.


In [86]:
# --- 2. Archive Download ---

repo_id = "JetBrains-Research/lca-library-based-code-generation"
filename_in_repo = "repos/seed-labs__seed-emulator.tar.gz"
desired_local_archive_path = "/content/seed-labs__seed-emulator.tar.gz"
download_base_dir = "/content/"

print(f"\n--- Download and configuration ---")
print(f"Repo: {repo_id}")
print(f"File in repo: {filename_in_repo}")
print(f"Desired destination: {desired_local_archive_path}")

actual_downloaded_path = None

# actual donwload from Hugging Face
try:
    print(f"\nStarting download from Hugging Face Hub...")
    actual_downloaded_path = hf_hub_download(
        repo_id=repo_id,
        filename=filename_in_repo,
        repo_type="dataset",
        local_dir=download_base_dir,
        local_dir_use_symlinks=False,
    )
    print(f"Download completed. File saved at: {actual_downloaded_path}")

# --- 3. Error Handling ---
except RepositoryNotFoundError:
    print(f"\nERROR: Repository '{repo_id}' not found on Hugging Face Hub.")
    print("  Make sure the repository name is correct.")
except EntryNotFoundError:  # Specific file not found in the repo
    print(f"\nERROR: File/Entry '{filename_in_repo}' not found in the repository '{repo_id}'.")
except HfHubHTTPError as e:  # HTTP errors (including 401, 403, 404 not already caught above)
    print(f"\nHTTP ERROR during download from Hugging Face Hub: {e}")
    if hasattr(e, 'response') and e.response is not None:
        print(f"  Status Code: {e.response.status_code}")
        if e.response.status_code == 404:
            print(f"  -> The file '{filename_in_repo}' or the repo '{repo_id}' may not exist (Error 404).")
    print(f"  Please check the repo_id, filename_in_repo, and your internet connection or HF token if necessary.")
except Exception as e:
    # Catch other unexpected errors
    import traceback
    print(f"\nUNEXPECTED ERROR during the download:")
    # print(traceback.format_exc())  # Uncomment for full traceback during debugging
    print(f"  Error Type: {type(e).__name__}, Message: {e}")

# --- 4. File Relocation & Cleanup ---
archive_ready = False
if actual_downloaded_path and os.path.exists(actual_downloaded_path):
    if os.path.abspath(actual_downloaded_path) == os.path.abspath(desired_local_archive_path):
        print(f"\nThe archive is already at the desired final location: {desired_local_archive_path}")
        archive_ready = True
    else:
        try:
            print(f"\Moving '{os.path.basename(actual_downloaded_path)}' to '{desired_local_archive_path}'...")
            os.makedirs(os.path.dirname(desired_local_archive_path), exist_ok=True)
            shutil.move(actual_downloaded_path, desired_local_archive_path)
            print(f"Move completed successfully.")
            archive_ready = True

            # Clean up intermediate directory if empty
            download_parent_dir = os.path.dirname(actual_downloaded_path)
            if (os.path.exists(download_parent_dir) and
                os.path.abspath(download_parent_dir) != os.path.abspath(download_base_dir) and
                os.path.abspath(download_parent_dir).startswith(os.path.abspath(download_base_dir)) and
                not os.listdir(download_parent_dir)):
                try:
                    print(f"Removing empty intermediate directory: {download_parent_dir}")
                    os.rmdir(download_parent_dir)
                except OSError as rmdir_e:
                    print(f"  Warining: can not remove {download_parent_dir}. Issue: {rmdir_e}")

        except Exception as move_e:
            print("\nERROR during move or cleanup of downloaded file:")
            print(f"  Error: {move_e}")
            print(f"  The downloaded file may still be located at: {actual_downloaded_path}")
            archive_ready = False

elif not actual_downloaded_path:
     print("\nDownload failed. Cannot proceed.")
else:
     print(f"\nINTERNAL ERROR: Download path ({actual_downloaded_path}) does not exist after the attempt.")

# --- 5. Final Verification ---
print("\nFinal check:")
if archive_ready and os.path.exists(desired_local_archive_path):
    print(f"[OK] The final archive is ready at: {desired_local_archive_path}")
else:
    print(f"[ERROR] The final archive was NOT found or prepared correctly at: {desired_local_archive_path}")

print("\n--- End of Download and Preparation ---")


--- Download and configuration ---
Repo: JetBrains-Research/lca-library-based-code-generation
File in repo: repos/seed-labs__seed-emulator.tar.gz
Desired destination: /content/seed-labs__seed-emulator.tar.gz

Starting download from Hugging Face Hub...


seed-labs__seed-emulator.tar.gz:   0%|          | 0.00/24.0M [00:00<?, ?B/s]

Download completed. File saved at: /content/repos/seed-labs__seed-emulator.tar.gz
\Moving 'seed-labs__seed-emulator.tar.gz' to '/content/seed-labs__seed-emulator.tar.gz'...
Move completed successfully.
Removing empty intermediate directory: /content/repos

Final check:
[OK] The final archive is ready at: /content/seed-labs__seed-emulator.tar.gz

--- End of Download and Preparation ---


## Section 5: Source Extraction & Knowledge Base Construction





In [87]:
# --- 1. Archive Extraction ---
# --- 1.1. Configuration ---
# Path to the downloaded archive (should already exist from the previous cell)
local_archive_path = '/content/seed-labs__seed-emulator.tar.gz'
# Base directory where we want to extract the archive contents
extract_dir_parent = "/content/library_sources/"

# This variable will hold the actual path to the main extracted folder.
# It will be determined after extraction is complete.
final_extracted_code_path = None

print("--- Extraction archive ---")
print(f"Archive: {local_archive_path}")
print(f"Destination directory: {extract_dir_parent}")

--- Extraction archive ---
Archive: /content/seed-labs__seed-emulator.tar.gz
Destination directory: /content/library_sources/


In [88]:
# --- 1.2. Safety Check ---
if not os.path.exists(local_archive_path):
    print(f"\n[ERROR] Source archive not found: {local_archive_path}")
    print("  Make sure the download cell was run correctly.")


else:
    try:
        # Optional: clean destionation directory before the execution (if next instruction is not commented)
        if os.path.exists(extract_dir_parent): shutil.rmtree(extract_dir_parent)

        # Create destination directory
        # exist_ok=True avoids errors if already exists
        os.makedirs(extract_dir_parent, exist_ok=True)
        print(f"\nTarget extraction directory '{extract_dir_parent}' is ready.")

        # --- 1.3. Unpacking ---
        # extract the archive
        print(f"Starting extraction of '{os.path.basename(local_archive_path)}'...")
        with tarfile.open(local_archive_path, "r:gz") as tar:
            tar.extractall(path=extract_dir_parent)
        print("Extraction completed successfully.")

        # --- 1.4. Dynamic Path Resolution ---
        # dynamically determine the extracted path
        try:
            extracted_items = os.listdir(extract_dir_parent)
            if len(extracted_items) == 1 and os.path.isdir(os.path.join(extract_dir_parent, extracted_items[0])):
                final_extracted_code_path = os.path.join(extract_dir_parent, extracted_items[0])
                print(f"Identified main extracted directory: {final_extracted_code_path}")
            elif len(extracted_items) > 0:
                 # look for a folder matching the archive's base name
                 archive_basename = os.path.basename(local_archive_path).replace('.tar.gz', '').replace('.tgz', '')
                 potential_match = os.path.join(extract_dir_parent, archive_basename)
                 if os.path.isdir(potential_match):
                     final_extracted_code_path = potential_match
                     print(f"Found potential matching directory: {final_extracted_code_path}")
                 else:
                     first_item_path = os.path.join(extract_dir_parent, extracted_items[0])
                     if os.path.isdir(first_item_path):
                          final_extracted_code_path = first_item_path
                          print(f"WARNING: Multiple items found. Assuming first directory: {final_extracted_code_path}")
                     else:
                          print(f"WARNING: No main directory found in the extraction folder {extract_dir_parent}.")
                          print(f"  Contents: {extracted_items}")
                          print(f"  'final_extracted_code_path' might be set manually.")
                          final_extracted_code_path = extract_dir_parent # Fallback: use the parent dir
                          print(f"  Impostato fallback a: {final_extracted_code_path}")

            else:
                 print(f"WARNING: Extraction folder '{extract_dir_parent}' is empty after extraction.")

        except Exception as list_e:
             print(f"Issue while analyzing the extracted data: {list_e}")

    except tarfile.ReadError:
        print(f"\n[ERROR] Cannot read archive: {local_archive_path}. It may be corrupted.")
    except FileNotFoundError:
        # can happen only if local_archive_path is removed
        print(f"\n[ERROR] Archive file not found during open attempt: {local_archive_path}")
    except Exception as e:
        print(f"\n[ERROR] Unexpected error during preparation or extraction:")
        # print(traceback.format_exc()) # uncomment for debug
        print(f"  Error Type: {type(e).__name__}, Message: {e}")



Target extraction directory '/content/library_sources/' is ready.
Starting extraction of 'seed-labs__seed-emulator.tar.gz'...
Extraction completed successfully.
Identified main extracted directory: /content/library_sources/mnt


In [89]:
# --- 1.5. Final check ---

print("\nFinal check:")
if final_extracted_code_path and os.path.isdir(final_extracted_code_path):
    print(f"[OK] The extracted source code path is: {final_extracted_code_path}")
    print("\nPartial content of the extracted directory (first 10 entries):")
    try:
        content_list = os.listdir(final_extracted_code_path)
        for item in content_list[:10]:
            print(f"  - {item}")
        if len(content_list) > 10:
            print("  ...")
    except Exception as e:
        print(f"  Errore while listing the content of {final_extracted_code_path}: {e}")
else:
    print(f"[ERROR] Unable to determine or locate the extracted code directory.")
    print(f"         'final_extracted_code_path' is: {final_extracted_code_path}")
    print(f"         Make sure the extraction completed successfully.")


print("\n--- End of Archive Extraction ---")

# Make the variable available for subsequent cells (optional but useful)
# You may want to rename it to `extracted_code_path` if subsequent cells
# use that specific name.
# extracted_code_path = final_extracted_code_path
# print(f‘\nVariable “extracted_code_path” set to: {extracted_code_path}’)


Final check:
[OK] The extracted source code path is: /content/library_sources/mnt

Partial content of the extracted directory (first 10 entries):
  - data

--- End of Archive Extraction ---


In [90]:
!pip install --quiet --upgrade \
    datasets==2.16.0 \
    fsspec==2023.9.2

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [datasets]
[1A[2K[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
gcsfs 2025.3.2 requires fsspec==2025.3.2, but you have fsspec 2023.9.2 which is incompatible.[0m[31m
[0m

In [91]:
from datasets import load_dataset

if 'lca_dataset_split' not in globals() or lca_dataset_split is None:
    print("ℹ️  (Re)loading the `test` split of the LCA dataset…")
    lca_dataset_split = load_dataset(
        "JetBrains-Research/lca-library-based-code-generation",
        split="test"
    ).filter(lambda ex: ex["repo_name"] == "seed-emulator")
    print(f"✅ Loaded {len(lca_dataset_split)} examples from 'seed-emulator'")


# --- 1. Configuration (pre Snippet Extraction Helpers) ---
SAMPLE_INDEX = 0       # Index of the dataset sample to process
MAX_KB_SIZE = 15000    # Max number of code snippets to include in the KB (to limit RAM)
FALLBACK_ENCODING = 'iso-8859-1'  # Encoding to use if UTF-8 fails
DRIVE_KB_SAVE_DIR = '/content/drive/MyDrive/RAG_Project/library_kbs'  # Directory to save KBs on Google Drive


# Check the existance of the needed variables
if 'lca_dataset_split' not in locals() or not lca_dataset_split:
    raise NameError("CRITICAL ERROR: Variable 'lca_dataset_split' is not defined or is empty. Rerun the dataset loading cell.")
if 'final_extracted_code_path' not in locals() or not final_extracted_code_path:
     # Fallback: try to use old name
     if 'extracted_code_path' in locals() and extracted_code_path:
          warnings.warn("Variable 'final_extracted_code_path' not found, using 'extracted_code_path' as fallback.")
          final_extracted_code_path = extracted_code_path
     else:
          raise NameError("CRITICAL ERROR: Variable 'final_extracted_code_path' (or 'extracted_code_path') is not defined. Rerun the archive extraction cell.")
if not os.path.isdir(final_extracted_code_path):
     raise FileNotFoundError(f"CRITICAL ERROR: The extracted code path '{final_extracted_code_path}' does not exist or is not a directory. Check the archive extraction step.")

# Actual source code path from previous cell
library_source_dir = final_extracted_code_path


In [92]:
# --- 2. Snippet Extraction Helpers ---

def extract_code_units(py_file_path, fallback_encoding=FALLBACK_ENCODING):
    """Extracts functions and classes from a Python file as strings, with improved error handling."""
    units = []
    source = None
    encoding_used = 'utf-8'
    try:
        # Attempt to read with UTF-8
        with open(py_file_path, 'r', encoding='utf-8') as file:
            source = file.read()
    except UnicodeDecodeError:
        # Fallback to the specified encoding
        encoding_used = fallback_encoding
        try:
            with open(py_file_path, 'r', encoding=fallback_encoding) as file:
                source = file.read()
            # warnings.warn(f"Used encoding '{encoding_used}' for {py_file_path}") # Optional: Log used encoding
        except Exception as read_e:
            # print(f"  Error reading file {py_file_path} (even with {encoding_used}): {read_e}")
            return units # Nothing we can do if reading fails
    except PermissionError:
        # print(f"  Permission error reading {py_file_path}")
        return units
    except Exception as read_e:
        # print(f"  Unexpected error reading {py_file_path}: {read_e}")
        return units

    # If reading succeeds, try to parse
    if source is not None:
        try:
            tree = ast.parse(source, filename=py_file_path)
            # Check availability of get_source_segment (should be available in Python 3.8+)
            can_get_segment = hasattr(ast, 'get_source_segment')

            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                    code_segment = None
                    if can_get_segment:
                        try:
                            code_segment = ast.get_source_segment(source, node)
                        except Exception as segment_e:
                            # Sometimes the segment can't be extracted for complex nodes or decorators
                            # print(f"  Warning: Error extracting segment ({type(node).__name__}) in {py_file_path}: {segment_e}")
                            pass
                    else: # Very simple fallback if get_source_segment is not available
                        code_segment = ast.dump(node) # Not ideal, but better than nothing

                    if code_segment:
                        units.append(code_segment)

        except SyntaxError as syn_e:
            # Ignore files with Python syntax errors
            # print(f"  Ignored: Syntax error in {py_file_path}: {syn_e}")
            pass
        except Exception as parse_e:
            # Ignore other parsing errors
            # print(f"  Ignored: AST parsing error in {py_file_path}: {parse_e}")
            pass
    return units

def build_kb_for_library(source_path, max_kb_size=MAX_KB_SIZE, use_tqdm=USE_TQDM):
    """Builds the KB (list of snippets) by scanning .py files, with progress and error handling."""
    if not os.path.isdir(source_path):
        print(f"[ERROR] The provided source path is not a valid directory: {source_path}")
        return []

    print(f"\nStarting library scan in: {source_path}")
    knowledge_base = []
    file_count = 0
    processed_count = 0
    skipped_count = 0

    # Count total .py files for tqdm (if used)
    total_py_files = 0
    if use_tqdm:
        print("Counting .py files for progress bar...")
        for _, _, files in os.walk(source_path):
            total_py_files += sum(1 for file in files if file.endswith(".py"))
        print(f"Found {total_py_files} .py files.")

    # Set up the iterator (with or without tqdm)
    walker = os.walk(source_path, topdown=True) # topdown=True for potential dir exclusion
    if use_tqdm:
        pbar = tqdm(total=total_py_files, desc="Extracting Snippets", unit="file")

    try:
        for root, dirs, files in walker:
            # Optional: Exclude specific directories (e.g., test, docs, build)
            # dirs[:] = [d for d in dirs if d not in ['tests', 'test', 'docs', '__pycache__', 'build']]

            for file in files:
                if file.endswith(".py"):
                    file_path = os.path.join(root, file)
                    file_count += 1
                    snippets = extract_code_units(file_path)
                    if snippets:
                        knowledge_base.extend(snippets)
                        processed_count += 1
                    else:
                        skipped_count += 1 # .py file read but no snippet extracted (error or empty)

                    if use_tqdm:
                        pbar.update(1)
                    elif file_count % 200 == 0: # Print progress less frequently without tqdm
                        print(f"  Processed {file_count} files...")

    except PermissionError as perm_e:
        print(f"\n[ERROR] Permission error during scan of {source_path}: {perm_e}")
        print("  You may need to adjust permissions or run as a different user.")
    except Exception as walk_e:
        print(f"\n[ERROR] Unexpected error during scan: {walk_e}")
    finally:
        if use_tqdm:
            pbar.close()

    print(f"\nScan completed.")
    print(f"  Total .py files encountered: {file_count}")
    print(f"  .py files processed with snippets: {processed_count}")
    print(f"  .py files skipped/with errors: {skipped_count}")
    print(f"  Total snippets extracted (before sampling): {len(knowledge_base)}")

    # Sampling if the KB is too large
    if len(knowledge_base) > max_kb_size:
        print(f"\nWARNING: KB too large ({len(knowledge_base)} snippets).")
        print(f"  Random sampling to keep a maximum of {max_kb_size} snippets.")
        knowledge_base = random.sample(knowledge_base, max_kb_size)
        print(f"  KB size after sampling: {len(knowledge_base)}")
    elif len(knowledge_base) == 0:
        print("\nWARNING: No snippet extracted from the library.")
        print(f"  Check that '{source_path}' contains valid and readable .py files.")

    return knowledge_base




In [93]:
# --- 3. KB Creation & Persistence ---

print("\n" + "="*40)
print("--- Knowledge Base (KB) Creation ---")
print("="*40)

current_kb = []  # Initialize KB as empty

try:
    # Retrieve info from the loaded dataset
    sample = lca_dataset_split[SAMPLE_INDEX]
    repo_full_name = sample.get('repo_full_name')

    if not repo_full_name:
        print(f"[ERROR] 'repo_full_name' not found in dataset sample {SAMPLE_INDEX}.")
    else:
        print(f"Processing Sample {SAMPLE_INDEX}: Library '{repo_full_name}'")
        print(f"Source code path: {library_source_dir}")

        # Build the KB
        current_kb = build_kb_for_library(library_source_dir)  # Use the improved function

        # Save the KB to Drive if it's not empty
        if current_kb:
            # Create the save directory on Drive if it doesn't exist
            try:
                os.makedirs(DRIVE_KB_SAVE_DIR, exist_ok=True)
            except OSError as drive_err:
                print(f"\n[ERROR] Unable to create save directory on Drive: {DRIVE_KB_SAVE_DIR}")
                print(f"  Error: {drive_err}")
                print("  KB save skipped.")
                # You might choose to exit or continue without saving
                # raise drive_err  # Uncomment to stop execution

            # Build the full path for the KB file
            # Clean the repo name to avoid problematic characters in filenames
            safe_repo_name = repo_full_name.replace('/', '__')  # Replace / with __
            kb_filename = f"kb_{safe_repo_name}_sample_{SAMPLE_INDEX}.json"
            kb_full_path = os.path.join(DRIVE_KB_SAVE_DIR, kb_filename)

            print(f"\nAttempting to save KB ({len(current_kb)} snippets) to: {kb_full_path}")
            try:
                with open(kb_full_path, 'w', encoding='utf-8') as f:
                    json.dump(current_kb, f, indent=2, ensure_ascii=False)
                print(f"[OK] KB successfully saved.")
            except OSError as save_err:
                print(f"\n[ERROR] Unable to write KB file to Drive: {kb_full_path}")
                print(f"  Error: {save_err}. Check write permissions on Drive.")
            except Exception as json_err:
                print(f"\n[ERROR] Error during JSON serialization of the KB: {json_err}")
        else:
            print("\nKB is empty, no file saved.")

except IndexError:
    print(f"[ERROR] Index {SAMPLE_INDEX} out of bounds for 'lca_dataset_split' (size: {len(lca_dataset_split)}).")
except Exception as main_e:
    import traceback
    print(f"\n[ERROR] Unexpected error in main script:")
    print(traceback.format_exc())

# --- 4. Final Check ---
if current_kb:
    print(f"\n--- KB for {repo_full_name} Ready ({len(current_kb)} snippets) ---")
else:
    print(f"\n--- KB not created or empty ---")

print("\n--- End of KB Creation ---")



--- Knowledge Base (KB) Creation ---
Processing Sample 0: Library 'seed-labs__seed-emulator'
Source code path: /content/library_sources/mnt

Starting library scan in: /content/library_sources/mnt
Counting .py files for progress bar...
Found 136 .py files.


Extracting Snippets:   0%|          | 0/136 [00:00<?, ?file/s]


Scan completed.
  Total .py files encountered: 136
  .py files processed with snippets: 99
  .py files skipped/with errors: 37
  Total snippets extracted (before sampling): 1196

Attempting to save KB (1196 snippets) to: /content/drive/MyDrive/RAG_Project/library_kbs/kb_seed-labs__seed-emulator_sample_0.json
[OK] KB successfully saved.

--- KB for seed-labs__seed-emulator Ready (1196 snippets) ---

--- End of KB Creation ---


## Section 6: BM25 Retrieval & Prompt Assembly



In [94]:
# --- 1. Configuration ---
SAMPLE_INDEX = 0      # Index of the sample to process (same as the KB cells)
TOP_K_SNIPPETS = 5    # Number of snippets to retrieve with BM25
BM25_K1 = 1.5         # BM25 parameter (common default, controls TF saturation)
BM25_B = 0.75         # BM25 parameter (common default, controls document length)
DRIVE_KB_SAVE_DIR = '/content/drive/MyDrive/RAG_Project/library_kbs' # KB folder on Drive

# --- 2. Tokenizer Helper ---
def simple_code_tokenizer(text):
    """
    Simple tokenizer optimized for code snippets:
    - lowercase
    - split on spaces and common punctuation (keeping underscores)
    - optionally removes very short tokens
    """
    if not isinstance(text, str):  # Handles non-string input
        return []
    text = text.lower()
    # Replace non-alphanumeric or underscore characters with space
    text = re.sub(r'[^\w\s]', ' ', text)
    # Split on multiple spaces
    tokens = text.split()
    # Optional: remove very short tokens (e.g., length 1), they might be noise
    # tokens = [token for token in tokens if len(token) > 1]
    return tokens

# --- 3. KB Loading ---

print("--- Retrieval with BM25 ---")
kb_data = None  # Initialize KB

# Check required variables from previous cells
if 'lca_dataset_split' not in locals() or not lca_dataset_split:
    raise NameError("CRITICAL ERROR: 'lca_dataset_split' not defined or empty. Re-run the dataset loading cell.")

# Try using KB already in memory ('current_kb' from the previous cell)
# Check that it exists, is a list, and is not empty
if 'current_kb' in locals() and isinstance(current_kb, list) and current_kb:
    print("Using in-memory KB ('current_kb').")
    kb_data = current_kb
else:
    # If current_kb is not valid, try loading from Drive
    print("\n'current_kb' not available or empty in memory.")
    try:
        # Determine KB file name (requires repo_full_name)
        sample = lca_dataset_split[SAMPLE_INDEX]
        repo_full_name_for_kb = sample.get('repo_full_name')
        if not repo_full_name_for_kb:
            print(f"[ERROR] 'repo_full_name' not found in sample {SAMPLE_INDEX} to load KB.")
        else:
            # Clean repo name and build path
            safe_repo_name = repo_full_name_for_kb.replace('/', '__')
            kb_filename = f"kb_{safe_repo_name}_sample_{SAMPLE_INDEX}.json"
            kb_full_path = os.path.join(DRIVE_KB_SAVE_DIR, kb_filename)

            if os.path.exists(kb_full_path):
                print(f"Attempting to load KB from Drive: {kb_full_path}")
                with open(kb_full_path, 'r', encoding='utf-8') as f:
                    kb_data = json.load(f)
                # Additional check: is the loaded file a non-empty list?
                if isinstance(kb_data, list) and kb_data:
                    print(f"KB for '{repo_full_name_for_kb}' loaded from Drive ({len(kb_data)} snippets).")
                else:
                    print(f"[ERROR] KB file loaded from '{kb_full_path}' is not a valid list or is empty.")
                    kb_data = None  # Reset if content is invalid
            else:
                print(f"[ERROR] KB file not found at: {kb_full_path}")

    except IndexError:
        print(f"[ERROR] Invalid index {SAMPLE_INDEX} for 'lca_dataset_split' when retrieving repo name.")
    except FileNotFoundError:  # If DRIVE_KB_SAVE_DIR does not exist
        print(f"[ERROR] KB directory on Drive not found: {DRIVE_KB_SAVE_DIR}")
    except Exception as e:
        print(f"[ERROR] Unexpected error while loading KB from Drive: {e}")
        kb_data = None  # Ensure None in case of error

# If kb_data is still not loaded, exit with a clear error
if not kb_data:
    raise RuntimeError("CRITICAL ERROR: Unable to obtain Knowledge Base (KB) data, neither from memory nor Drive. "
                       "Run Step 2.B cell first to create/save the KB.")

# --- 4. BM25 Indexing & Retrieval ---
retrieved_snippets_bm25 = []  # Initialize results list

try:
    # Extract instruction and repo name (reuse sample if previously loaded)
    if 'sample' not in locals() or sample is None:  # Load sample if not already loaded
        sample = lca_dataset_split[SAMPLE_INDEX]
    instruction = sample.get('instruction')
    repo_full_name = sample.get('repo_full_name', 'N/A')  # Use N/A if missing

    if not instruction:
        print("[ERROR] Instruction (query) not found in the sample.")
    else:
        print(f"\n--- Running BM25 for Sample {SAMPLE_INDEX} (Library: {repo_full_name}) ---")
        print(f"Instruction (Query): {instruction[:250]}...")  # Show a bit more of the query

        # 4.1 Tokenize the Knowledge Base (ensure snippets are strings)
        print("\nTokenizing Knowledge Base...")
        valid_kb_docs = [doc for doc in kb_data if isinstance(doc, str) and doc.strip()]
        if len(valid_kb_docs) < len(kb_data):
            print(f"  Warning: {len(kb_data) - len(valid_kb_docs)} invalid snippets (non-strings/empty) ignored.")

        if not valid_kb_docs:
            print("[ERROR] No valid snippets found in the KB after cleaning.")
        else:
            tokenized_kb = [simple_code_tokenizer(doc) for doc in valid_kb_docs]
            # Remove any empty lists resulting from tokenization
            tokenized_kb_filtered = [tokens for tokens in tokenized_kb if tokens]
            if not tokenized_kb_filtered:
                print("[ERROR] Tokenized KB is empty after removing empty tokens.")
            else:
                original_indices = [i for i, tokens in enumerate(tokenized_kb) if tokens]  # Original indices of valid docs
                print(f"Tokenized KB ({len(tokenized_kb_filtered)} valid documents).")

                # 4.2 Create the BM25 index with configured parameters
                print(f"Creating BM25 index (k1={BM25_K1}, b={BM25_B})...")
                bm25 = BM25Okapi(tokenized_kb_filtered, k1=BM25_K1, b=BM25_B)
                print("BM25 index created.")

                # 4.3 Tokenize the instruction (query)
                print("Tokenizing instruction (query)...")
                tokenized_query = simple_code_tokenizer(instruction)
                if not tokenized_query:
                    print("[ERROR] Tokenized query is empty.")
                else:
                    # 4.4 Perform the retrieval
                    print(f"Retrieving top {TOP_K_SNIPPETS} relevant snippets...")
                    scores = bm25.get_scores(tokenized_query)
                    top_n_filtered_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:TOP_K_SNIPPETS]
                    retrieved_snippets_bm25 = [
                        valid_kb_docs[original_indices[i]] for i in top_n_filtered_indices if i < len(original_indices)
                    ]

                    print(f"\n--- Top {len(retrieved_snippets_bm25)} Snippets Retrieved (BM25) ---")
                    if retrieved_snippets_bm25:
                        for i, snippet in enumerate(retrieved_snippets_bm25):
                            print(f"\n--- Snippet {i+1} (BM25 Rank {i+1}) ---")
                            snippet_preview = textwrap.shorten(
                                snippet.strip(),
                                width=120,
                                placeholder=f" ... (total length: {len(snippet)} characters)"
                            )
                            print(snippet_preview)
                    else:
                        print("No snippets retrieved.")

except IndexError:
    print(f"[ERROR] Invalid index {SAMPLE_INDEX} for 'lca_dataset_split'.")
except Exception as main_e:
    import traceback
    print(f"\n[ERROR] Unexpected error in BM25 main script:")
    print(traceback.format_exc())

if retrieved_snippets_bm25:
    print(f"\n--- [OK] Retrieved {len(retrieved_snippets_bm25)} BM25 snippets ---")
    # The variable 'retrieved_snippets_bm25' contains the list of strings
else:
    print(f"\n--- [WARNING/ERROR] No snippets retrieved from BM25 ---")

--- Retrieval with BM25 ---
Using in-memory KB ('current_kb').

--- Running BM25 for Sample 0 (Library: seed-labs__seed-emulator) ---
Instruction (Query): Generate code that creates an emulation using the seedemu library. The emulation should include three layers: base, routing, and eBGP. It should also include a domain name caching service. 

The base layer should create multiple autonomous systems an...

Tokenizing Knowledge Base...
Tokenized KB (1196 valid documents).
Creating BM25 index (k1=1.5, b=0.75)...
BM25 index created.
Tokenizing instruction (query)...
Retrieving top 5 relevant snippets...

--- Top 5 Snippets Retrieved (BM25) ---

--- Snippet 1 (BM25 Rank 1) ---
def makeStubAs(emu: Emulator, base: Base, asn: int, exchange: int, services: ... (total length: 895 characters)

--- Snippet 2 (BM25 Rank 2) ---
def __init__( self, onAsConflict: Callable[[AutonomousSystem, AutonomousSystem], ... (total length: 1088 characters)

--- Snippet 3 (BM25 Rank 3) ---
def _doInstall(self, no

In [95]:
# --- 5. SIF Prompt Creation ---

# --- Constants and Configurations (Optional but good practice) ---
# Conservative estimate of tokens for the fixed prompt structure
# (You can calculate it more precisely later with your tokenizer)
# Safety margin to avoid hitting the limit exactly
PROMPT_TEMPLATE_BASE_TOKENS = 100
TOKEN_LIMIT_MARGIN = 50

def create_sif_prompt(
    instruction: str,                          # Original instruction
    retrieved_snippets: list[str],            # List of retrieved snippets (from BM25 or similar)
    tokenizer,                                # Loaded Hugging Face tokenizer instance
    max_prompt_tokens: int = 3500,            # Maximum tokens for the entire prompt
    # model_max_length: Optional[int] = None  # Optional: Model max length (if different)
) -> str:
    """
    Creates a SIF (Snippet Integration Format) prompt optimized for an LLM.

    Integrates retrieved snippets as context for code generation based on the given instruction,
    handling tokenization and truncation.

    Args:
        instruction: The user's instruction.
        retrieved_snippets: List of strings containing the retrieved code snippets.
        tokenizer: The initialized Hugging Face tokenizer instance.
        max_prompt_tokens: The approximate maximum tokens allowed for the final prompt.
                           (Considers the LLM's context window minus the tokens for the response).
        # model_max_length: Optional: The model's absolute max length, if known and different
        #                   from tokenizer.model_max_length.

    Returns:
        The formatted prompt string ready to be passed to the LLM.
        Returns an empty string if the instruction is missing.

    Raises:
        TypeError: If tokenizer is not provided or is invalid.
        ValueError: If max_prompt_tokens is not a positive integer.
    """
    # --- Input Validation ---
    if not isinstance(instruction, str) or not instruction.strip():
        warnings.warn("Missing or empty instruction; returning an empty prompt.")
        return ""
    if tokenizer is None or not hasattr(tokenizer, 'encode'):
        raise TypeError("A valid Hugging Face tokenizer is required for create_sif_prompt.")
    if not isinstance(max_prompt_tokens, int) or max_prompt_tokens <= 0:
        raise ValueError("max_prompt_tokens must be a positive integer.")

    # Determine the effective context limit of the model, if available
    effective_model_max_length = getattr(tokenizer, 'model_max_length', None)
    if effective_model_max_length and max_prompt_tokens > effective_model_max_length:
        warnings.warn(
            f"max_prompt_tokens ({max_prompt_tokens}) exceeds the model's maximum length"
            f" ({effective_model_max_length}). The model limit will take precedence"
        )

    # --- Improved Prompt Template ---
    prompt_template = """SYSTEM: You are an expert Python programmer. Generate Python code based ONLY on the user's instruction, using the provided library code snippets for context and correct API usage. Adapt snippets as needed; do not copy them verbatim unless requested.

USER:
### Context: Relevant Code Snippets from Library

{snippets_section}
### Instruction:
{instruction}

ASSISTANT:
```python
"""

    # --- End of Template ---

    # --- Calculating Available Space for Snippets ---
    # Tokenize instruction and base template to know how much space remains
    # Use add_special_tokens=False to count only content tokens
    instruction_tokens = len(tokenizer(instruction, add_special_tokens=False).input_ids)
    template_base_formatted = prompt_template.format(snippets_section="", instruction="")
    template_base_tokens = len(tokenizer(template_base_formatted, add_special_tokens=False).input_ids)

    available_tokens_for_snippets = max(
        0,
        max_prompt_tokens
        - instruction_tokens
        - template_base_tokens
        - TOKEN_LIMIT_MARGIN
    )
    print(f"Token calculation: Total max={max_prompt_tokens}, Instruction={instruction_tokens}, Base template={template_base_tokens}")
    print(f"Available tokens for snippets (approx): {available_tokens_for_snippets}")

    # --- Constructing Snippet Section with Token Checks ---
    snippets_text_parts = []
    accumulated_snippet_tokens = 0
    snippets_included_count = 0

    if not retrieved_snippets:
        warnings.warn("No snippets provided to create_sif_prompt.")

    for i, snippet in enumerate(retrieved_snippets):
        if not isinstance(snippet, str) or not snippet.strip():
            continue

        snippet_header = f"# --- Snippet {i+1} ---\n"
        snippet_content = snippet.strip().strip('`')
        if not snippet_content:
            continue
        snippet_formatted = f"```python\n{snippet_content}\n```\n\n"

        # Estimate tokens for this snippet (header + formatted code)
        current_snippet_section_tokens = len(
            tokenizer(snippet_header + snippet_formatted, add_special_tokens=False).input_ids
        )

        # Check if adding this snippet exceeds available space
        if accumulated_snippet_tokens + current_snippet_section_tokens > available_tokens_for_snippets:
            print(
                f"INFO: Token limit for snippets ({available_tokens_for_snippets}) reached. "
                f"Snippet {i+1} and subsequent ones skipped."
            )
            break

        # Add snippet to the prompt
        snippets_text_parts.append(snippet_header)
        snippets_text_parts.append(snippet_formatted)
        accumulated_snippet_tokens += current_snippet_section_tokens
        snippets_included_count += 1

    # Assemble final snippet section
    if snippets_included_count > 0:
        snippets_section_content = "".join(snippets_text_parts).strip()
    else:
        snippets_section_content = "# (No relevant snippets provided or all exceeded token limit)"

    # --- Composing Final Prompt ---
    final_prompt = prompt_template.format(
        snippets_section=snippets_section_content,
        instruction=instruction
    )

    # --- Final Length Check (Optional but Useful) ---
    final_token_count = len(
        tokenizer(final_prompt, add_special_tokens=False).input_ids
    )
    print(f"\nPrompt SIF created.")
    print(f"  Snippets included: {snippets_included_count} / {len(retrieved_snippets)}")
    print(f"  Estimated length (content only): {final_token_count} tokens (Limit set: {max_prompt_tokens})")

    if effective_model_max_length and final_token_count > effective_model_max_length:
        warnings.warn(
            f"The final prompt ({final_token_count} tokens) EXCEEDS the model's maximum length"
            f" ({effective_model_max_length}). It may be truncated or cause errors."
        )
    elif final_token_count > max_prompt_tokens:
        warnings.warn(
            f"The final prompt ({final_token_count} tokens) EXCEEDS the 'max_prompt_tokens' limit"
            f" ({max_prompt_tokens}). The token estimate may be inaccurate."
        )

    return final_prompt

# --- Example Usage (Modified to use correct variable) ---
print("\n" + "="*40)
print("--- Step 4: Creating RAG Prompt (SIF) ---")
print("="*40)

sif_prompt_final = None

if ('instruction' in locals() and instruction and
    'retrieved_snippets_bm25' in locals() and isinstance(retrieved_snippets_bm25, list) and
    'tokenizer' in locals() and tokenizer):

    prompt_token_limit = 3500
    print(f"Creating SIF prompt with max {prompt_token_limit} tokens...")
    sif_prompt_final = create_sif_prompt(
        instruction=instruction,
        retrieved_snippets=retrieved_snippets_bm25,
        tokenizer=tokenizer,
        max_prompt_tokens=prompt_token_limit
    )

    if sif_prompt_final:
        print("\n--- Preview of Final SIF Prompt (start) ---")
        # Usa textwrap.shorten per la preview
        print(textwrap.shorten(sif_prompt_final, width=1500, placeholder=" [...]\n```python\n")) # show the beginning
    else:
         print("[ERROR] Failed to create the SIF prompt (returned empty).")

else:
    missing_vars = []
    if 'instruction' not in locals() or not instruction: missing_vars.append("'instruction'")
    if 'retrieved_snippets_bm25' not in locals() or not isinstance(retrieved_snippets_bm25, list): missing_vars.append("'retrieved_snippets_bm25' (BM25 list)")
    if 'tokenizer' not in locals() or not tokenizer: missing_vars.append("'tokenizer'")
    print(f"[ERROR] Cannot create SIF prompt. Missing or invalid variables: {', '.join(missing_vars)}.")
    print("         Ensure the previous cells (dataset loading, BM25, tokenizer load) ran correctly.")

print("\n--- End of SIF Prompt Creation ---")


--- Step 4: Creating RAG Prompt (SIF) ---
Creating SIF prompt with max 3500 tokens...
Token calculation: Total max=3500, Instruction=189, Base template=92
Available tokens for snippets (approx): 3169

Prompt SIF created.
  Snippets included: 5 / 5
  Estimated length (content only): 1152 tokens (Limit set: 3500)

--- Preview of Final SIF Prompt (start) ---
SYSTEM: You are an expert Python programmer. Generate Python code based ONLY on the user's instruction, using the provided library code snippets for context and correct API usage. Adapt snippets as needed; do not copy them verbatim unless requested. USER: ### Context: Relevant Code Snippets from Library # --- Snippet 1 --- ```python def makeStubAs(emu: Emulator, base: Base, asn: int, exchange: int, services: List[Service]): """! @brief create a new stub AS. @param emu reference to the Emulator object. @param base reference to the base layer. @param asn ASN for the newly created AS. @param exchange IXP ID for new newly created AS to j

## Section 7 · RAG Code Generation and Output Processing



In [96]:
import torch
from transformers import StoppingCriteria, StoppingCriteriaList, LogitsProcessor, LogitsProcessorList
import warnings
import time

# --- 1. Generation Setup ---
# --- 1.1. Configuration Parameters ---
MAX_NEW_TOKENS = 1024      # Max tokens to generate for the response
TEMPERATURE = 0.6          # Recommended value for R1-Distill (0.5-0.7). Lower = more deterministic
TOP_P = 0.95               # Nucleus sampling (considers only tokens whose cumulative probability > top_p)
TOP_K = 50                 # Top-k sampling (considers only the top k most probable tokens)
REPETITION_PENALTY = 1.1   # Slightly penalize already generated tokens (e.g., 1.1-1.2) to reduce repetition
DO_SAMPLE = True           # Enable sampling (True to use temp/top_p/top_k, False for greedy/deterministic)
STOP_ON_EOS = True         # Stop generation if the EOS token is generated
STOP_ON_CODE_END = True    # Attempt to stop after the end of a code block (e.g. ```)

# --- 1.2. Advanced Stopping Criteria (Optional but Recommended) ---
# Combines EOS stop and, optionally, code block ending

class EosAndCodeStopCriteria(StoppingCriteria):
    def __init__(self, tokenizer, stop_on_eos=True, stop_sequence="\n```\n"):
        self.tokenizer = tokenizer
        self.stop_on_eos = stop_on_eos
        self.stop_sequence = stop_sequence
        self.stop_sequence_ids = self.tokenizer.encode(stop_sequence, add_special_tokens=False)
        # Remove any unwanted leading/trailing tokens from the stop sequence
        # (e.g., if encode adds BOS) - may require tokenizer-specific debugging
        print(f"Stopping sequence: '{self.stop_sequence}' -> IDs: {self.stop_sequence_ids}")
        print(f"Stop on EOS ({self.tokenizer.eos_token_id}): {self.stop_on_eos}")

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        # 1. Check EOS
        if self.stop_on_eos and (input_ids[0, -1] == self.tokenizer.eos_token_id):
            print("Stopping criteria: EOS token detected.")
            return True

        # 2. Check the stop sequence (e.g., \n```\n)
        if self.stop_sequence_ids:
             # Check if the last N tokens match the stop sequence
             len_stop_seq = len(self.stop_sequence_ids)
             if input_ids.shape[1] >= len_stop_seq:
                  last_tokens = input_ids[0, -len_stop_seq:]
                  if torch.equal(last_tokens, torch.tensor(self.stop_sequence_ids).to(last_tokens.device)):
                      print(f"Stopping criteria: Stop sequence '{self.stop_sequence}' detected.")
                      return True
        return False

stopping_criteria_list = None
if STOP_ON_EOS or STOP_ON_CODE_END:
     try:
         custom_stopper = EosAndCodeStopCriteria(
             tokenizer,
             stop_on_eos=STOP_ON_EOS,
             stop_sequence="\n```\n" if STOP_ON_CODE_END else None # Use \n```\n as the code stop sequence
         )
         stopping_criteria_list = StoppingCriteriaList([custom_stopper])
         print("Custom StoppingCriteria created")
     except Exception as e:
          print(f"WARNING: Unable to create custom StoppingCriteria: {e}")

# --- 1.3. (Optional) Forced Decoder IDs to start with <think> ---
# According to R1-Distill recommendations. Basic implementation:
# think_token_sequence = tokenizer.encode("<think>\n", add_special_tokens=False)
# force_think_processor = LogitsProcessorList([
#     ForcedBOSTokenLogitsProcessor(think_token_sequence[0]),  # Force the first token
#     ForcedEOSTokenLogitsProcessor(max_length=MAX_NEW_TOKENS + len(think_token_sequence), eos_token_id=think_token_sequence[1:])  # Force the rest if necessary
# ])
# This part is complex and may require tokenizer-specific adjustments.
# For now we omit it and rely on manually adding it to the prompt if needed.

# --- 2. Input & Generation ---

print("\n" + "="*40)
print("--- Step 5: RAG Code Generation ---")
print("="*40)

generated_code_rag = None # Initialize output

# Check dependencies
if 'sif_prompt_final' in locals() and sif_prompt_final and \
   'model' in locals() and model and \
   'tokenizer' in locals() and tokenizer:

    print(f"SIF prompt received (length: {len(sif_prompt_final)} chars).")
    print("Generation parameters:")
    print(f"  max_new_tokens={MAX_NEW_TOKENS}, temperature={TEMPERATURE if DO_SAMPLE else 'N/A (Greedy)'}")
    print(f"  top_p={TOP_P if DO_SAMPLE else 'N/A'}, top_k={TOP_K if DO_SAMPLE else 'N/A'}")
    print(f"  repetition_penalty={REPETITION_PENALTY}")
    print(f"  do_sample={DO_SAMPLE}")
    print(f"  Stopping Criteria: {'Active' if stopping_criteria_list else 'Inactive'}")

    try:
        # --- Tokenization ---
        print("\nTokenizing SIF prompt...")
        # No need to truncate here if create_sif_prompt already handled limits
        # max_length = tokenizer.model_max_length  # Model maximum length
        inputs = tokenizer(
            sif_prompt_final,
            return_tensors="pt",
            # truncation=True,  # Enable only if strictly necessary
            # max_length=max_length - MAX_NEW_TOKENS  # Leave room for generation
        ).to(model.device)  # Move to GPU

        input_length = inputs['input_ids'].shape[1]
        print(f"Tokenized input length: {input_length} tokens.")

        # --- Generation ---
        print("Starting code generation...")
        start_time = time.time()

        generation_args = {
            "input_ids": inputs['input_ids'],
            "attention_mask": inputs['attention_mask'],
            "max_new_tokens": MAX_NEW_TOKENS,
            "pad_token_id": tokenizer.eos_token_id,
            "repetition_penalty": REPETITION_PENALTY,
            "stopping_criteria": stopping_criteria_list  # Can be None
        }
        if DO_SAMPLE:
            generation_args.update({
                "temperature": TEMPERATURE,
                "top_p": TOP_P,
                "top_k": TOP_K,
                "do_sample": True,
            })
        else:
            # Greedy (deterministic) generation
            generation_args["do_sample"] = False
            # temperature, top_p, top_k are not used

        with torch.no_grad():  # Essential for inference
            # outputs = model.generate(**inputs, ...)  # Alternate way
            outputs = model.generate(**generation_args)

        end_time = time.time()
        print(f"Generation completed in {end_time - start_time:.2f} seconds.")

        # --- Decode and Clean Output ---
        # Decode only the NEW generated tokens
        output_tokens = outputs[0, input_length:]
        generated_code_rag_full = tokenizer.decode(output_tokens, skip_special_tokens=True)

        print("\n--- Generated Code (Raw) ---")
        print(generated_code_rag_full[:500] + "..." if len(generated_code_rag_full) > 500 else generated_code_rag_full)

        # --- Specific Cleanup for Code Blocks ---
        # Look for the content inside the first ```python ... ``` block
        # This is more robust than splitting only on ```
        code_block_match = re.search(r'```python\n(.*?)(?:\n```|\Z)', generated_code_rag_full, re.DOTALL)
        if code_block_match:
            generated_code_rag = code_block_match.group(1).strip()
            print("\nExtracted code from the ```python ... ``` block.")
        else:
            # Fallback: if it does not find ```python, take everything before a closing ```
            # or simply take the whole output if there are no backticks.
            if "\n```" in generated_code_rag_full:  # Look for \n``` to avoid inline matches
                generated_code_rag = generated_code_rag_full.split("\n```")[0].strip()
                print("\n```python block not found, taking output before ```." )
            else:
                generated_code_rag = generated_code_rag_full.strip()
                print("\nNo ``` block found, taking the full output.")

        print("\n--- Generated Code (Clean) ---")
        print(generated_code_rag)

    # --- 3. Error Handling ---
    except torch.cuda.OutOfMemoryError as e:
        print(f"\n[ERROR] Out Of Memory (OOM) during GENERATION!")
        print("  The prompt plus the generated output may exceed VRAM.")
        print("  Try reducing 'max_prompt_tokens' in create_sif_prompt or 'MAX_NEW_TOKENS' here.")
        generated_code_rag = None
    except Exception as e:
        import traceback
        print(f"\n[ERROR] Unexpected error during RAG generation:")
        print(traceback.format_exc())
        generated_code_rag = None

else:
    missing = []
    if 'sif_prompt_final' not in locals() or not sif_prompt_final: missing.append("'sif_prompt_final'")
    if 'model' not in locals() or not model: missing.append("'model'")
    if 'tokenizer' not in locals() or not tokenizer: missing.append("'tokenizer'")
    print(f"[ERROR] Unable to perform generation. Missing or invalid variables: {', '.join(missing)}.")
    print("         Make sure the previous cells have been executed correctly.")
    generated_code_rag = None

# --- 4. Final Verification ---
if generated_code_rag:
    print("\n--- RAG code generation completed ---")
    # The variable 'generated_code_rag' contains the cleaned code
else:
    print("\n--- [ERROR] RAG code generation failed or was not executed ---")


Stopping sequence: '
```
' -> IDs: [185, 10252, 185]
Stop on EOS (32014): True
Custom StoppingCriteria created

--- Step 5: RAG Code Generation ---
SIF prompt received (length: 4339 chars).
Generation parameters:
  max_new_tokens=1024, temperature=0.6
  top_p=0.95, top_k=50
  repetition_penalty=1.1
  do_sample=True
  Stopping Criteria: Active

Tokenizing SIF prompt...
Tokenized input length: 1153 tokens.
Starting code generation...
Generation completed in 63.54 seconds.

--- Generated Code (Raw) ---
import seedemu
from seedemu import *

ASN_BASE = 64512
EXCHANGE_BASE = 9876

EMPTY_DOMAINNAME_SERVICE_ID = 0
REVERSE_DOMAINNAME_SERVICE_ID = 1
IPV4_ORIGIN_SERVICE_ID = 2
DELEGATION_ZONE_SERVICE_ID = 3
DNS_SERVER_PORT = 53
BACKUP_DNS_SERVER_PORT = 53
ROUTER_ADDR = '10.1.1.1/24'
NETWORK_SIZE = 100
MAX_HOSTS = 50

class StubAsException(Exception):
    pass

class AsNotExistsException(StubAsException):
    pass

class IxNotFoundException(StubAsException):
    pass

class HostAlreadyJoinedExcept

## Section 8 · Baseline Generation and RAG Comparison

In [97]:
import torch
import time
import re       # Required for regex cleanup
import textwrap # For prompt preview
import warnings # To handle warnings

print("\n" + "=" * 40)
print("--- Step 6.A: Baseline Generation (LLM-only) ---")
print("=" * 40)
print("NOTE: This cell expects that 'instruction', 'model', 'tokenizer'")
print("      and the generation parameters (MAX_NEW_TOKENS, etc.) have")
print("      been defined in the previous cells (including Step 5).")

generated_code_baseline = None  # Initialize output

# --- 1. Robust Dependency Check ---
# Verify all necessary variables inherited from the previous execution
required_vars = [
    'instruction', 'model', 'tokenizer',
    'MAX_NEW_TOKENS', 'TEMPERATURE', 'TOP_P',
    'TOP_K', 'REPETITION_PENALTY', 'DO_SAMPLE'
]
missing_vars = []
invalid_vars = []

for var_name in required_vars:
    if var_name not in locals():
        missing_vars.append(f"'{var_name}'")
    # Also check that they are not None or empty (where applicable)
    elif var_name in ['instruction', 'model', 'tokenizer'] and not locals()[var_name]:
        invalid_vars.append(f"'{var_name}' (is None or empty)")

# Also verify the stopping criteria (optional, but if it exists it must be used)
# If it doesn't exist from the previous cell, it will be set to None later
stopping_criteria_to_use = locals().get('stopping_criteria_list', None)

# --- 2. Proceed only if all dependencies are OK ---
if not missing_vars and not invalid_vars:

    print("\nAll required variables were found.")

    # --- 3. Baseline Prompt Creation ---
    # Use the same prompt structure for consistency (even if simple)
    baseline_prompt = f"""USER:
### Instruction:
{instruction}

ASSISTANT:
```python
"""
    # Do not print the entire prompt if it is very long
    print("\nBaseline Prompt (start):")
    print(textwrap.shorten(baseline_prompt, width=1200, placeholder="...```python\n"))

    # --- 4. Code Generation ---
    print(f"\nUsing the SAME parameters inherited from the RAG generation:")
    print(f"  max_new_tokens={MAX_NEW_TOKENS}, temperature={TEMPERATURE if DO_SAMPLE else 'N/A (Greedy)'}")
    print(f"  top_p={TOP_P if DO_SAMPLE else 'N/A'}, top_k={TOP_K if DO_SAMPLE else 'N/A'}")
    print(f"  repetition_penalty={REPETITION_PENALTY}")
    print(f"  do_sample={DO_SAMPLE}")
    print(f"  Stopping Criteria: {'Active' if stopping_criteria_to_use else 'Inactive'}")

    try:
        # --- Tokenization ---
        inputs_base = tokenizer(baseline_prompt, return_tensors="pt").to(model.device)
        input_length_base = inputs_base['input_ids'].shape[1]
        print(f"\nTokenized input length: {input_length_base} tokens.")

        # --- model.generate call (Same as RAG except for the input) ---
        print("Starting Baseline generation...")
        start_time = time.time()

        generation_args_base = {
            "input_ids": inputs_base['input_ids'],
            "attention_mask": inputs_base['attention_mask'],
            "max_new_tokens": MAX_NEW_TOKENS,
            "pad_token_id": tokenizer.eos_token_id,
            "repetition_penalty": REPETITION_PENALTY,
            "stopping_criteria": stopping_criteria_to_use  # Use the same one from RAG (can be None)
        }
        if DO_SAMPLE:
            generation_args_base.update({
                "temperature": TEMPERATURE,
                "top_p": TOP_P,
                "top_k": TOP_K,
                "do_sample": True,
            })
        else:
            generation_args_base["do_sample"] = False

        with torch.no_grad():
            outputs_base = model.generate(**generation_args_base)

        end_time = time.time()
        print(f"Baseline generation completed in {end_time - start_time:.2f} seconds.")

        # --- Decode and Clean (Same logic as RAG) ---
        output_tokens_base = outputs_base[0, input_length_base:]
        generated_code_baseline_full = tokenizer.decode(output_tokens_base, skip_special_tokens=True)

        print("\n--- Baseline Generated Code (Raw) ---")
        print(generated_code_baseline_full[:500] + "..." if len(generated_code_baseline_full) > 500 else generated_code_baseline_full)

        # Cleanup with Regex (identical to RAG)
        code_block_match_base = re.search(r'```python\n(.*?)(?:\n```|\Z)', generated_code_baseline_full, re.DOTALL)
        if code_block_match_base:
            generated_code_baseline = code_block_match_base.group(1).strip()
            print("\nExtracted code from the ```python block.")
        else:
            if "\n```" in generated_code_baseline_full:
                generated_code_baseline = generated_code_baseline_full.split("\n```")[0].strip()
                print("\n```python block not found, took output before ```.")
            else:
                generated_code_baseline = generated_code_baseline_full.strip()
                print("\nNo ``` block found, taking the full output.")

        print("\n--- Generated Code (Baseline LLM-only - Clean) ---")
        print(generated_code_baseline or "[Empty generation]")

    except torch.cuda.OutOfMemoryError as e:
        print(f"\n[ERROR] Out Of Memory (OOM) during BASELINE GENERATION!")
        print("  Try reducing 'MAX_NEW_TOKENS'.")
        generated_code_baseline = None  # Ensure None in case of error
    except Exception as e:
        import traceback
        print(f"\n[ERROR] Unexpected error during Baseline generation:")
        print(traceback.format_exc())
        generated_code_baseline = None  # Ensure None in case of error

else:
    # Print detailed error message
    print("\n[ERROR] Unable to perform Baseline generation.")
    error_msg = "         Issue detected with:"
    if missing_vars:
        error_msg += f" Missing variables: {', '.join(missing_vars)}."
    if invalid_vars:
        error_msg += f" Invalid variables (None/empty): {', '.join(invalid_vars)}."
    print(error_msg)
    print("         Make sure ALL previous cells (data/model loading, RAG generation) executed successfully.")

# --- 5. Final Verification ---
if generated_code_baseline is not None:
    print("\n--- Baseline code generation completed ---")
else:
    print("\n--- [ERROR] Baseline code generation failed or was not executed ---")



--- Step 6.A: Baseline Generation (LLM-only) ---
NOTE: This cell expects that 'instruction', 'model', 'tokenizer'
      and the generation parameters (MAX_NEW_TOKENS, etc.) have
      been defined in the previous cells (including Step 5).

All required variables were found.

Baseline Prompt (start):
USER: ### Instruction: Generate code that creates an emulation using the seedemu library. The emulation should include three layers: base, routing, and eBGP. It should also include a domain name caching service. The base layer should create multiple autonomous systems and internet exchanges. Each autonomous system should have multiple hosts and a router. The hosts and the router should join a network within the autonomous system and the router should also join an internet exchange. The domain name caching service should be installed on specific hosts within the autonomous systems and bindings should be added for these installations. The eBGP layer should add private peerings between differ

## Section 9 · Metrics Results


In [98]:
# ================================================================
#  Build baseline_outputs, rag_outputs, references, reference_apis
# ================================================================
from rank_bm25 import BM25Okapi
from tqdm.auto import tqdm
import torch, textwrap

# ----------------- 1. pick how many examples to run -------------
NUM_EXAMPLES = len(lca_dataset_split)    # e.g. 10 for a quick test

# ----------------- 2. BM25 over clean_reference -----------------
corpus_texts  = [ex["clean_reference"] for ex in lca_dataset_split]
bm25          = BM25Okapi([t.split() for t in corpus_texts])

# ----------------- 3. helper: deterministic generation ----------
def generate_code(prompt, max_new_tokens=256):
    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=2048,              # safety for long RAG prompts
    ).to(model.device)

    with torch.no_grad():
        out = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,          # greedy for reproducibility
            temperature=0.0,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id,
        )
    return tokenizer.decode(out[0], skip_special_tokens=True)

# ----------------- 4. main loop --------------------------------
baseline_outputs, rag_outputs = [], []
references, reference_apis    = [], []

for idx, ex in enumerate(tqdm(lca_dataset_split.select(range(NUM_EXAMPLES)),
                              desc="⏳ generating")):
    instr = ex["instruction"]

    # --- baseline ---
    b_prompt = build_baseline_prompt(instr)
    baseline_outputs.append(generate_code(b_prompt))

    # --- retrieve top-k snippets (BM25) ---
    query       = instr.split()
    top_indices = bm25.get_top_n(query, list(range(len(corpus_texts))), n=5)
    retrieved   = "\n\n".join(corpus_texts[i] for i in top_indices)

    # --- RAG ---
    r_prompt = build_rag_prompt(instr, retrieved)
    rag_outputs.append(generate_code(r_prompt))

    # --- store references & APIs as before ---
    references.append(ex["clean_reference"])
    reference_apis.append(ex["unique_apis"])

# ----------------- 5. sanity check ------------------------------
assert len({len(baseline_outputs), len(rag_outputs),
            len(references), len(reference_apis)}) == 1, "length mismatch!"

print(f"\n✅ Built lists for {len(baseline_outputs)} examples.")



⏳ generating:   0%|          | 0/13 [00:00<?, ?it/s]


✅ Built lists for 13 examples.


In [99]:
"""
# ================================================================
#  Metrics helpers – with CodeBLEU support & automatic key-detection
# ================================================================
import importlib, warnings, re
from importlib.metadata import version as _get_version, PackageNotFoundError
import numpy as np

# ─── sacrebleu ─────────────────────────────────────────────────
import sacrebleu
print("✅ sacrebleu", sacrebleu.__version__)

# ─── codebleu ─────────────────────────────────────────────────
try:
    from codebleu import calc_codebleu
    try:
        cb_ver = _get_version("codebleu")
    except PackageNotFoundError:
        cb_ver = "n/a"
    print("✅ codebleu", cb_ver)
    _HAS_CODEBLEU = True
except ImportError:
    print("⚠️  codebleu import failed — CodeBLEU will be skipped.")
    _HAS_CODEBLEU = False

!pip install -q --upgrade tree_sitter tree_sitter_python
!pip install -q git+https://github.com/k4black/codebleu.git
# run this in a fresh cell *before* any CodeBLEU import
!pip install -q --upgrade "tree_sitter<0.23" "tree_sitter_python<0.23"
"""

# ── FIRST cell in the notebook ───────────────────────────
!pip uninstall -yq codebleu            # throw away 0.7.0
!pip install -q --upgrade tree_sitter tree_sitter_python  # stays at 0.24+
!pip install -q git+https://github.com/k4black/codebleu.git  # 0.7.1-dev



  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Building wheel for codebleu (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [codebleu]
[1A[2K

In [100]:
from codebleu import calc_codebleu
import sacrebleu
import importlib.metadata as md
print("Now running CodeBLEU", md.version("codebleu"))
_HAS_CODEBLEU = True

# ────────────────────────────────────────────────────────────────
#  Composite key auto-detector (runs once on your first example)
# ────────────────────────────────────────────────────────────────
_codebleu_key = None
def _detect_codebleu_key(sample_pred, sample_ref):
    global _codebleu_key
    if not _HAS_CODEBLEU:
        return None
    res = calc_codebleu(
        references=[[sample_ref]],
        predictions=[sample_pred],
        lang="python",
        weights=(0.25,0.25,0.25,0.25)
    )
    print("🔍 CodeBLEU raw result keys:", list(res.keys()))
    # pick the first key containing “codebleu” (case-insensitive)
    for k in res:
        if "codebleu" in k.lower():
            _codebleu_key = k
            break
    return _codebleu_key

# ────────────────────────────────────────────────────────────────
#  Metric functions
# ────────────────────────────────────────────────────────────────
def calculate_chrf(pred, ref):
    if not (isinstance(pred, str) and isinstance(ref, str)):
        return None
    if not pred or not ref:
        return 0.0
    return sacrebleu.corpus_chrf([pred], [[ref]]).score

def calculate_codebleu(pred, ref, lang="python", weights=(0.25,0.25,0.25,0.25)):
    if not _HAS_CODEBLEU:
        return None
    if not (isinstance(pred, str) and isinstance(ref, str)):
        return None

    # detect key on first call
    global _codebleu_key
    if _codebleu_key is None:
        _detect_codebleu_key(pred, ref)
        if _codebleu_key is None:
            warnings.warn("Could not find a CodeBLEU key in the result; returning 0.0")
            return 0.0

    try:
        res = calc_codebleu(
            references=[[ref]],
            predictions=[pred],
            lang=lang,
            weights=weights
        )
        return float(res.get(_codebleu_key, 0.0))
    except Exception as e:
        warnings.warn(f"CodeBLEU failed for one example: {e}")
        return None

def calculate_api_recall(gen, ref_apis):
    if not isinstance(gen, str) or not isinstance(ref_apis, list):
        return 0.0
    valid = [api for api in ref_apis if isinstance(api, str) and api.strip()]
    if not gen or not valid:
        return 0.0
    hits = sum(bool(re.search(rf"\b{re.escape(api)}\b", gen)) for api in valid)
    return hits / len(valid)


# ────────────────────────────────────────────────────────────────
#  Evaluation driver (baseline vs RAG)
# ────────────────────────────────────────────────────────────────
def evaluate(baseline_preds, rag_preds, refs, ref_api_lists):
    assert len(baseline_preds) == len(rag_preds) == len(refs) == len(ref_api_lists), \
        "All four lists must have the same length!"

    def _mean(fn, preds):
        vals = [fn(p, r) for p, r in zip(preds, refs)]
        vals = [v for v in vals if v is not None]
        return np.mean(vals) if vals else 0.0

    recall_b = _mean(calculate_api_recall, baseline_preds)
    recall_r = _mean(calculate_api_recall, rag_preds)
    chrf_b   = _mean(calculate_chrf,           baseline_preds)
    chrf_r   = _mean(calculate_chrf,           rag_preds)
    cbleu_b  = _mean(calculate_codebleu,       baseline_preds)
    cbleu_r  = _mean(calculate_codebleu,       rag_preds)

    print("\n--- Risultati Metriche Automatiche ---")
    print(f"| Metrica    | Baseline |   RAG   |")
    print(f"|------------|----------|---------|")
    print(f"| API Recall | {recall_b:.4f}   | {recall_r:.4f}   |")
    print(f"| ChrF       | {chrf_b:.2f}    | {chrf_r:.2f}    |")
    print(f"| CodeBLEU   | {cbleu_b:.2f}    | {cbleu_r:.2f}    |")
    print("--------------------------------------------------")

    return {
        "recall":   (recall_b,   recall_r),
        "chrf":     (chrf_b,     chrf_r),
        "codebleu": (cbleu_b,    cbleu_r),
    }
metrics = evaluate(baseline_outputs, rag_outputs, references, reference_apis)


Now running CodeBLEU 0.7.1
🔍 CodeBLEU raw result keys: ['codebleu', 'ngram_match_score', 'weighted_ngram_match_score', 'syntax_match_score', 'dataflow_match_score']

--- Risultati Metriche Automatiche ---
| Metrica    | Baseline |   RAG   |
|------------|----------|---------|
| API Recall | 0.0000   | 0.0000   |
| ChrF       | 23.09    | 53.15    |
| CodeBLEU   | 0.16    | 0.43    |
--------------------------------------------------


### Prompt 1
| Metrica    | Baseline |   RAG   |
|------------|----------|---------|
| API Recall | 0.0000   | 0.0000   |
| ChrF       | 17.05    | 53.43    |
| CodeBLEU   | 0.11    | 0.43    |



---



### Prompt 2
| Metrica    | Baseline |   RAG   |
|------------|----------|---------|
| API Recall | 0.0000   | 0.0000   |
| ChrF       | 25.75    | 53.43    |
| CodeBLEU   | 0.21    | 0.43    |

---

### Prompt 3
| Metrica    | Baseline |   RAG   |
|------------|----------|---------|
| API Recall | 0.0000   | 0.0000   |
| ChrF       | 23.09    | 53.15    |
| CodeBLEU   | 0.16    | 0.43    |


In [101]:
def count_valid(fn, preds, refs):
    """Count how many (pred, ref) pairs return a non‐None metric."""
    return sum(1 for p, r in zip(preds, refs) if fn(p, r) is not None)

# For API‐Recall we never return None, so it’s simply the full length:
n_api = len(baseline_outputs)

# For ChrF & CodeBLEU we drop any None’s:
n_chrf     = count_valid(calculate_chrf,     baseline_outputs, references)
n_codebleu = count_valid(calculate_codebleu, baseline_outputs, references)

print(f"API Recall was computed on {n_api} samples")
print(f"ChrF       was computed on {n_chrf} samples")
print(f"CodeBLEU   was computed on {n_codebleu} samples")


API Recall was computed on 13 samples
ChrF       was computed on 13 samples
CodeBLEU   was computed on 13 samples


In [102]:
n_chrf_rag     = count_valid(calculate_chrf,     rag_outputs, references)
n_codebleu_rag = count_valid(calculate_codebleu, rag_outputs, references)
print(f"(RAG) ChrF       on {n_chrf_rag} samples")
print(f"(RAG) CodeBLEU   on {n_codebleu_rag} samples")


(RAG) ChrF       on 13 samples
(RAG) CodeBLEU   on 13 samples


## Section 10 · Example result


In [103]:
# ──────────────────────────────────────────────────────────────────
# Display a sample: task + reference + baseline + RAG side by side
# ──────────────────────────────────────────────────────────────────
from IPython.display import display, Markdown
import textwrap

EXAMPLE_INDEX = 0  # Change this to any index within your dataset size

task       = references[EXAMPLE_INDEX]            # Gold reference code (cleaned)
baseline   = baseline_outputs[EXAMPLE_INDEX]      # Generated from instruction only
rag        = rag_outputs[EXAMPLE_INDEX]           # Generated with RAG prompt
instruction = lca_dataset_split[EXAMPLE_INDEX]['instruction']  # Original task (English)

def print_block(title, content):
    print(f"{title}")
    print("-" * 10)
    print(textwrap.dedent(content).strip())
    print("-" * 10)
    print()




In [104]:
print("Instruction")
print("=" * 40)
print(textwrap.dedent(instruction).strip())
print("=" * 40)

Instruction
Generate code that creates an emulation using the seedemu library. The emulation should include three layers: base, routing, and eBGP. It should also include a domain name caching service. 

The base layer should create multiple autonomous systems and internet exchanges. Each autonomous system should have multiple hosts and a router. The hosts and the router should join a network within the autonomous system and the router should also join an internet exchange. 

The domain name caching service should be installed on specific hosts within the autonomous systems and bindings should be added for these installations. 

The eBGP layer should add private peerings between different autonomous systems. 

Finally, all the layers and the domain name caching service should be added to the emulator and the state of the emulator should be dumped to a binary file.


In [105]:
print("✅ Gold Reference")
print("=" * 40)
print(textwrap.dedent(task).strip())
print("=" * 40)

✅ Gold Reference
from seedemu.layers import Base, Routing, Ebgp, PeerRelationship, Ibgp, Ospf
from seedemu.compiler import Docker
from seedemu.services import DomainNameCachingService
from seedemu.core import Emulator, Binding, Filter, Node
from typing import List

sim = Emulator()

base = Base()
routing = Routing()
ebgp = Ebgp()
ibgp = Ibgp()
ospf = Ospf()
ldns = DomainNameCachingService()

def make_stub_as(asn: int, exchange: str):
    stub_as = base.createAutonomousSystem(asn)
    host = stub_as.createHost('host0')
    host1 = stub_as.createHost('host1')
    host2 = stub_as.createHost('host2')
    host3 = stub_as.createHost('host3')
    host4 = stub_as.createHost('host4')
    host5 = stub_as.createHost('host5')
    ldns_host = stub_as.createHost('ldns') 

    router = stub_as.createRouter('router0')
    net = stub_as.createNetwork('net0')

    router.joinNetwork('net0')
    host.joinNetwork('net0')
    host1.joinNetwork('net0')
    host2.joinNetwork('net0')
    host3.joinNetwork('ne

In [106]:
print("Baseline Output")
print("=" * 40)
print(textwrap.dedent(baseline).strip())
print("=" * 40)

Baseline Output
You are a senior Python engineer.  Fulfill the following task by writing production-ready code.

**Task**:
Generate code that creates an emulation using the seedemu library. The emulation should include three layers: base, routing, and eBGP. It should also include a domain name caching service. 

The base layer should create multiple autonomous systems and internet exchanges. Each autonomous system should have multiple hosts and a router. The hosts and the router should join a network within the autonomous system and the router should also join an internet exchange. 

The domain name caching service should be installed on specific hosts within the autonomous systems and bindings should be added for these installations. 

The eBGP layer should add private peerings between different autonomous systems. 

Finally, all the layers and the domain name caching service should be added to the emulator and the state of the emulator should be dumped to a binary file.

**Requiremen

In [107]:
print("RAG Output")
print("=" * 40)
print(textwrap.dedent(rag).strip())
print("=" * 40)


RAG Output
You are a senior Python engineer.  Use the retrieved examples to guide your implementation.

**Retrieved Examples**:
from seedemu import *

hosts_per_stub_as = 3
emu = Makers.makeEmulatorBaseWith10StubASAndHosts(hosts_per_stub_as = hosts_per_stub_as)

eth = EthereumService()

blockchain = eth.createBlockchain(chainName="pos", consensus=ConsensusMechanism.POS)

blockchain.setTerminalTotalDifficulty(30)

asns = [150, 151, 152, 153, 154, 160, 161, 162, 163, 164]

i = 1
for asn in asns:
    for id in range(hosts_per_stub_as):        

        e:EthereumServer = blockchain.createNode("eth{}".format(i))   

        e.appendClassName('Ethereum-POS-{}'.format(i))

        e.enableGethHttp()

        if asn == 150 and id == 0:
                e.setBeaconSetupNode()

        if asn == 150 and id == 1:
                e.setBootNode(True)

        if asn in [151]:
            if id == 0:
                e.enablePOSValidatorAtRunning()
            if id == 1:
                e.enablePOSV