### Test de compatibilidad

In [1]:
import torch
import transformers
import datasets
import peft
import trl
import bitsandbytes
import accelerate
import os

# Suprimir una advertencia común de bitsandbytes en Windows
os.environ["BITSANDBYTES_NOWELCOME"] = "1"

print("--- Verificación de Versiones de Librerías ---")
print(f"PyTorch:         {torch.__version__}")
print(f"CUDA (PyTorch):  {torch.version.cuda}")
print(f"Transformers:    {transformers.__version__}")
print(f"Datasets:        {datasets.__version__}")
print(f"PEFT:            {peft.__version__}")
print(f"TRL:             {trl.__version__}")
print(f"BitsAndBytes:    {bitsandbytes.__version__}")
print(f"Accelerate:      {accelerate.__version__}")
print("-----------------------------------------------")

# Verificación final de GPU (redundante pero no hace daño)
print(f"\nCUDA disponible para PyTorch: {torch.cuda.is_available()}")

--- Verificación de Versiones de Librerías ---
PyTorch:         2.6.0+cu124
CUDA (PyTorch):  12.4
Transformers:    4.56.2
Datasets:        4.1.1
PEFT:            0.17.1
TRL:             0.23.0
BitsAndBytes:    0.47.0
Accelerate:      1.10.1
-----------------------------------------------

CUDA disponible para PyTorch: True


## Inicio Sesion HF

In [2]:
from huggingface_hub import login

# Pega tu token de Hugging Face aquí
# Es un token de "lectura", así que es seguro
TOKEN = "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"

login(token=TOKEN)

print("¡Sesión iniciada en Hugging Face!")

¡Sesión iniciada en Hugging Face!


## Cargar modelo Mistral

In [3]:
import torch
import transformers
import os
from datasets import load_dataset, concatenate_datasets
from transformers import (
    AutoTokenizer, 
    AutoModelForCausalLM, 
    BitsAndBytesConfig, 
    TrainingArguments
)
from peft import LoraConfig, get_peft_model
from trl import SFTTrainer

# Suprimir la advertencia de bitsandbytes
os.environ["BITSANDBYTES_NOWELCOME"] = "1"

print("Librerías importadas con éxito.")

# --- 1. Definir el Modelo y el Tokenizador (¡CAMBIADO!) ---
MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.3"
NUEVO_MODELO_NOMBRE = "Mistral-7B-Python-Expert-v1"

print(f"Cargando el modelo base: {MODEL_ID}")

# --- 2. Configuración de Cuantización (4-bit) ---
# Esto es lo que hace que el modelo quepa en tus 12GB de VRAM
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

# --- 3. Cargar el Tokenizador ---
# No se necesita token. ¡Descarga directa!
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
# Mistral, al igual que Llama, no tiene un pad_token.
tokenizer.pad_token = tokenizer.eos_token

print("Tokenizador cargado.")

# --- 4. Cargar el Modelo Base con Cuantización ---
# No se necesita token. ¡Descarga directa!
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=bnb_config,
    device_map="auto" # "auto" le dice a transformers que ponga el modelo en la GPU
)

print(f"Modelo {MODEL_ID} cargado en 4-bits.")

# --- 5. Configurar LoRA (PEFT) ---
# Las capas de Mistral 7B son las mismas que las de Llama 3, así que esto no cambia.
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM"
)

# --- 6. Aplicar LoRA al Modelo ---
model = get_peft_model(model, lora_config)

print("Configuración de LoRA (PEFT) aplicada al modelo.")
model.print_trainable_parameters()

Librerías importadas con éxito.
Cargando el modelo base: mistralai/Mistral-7B-Instruct-v0.3
Tokenizador cargado.


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Modelo mistralai/Mistral-7B-Instruct-v0.3 cargado en 4-bits.
Configuración de LoRA (PEFT) aplicada al modelo.
trainable params: 13,631,488 || all params: 7,261,655,040 || trainable%: 0.1877


## Celda 2: Cargar y Procesar los Datasets

In [4]:
from datasets import load_dataset, concatenate_datasets
import re

print("Iniciando procesamiento (Solo Datasets de Código)")

# --- 1. Definir un filtro de Python simple y de alta precisión ---
PYTHON_KEYWORDS_SIMPLE = set(["python", "pandas", "numpy", "def ", "sklearn", "torch", "tensorflow","sql"])

def es_python_simple(example):
    # Combinamos instruction y output
    texto_completo = (example['instruction'] + example['output']).lower()
    return any(keyword in texto_completo for keyword in PYTHON_KEYWORDS_SIMPLE)

# --- 2. Cargar y Filtrar CodeAlpaca ---
print("Cargando CodeAlpaca...")
ds_alpaca = load_dataset("sahil2801/CodeAlpaca-20k", split="train")
ds_alpaca_python = ds_alpaca.filter(es_python_simple)
print(f"CodeAlpaca: {len(ds_alpaca)} originales -> {len(ds_alpaca_python)} de Python")

# --- 3. Cargar y Filtrar CodeInstructions122k ---
print("Cargando CodeInstructions122k...")
ds_code_122k = load_dataset("TokenBender/code_instructions_122k_alpaca_style", split="train")
ds_code_122k_python = ds_code_122k.filter(es_python_simple)
print(f"CodeInstructions122k: {len(ds_code_122k)} originales -> {len(ds_code_122k_python)} de Python")

# --- 4. Formatear para SFT (Formato Mistral) ---
# Ambos datasets usan el formato 'instruction', 'input', 'output' de Alpaca
def format_prompt_mistral_alpaca(example):
    instruccion = example['instruction']
    input_opcional = example.get('input') # .get() no da error si 'input' no existe
    respuesta = example['output']

    # Si hay un 'input' (contexto adicional), lo unimos a la instrucción
    if input_opcional:
        instruccion = instruccion + "\n\nInput:\n" + input_opcional
    
    # Descartar ejemplos sin respuesta
    if not respuesta:
        return None

    example['text'] = f"[INST] {instruccion} [/INST] {respuesta}</s>"
    return example

print("Aplicando formato Mistral a los datasets...")
# Usamos 'remove_columns' para limpiar los datasets antes de concatenar
ds_alpaca_formatted = ds_alpaca_python.map(format_prompt_mistral_alpaca, remove_columns=['instruction', 'input', 'output'])

# --- ¡LÍNEA CORREGIDA ABAJO! ---
ds_code_122k_formatted = ds_code_122k_python.map(format_prompt_mistral_alpaca, remove_columns=['instruction', 'input', 'output']) # <-- ¡CORREGIDO!

# Filtrar cualquier ejemplo que haya devuelto None
ds_alpaca_formatted = ds_alpaca_formatted.filter(lambda x: x['text'] is not None)
ds_code_122k_formatted = ds_code_122k_formatted.filter(lambda x: x['text'] is not None)

# --- 5. Combinar y Mezclar ---
dataset_final = concatenate_datasets([ds_alpaca_formatted, ds_code_122k_formatted])
dataset_final = dataset_final.shuffle(seed=42)

print("\n--- ¡Procesamiento completado! ---")
print(f"Tamaño total del dataset final: {len(dataset_final)}")

# Imprimir un ejemplo para verificar el formato
print("\n--- Ejemplo de formato (índice 0): ---")
print(dataset_final[0]['text'])
print("\n--- Ejemplo de formato (índice 500): ---")
print(dataset_final[500]['text'])

Iniciando procesamiento (Solo Datasets de Código)
Cargando CodeAlpaca...
CodeAlpaca: 20022 originales -> 8578 de Python
Cargando CodeInstructions122k...
CodeInstructions122k: 121959 originales -> 63841 de Python
Aplicando formato Mistral a los datasets...

--- ¡Procesamiento completado! ---
Tamaño total del dataset final: 72418

--- Ejemplo de formato (índice 0): ---
[INST] Create a Python function that takes a dictionary as an argument, and returns the value with maximum frequency in that dictionary.

Input:
dic = {1: 'a', 2: 'b', 3: 'b', 4: 'c'} [/INST] def max_freq_val(dic): 
    max_freq = 0
    max_val = None
    for val in dic.values():
        if dic.values().count(val) > max_freq:
            max_freq = dic.values().count(val)
            max_val = val
    
    return max_val

max_val = max_freq_val(dic)
print(max_val)</s>

--- Ejemplo de formato (índice 500): ---
[INST] Create an algorithm to merge two sorted linked lists into one sorted list. [/INST] def merge_sorted_lists(ls

## Celda 3: Entrenamiento


In [5]:
# ============================================
# ENTRENAMIENTO SFT ROBUSTO (TRL 0.23 + PEFT)
# Auto-fallback para eval_strategy / evaluation_strategy
# ============================================

import os, re, math, torch, inspect
from datasets import DatasetDict
from trl import SFTTrainer, SFTConfig

# ---------- 1) Split train/eval ----------
splits = dataset_final.train_test_split(test_size=0.05, seed=42)  # 5% eval
train_ds = splits["train"]
eval_ds  = splits["test"]
print(f"[Split] Train: {len(train_ds)} | Eval: {len(eval_ds)}")

# ---------- 2) Precisión coherente con tu GPU ----------
use_bf16 = torch.cuda.is_available() and torch.cuda.is_bf16_supported()
precision_kwargs = dict(bf16=True, fp16=False) if use_bf16 else dict(bf16=False, fp16=True)
print(f"[Precision] bf16={precision_kwargs.get('bf16')} fp16={precision_kwargs.get('fp16')}")

# ---------- 3) Hiperparámetros ajustables ----------
PER_DEVICE_BS = 2
GRAD_ACCUM    = 4
PACKING       = False
MAX_LEN       = 1024
LOG_STEPS     = 50
SAVE_STEPS    = 1000
EVAL_STEPS    = SAVE_STEPS
SAVE_LIMIT    = 2

# ---------- 4) Detectar nombre correcto del parámetro de evaluación ----------
sft_params = set(inspect.signature(SFTConfig.__init__).parameters.keys())
EVAL_KEY = "evaluation_strategy" if "evaluation_strategy" in sft_params else (
           "eval_strategy"        if "eval_strategy"        in sft_params else None)

if EVAL_KEY is None:
    print("[Aviso] Ni 'evaluation_strategy' ni 'eval_strategy' están en SFTConfig; se hará evaluación solo al final.")
else:
    print(f"[Eval key] Usando '{EVAL_KEY}' para estrategia de evaluación.")

# ---------- 5) Construir kwargs comunes de SFTConfig ----------
base_kwargs = dict(
    output_dir=NUEVO_MODELO_NOMBRE,
    per_device_train_batch_size=PER_DEVICE_BS,
    gradient_accumulation_steps=GRAD_ACCUM,
    learning_rate=2e-4,
    num_train_epochs=1,
    logging_steps=LOG_STEPS,
    save_steps=SAVE_STEPS,
    save_total_limit=SAVE_LIMIT,
    dataset_text_field="text",
    max_length=MAX_LEN,
    packing=PACKING,
    gradient_checkpointing=True,
    report_to="none",
    pad_token=tokenizer.pad_token,
    eos_token=tokenizer.eos_token,
    optim="paged_adamw_8bit",
    lr_scheduler_type="cosine",
    warmup_ratio=0.03,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    seed=42,
    **precision_kwargs
)

# Añadir evaluación por pasos usando la clave correcta (si existe)
if EVAL_KEY:
    base_kwargs[EVAL_KEY] = "steps"
    base_kwargs["eval_steps"] = EVAL_STEPS

# ---------- 6) Instanciar SFTConfig ----------
sft_config = SFTConfig(**base_kwargs)

# ---------- 7) Crear SFTTrainer (modelo ya envuelto con LoRA con get_peft_model) ----------
trainer = SFTTrainer(
    model=model,
    train_dataset=train_ds,
    eval_dataset=eval_ds,
    args=sft_config,
    # tokenizer=tokenizer,  # descomenta solo si tu TRL lo acepta (si no, déjalo así)
)

# ---------- 8) Reanudar desde último checkpoint si existe ----------
last_ckpt = None
if os.path.isdir(NUEVO_MODELO_NOMBRE):
    cks = [d for d in os.listdir(NUEVO_MODELO_NOMBRE) if d.startswith("checkpoint-")]
    if cks:
        steps = []
        for c in cks:
            m = re.findall(r"checkpoint-(\d+)", c)
            if m:
                steps.append(int(m[0]))
        if steps:
            last_ckpt = os.path.join(NUEVO_MODELO_NOMBRE, f"checkpoint-{max(steps)}")
            print(f"[Resume] Reanudando desde: {last_ckpt}")

# ---------- 9) Entrenar ----------
print("[Train] ¡Iniciando entrenamiento!")
trainer.train(resume_from_checkpoint=last_ckpt)

# ---------- 10) Evaluación final + Perplexity ----------
print("[Eval] Evaluación final...")
metrics = trainer.evaluate()
for k, v in metrics.items():
    if isinstance(v, float):
        print(f"  {k}: {v:.6f}")
    else:
        print(f"  {k}: {v}")

if "eval_loss" in metrics and isinstance(metrics["eval_loss"], float):
    try:
        ppl = math.exp(metrics["eval_loss"])
        print(f"  Perplexity: {ppl:.2f}")
    except OverflowError:
        pass

# ---------- 11) Guardar adaptadores finales LoRA ----------
final_dir = f"{NUEVO_MODELO_NOMBRE}-final"
trainer.save_model(final_dir)
print(f"[Save] Adaptadores LoRA guardados en: {final_dir}")


[Split] Train: 68797 | Eval: 3621
[Precision] bf16=True fp16=False
[Eval key] Usando 'eval_strategy' para estrategia de evaluación.
[Resume] Reanudando desde: Mistral-7B-Python-Expert-v1\checkpoint-7000
[Train] ¡Iniciando entrenamiento!


  return fn(*args, **kwargs)


Step,Training Loss,Validation Loss,Entropy,Num Tokens,Mean Token Accuracy
8000,1.7986,1.751899,0.495269,1301355.0,0.747835


  return fn(*args, **kwargs)


[Eval] Evaluación final...


  eval_loss: 1.751899
  eval_runtime: 1181.960000
  eval_samples_per_second: 3.064000
  eval_steps_per_second: 0.383000
  eval_entropy: 0.495269
  eval_num_tokens: 2077547.000000
  eval_mean_token_accuracy: 0.747835
  epoch: 0.999913
  Perplexity: 5.77
[Save] Adaptadores LoRA guardados en: Mistral-7B-Python-Expert-v1-final


## Comparación Cualitativa

In [5]:
import torch
import gc # Garbage Collector, para limpiar la memoria
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from peft import PeftModel

# --- 1. Definir Constantes ---
MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.3"
# ¡IMPORTANTE! Asegúrate de que este sea el nombre de la carpeta que se guardó
ADAPTER_PATH = "./Mistral-7B-Python-Expert-v1-final" 

# --- 2. Configuración de Carga (misma que antes) ---
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

# --- 3. Definir el Prompt de Prueba ---
# ¡Puedes cambiar esta pregunta por la que tú quieras!
pregunta = "Escribe una función en Python que use pandas para cargar un archivo CSV llamado 'usuarios.csv' y devuelva el promedio de la columna 'edad'."

prompt_formateado = f"[INST] {pregunta} [/INST]"

# =========================================================
# 
#              PRUEBA 1: MODELO BASE
# 
# =========================================================
print("--- 1. Probando el Modelo Base (Original) ---")

# Cargar modelo base
model_base = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=bnb_config,
    device_map="auto"
)

# Cargar tokenizador
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
tokenizer.pad_token = tokenizer.eos_token

# Generar respuesta
inputs = tokenizer(prompt_formateado, return_tensors="pt").to("cuda")
outputs = model_base.generate(
    **inputs, 
    max_new_tokens=256, 
    do_sample=True, 
    temperature=0.7, 
    top_p=0.9
)

respuesta_base = tokenizer.decode(outputs[0], skip_special_tokens=True)
print("RESPUESTA (Base):\n", respuesta_base)

# --- Limpiar VRAM ---
del model_base
gc.collect()
torch.cuda.empty_cache()
print("\n--- Modelo Base limpiado de la VRAM ---")


# =========================================================
# 
#             PRUEBA 2: MODELO EXPERTO (AFINADO)
# 
# =========================================================
print("\n--- 2. Probando el Modelo Experto (Afinado) ---")

# Volver a cargar el modelo base (necesario para aplicar LoRA)
model_para_lora = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=bnb_config,
    device_map="auto"
)

# ¡Cargar los adaptadores LoRA encima!
model_experto = PeftModel.from_pretrained(model_para_lora, ADAPTER_PATH)
model_experto.eval() # Poner en modo de evaluación

print("Adaptadores LoRA cargados.")

# Generar respuesta (reusamos el tokenizer y el prompt)
inputs = tokenizer(prompt_formateado, return_tensors="pt").to("cuda")
outputs_experto = model_experto.generate(
    **inputs, 
    max_new_tokens=256, 
    do_sample=True, 
    temperature=0.7, 
    top_p=0.9
)

respuesta_experto = tokenizer.decode(outputs_experto[0], skip_special_tokens=True)
print("RESPUESTA (Experto):\n", respuesta_experto)

# --- Limpiar VRAM al final ---
del model_para_lora
del model_experto
gc.collect()
torch.cuda.empty_cache()

--- 1. Probando el Modelo Base (Original) ---


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.


RESPUESTA (Base):
 Escribe una función en Python que use pandas para cargar un archivo CSV llamado 'usuarios.csv' y devuelva el promedio de la columna 'edad'.  Para crear una función en Python que utilice la biblioteca pandas para cargar un archivo CSV llamado 'usuarios.csv' y devuelva el promedio de la columna 'edad', sigue este ejemplo:

```python
import pandas as pd

def promedio_edad(archivo):
    # Cargar el archivo CSV
    df = pd.read_csv(archivo)

    # Calcular el promedio de la columna 'edad'
    promedio = df['edad'].mean()

    return promedio

# Uso de la función
archivo = 'usuarios.csv'
promedio = promedio_edad(archivo)
print(f'El promedio de edad es: {promedio}')
```

Por último, asegúrate de tener el archivo CSV 'usuarios.csv' en la carpeta donde se ejecuta el código o proporciona la ruta completa del archivo

--- Modelo Base limpiado de la VRAM ---

--- 2. Probando el Modelo Experto (Afinado) ---


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.


Adaptadores LoRA cargados.
RESPUESTA (Experto):
 Escribe una función en Python que use pandas para cargar un archivo CSV llamado 'usuarios.csv' y devuelva el promedio de la columna 'edad'.  Para crear una función en Python que utilice la biblioteca pandas para cargar un archivo CSV y devolver el promedio de la columna 'edad', sigue este ejemplo:

```python
import pandas as pd

def promedio_edad(archivo_csv):
    try:
        data = pd.read_csv(archivo_csv)
        edad = data['edad']
        promedio = edad.mean()
        return promedio
    except FileNotFoundError:
        print("El archivo no se encuentra en la ruta especificada.")
        return None

# Usa la función para cargar el archivo 'usuarios.csv'
archivo = 'usuarios.csv'
promedio = promedio_edad(archivo)
if promedio is not None:
    print(f"El promedio de la columna 'edad' es: {promedio}")
```

En este ejemplo, la función `promedio_edad(archivo_


## 1. Evaluación Cuantitativa (Benchmark HumanEval offline) 📊

In [None]:
import json, os, re, tempfile, subprocess, random, time, csv, uuid, textwrap
from pathlib import Path
import torch
import gc
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from peft import PeftModel
from tqdm import tqdm

# --- Config ---
JSONL_PATH = "./humaneval_local_25.jsonl" # Lo guardará en el directorio actual
RESULTS_CSV_BASE = "./humaneval_local_results_BASE.csv"
RESULTS_CSV_EXPERTO = "./humaneval_local_results_EXPERTO.csv"
GEN_KW = dict(max_new_tokens=384, temperature=0.1, top_p=0.95, do_sample=True, pad_token_id=2) # 2 es eos_token_id para Mistral


TASKS = [
    ("add", "def add(a, b):\n    \"\"\"Return a + b\"\"\"\n", "import pytest\n\ndef test_add():\n    assert add(1,2)==3\n    assert add(-1,1)==0\n"),
    ("is_even", "def is_even(n:int)->bool:\n    \"\"\"Return True if n is even\"\"\"\n", "import pytest\n\ndef test_e():\n    assert is_even(2)\n    assert not is_even(3)\n"),
    ("fib", "def fib(n:int):\n    \"\"\"Return a list with the first n Fibonacci numbers starting at 0,1\"\"\"\n", "import pytest\n\ndef test_f():\n    assert fib(1)==[0]\n    assert fib(2)==[0,1]\n    assert fib(7)==[0,1,1,2,3,5,8]\n"),
    ("is_prime", "def is_prime(n:int)->bool:\n    \"\"\"Return True if n is prime\"\"\"\n", "import pytest\n\ndef test_p():\n    assert is_prime(2) and is_prime(3) and is_prime(97)\n    assert not is_prime(1)\n    assert not is_prime(100)\n"),
    ("flatten", "def flatten(lst:list)->list:\n    \"\"\"Flatten a shallow list of lists\"\"\"\n", "import pytest\n\ndef test_flat():\n    assert flatten([[1,2],[3],[]])==[1,2,3]\n    assert flatten([])==[]\n"),
    ("reverse_words", "def reverse_words(s:str)->str:\n    \"\"\"Reverse word order by spaces\"\"\"\n", "import pytest\n\ndef test_rev():\n    assert reverse_words('hola mundo')=='mundo hola'\n    assert reverse_words('a b c')=='c b a'\n"),
    ("count_vowels", "def count_vowels(s:str)->int:\n    \"\"\"Count lowercase vowels aeiou\"\"\"\n", "import pytest\n\ndef test_v():\n    assert count_vowels('hola')==2\n    assert count_vowels('xyz')==0\n"),
    ("unique_sorted", "def unique_sorted(lst:list)->list:\n    \"\"\"Return sorted unique elements\"\"\"\n", "import pytest\n\ndef test_u():\n    assert unique_sorted([3,1,2,3,2,1])==[1,2,3]\n"),
    ("sum_digits", "def sum_digits(n:int)->int:\n    \"\"\"Sum digits of a non-negative integer\"\"\"\n", "import pytest\n\ndef test_s():\n    assert sum_digits(0)==0\n    assert sum_digits(12345)==15\n"),
    ("is_palindrome", "def is_palindrome(s:str)->bool:\n    \"\"\"Check palindrome ignoring spaces and case\"\"\"\n", "import pytest\n\ndef test_pal():\n    assert is_palindrome('Anita lava la tina')\n    assert not is_palindrome('python')\n"),
    ("fact", "def fact(n:int)->int:\n    \"\"\"Return n! for n>=0\"\"\"\n", "import pytest\n\ndef test_fact():\n    assert fact(0)==1\n    assert fact(5)==120\n"),
    ("merge_dicts", "def merge_dicts(a:dict,b:dict)->dict:\n    \"\"\"Merge summing values for common keys\"\"\"\n", "import pytest\n\ndef test_m():\n    assert merge_dicts({'a':1,'b':2},{'b':3,'c':4})=={'a':1,'b':5,'c':4}\n"),
    ("gcd", "def gcd(a:int,b:int)->int:\n    \"\"\"Greatest common divisor\"\"\"\n", "import pytest\n\ndef test_gcd():\n    assert gcd(12,18)==6\n    assert gcd(7,13)==1\n"),
    ("two_sum", "def two_sum(nums:list, target:int)->tuple:\n    \"\"\"Return indices (i,j) with nums[i]+nums[j]==target (i<j) or (-1,-1)\"\"\"\n", "import pytest\n\ndef test_ts():\n    assert two_sum([2,7,11,15],9)==(0,1)\n    assert two_sum([3,2,4],6)==(1,2)\n"),
    ("anagrams", "def anagrams(a:str,b:str)->bool:\n    \"\"\"Check if two strings are anagrams (ignore spaces and case)\"\"\"\n", "import pytest\n\ndef test_a():\n    assert anagrams('Listen','Silent')\n    assert not anagrams('apple','pale')\n"),
    ("balanced_parens", "def balanced_parens(s:str)->bool:\n    \"\"\"Return True if parentheses are balanced\"\"\"\n", "import pytest\n\ndef test_bp():\n    assert balanced_parens('(())')\n    assert not balanced_parens('(()')\n"),
    ("roman_to_int", "def roman_to_int(s:str)->int:\n    \"\"\"Convert Roman numeral to int\"\"\"\n", "import pytest\n\ndef test_r():\n    assert roman_to_int('III')==3\n    assert roman_to_int('IV')==4\n    assert roman_to_int('IX')==9\n"),
    ("int_to_roman", "def int_to_roman(n:int)->str:\n    \"\"\"Convert 1..3999 to Roman numeral\"\"\"\n", "import pytest\n\ndef test_ir():\n    assert int_to_roman(3)=='III'\n    assert int_to_roman(4)=='IV'\n    assert int_to_roman(9)=='IX'\n"),
    ("top_k", "def top_k(lst:list, k:int)->list:\n    \"\"\"Return k largest elements sorted desc\"\"\"\n", "import pytest\n\ndef test_topk():\n    assert top_k([1,5,2,4,3],2)==[5,4]\n"),
    ("unique_chars", "def unique_chars(s:str)->bool:\n    \"\"\"True if all chars are unique\"\"\"\n", "import pytest\n\ndef test_uc():\n    assert unique_chars('abc')\n    assert not unique_chars('aba')\n"),
    ("median", "def median(nums:list)->float:\n    \"\"\"Return median of a non-empty list\"\"\"\n", "import pytest\n\ndef test_med():\n    assert median([1,3,2])==2\n    assert median([1,2,3,4])==2.5\n"),
    ("mode", "def mode(nums:list):\n    \"\"\"Return the most frequent element; tie-break by smallest value\"\"\"\n", "import pytest\n\ndef test_mode():\n    assert mode([1,1,2,2,2,3])==2\n    assert mode([3,3,1,1])==1\n"),
    ("to_snake", "def to_snake(s:str)->str:\n    \"\"\"Convert CamelCase to snake_case\"\"\"\n", "import pytest\n\ndef test_snake():\n    assert to_snake('CamelCase')=='camel_case'\n    assert to_snake('HTTPServer')=='http_server'\n"),
    ("dedup_preserve", "def dedup_preserve(lst:list)->list:\n    \"\"\"Remove duplicates preserving first appearance\"\"\"\n", "import pytest\n\ndef test_dedup():\n    assert dedup_preserve([1,2,1,3,2])==[1,2,3]\n"),
    ("chunk", "def chunk(lst:list, size:int)->list:\n    \"\"\"Split list into chunks of given size (>0)\"\"\"\n", "import pytest\n\ndef test_chunk():\n    assert chunk([1,2,3,4,5],2)==[[1,2],[3,4],[5]]\n"),
]

# --- Si no existe el JSONL, lo creamos ---
if not Path(JSONL_PATH).exists():
    with open(JSONL_PATH,"w",encoding="utf-8") as f:
        for i,(entry,prompt,test) in enumerate(TASKS):
            rec = {"task_id": f"HE_LOCAL/{i}", "prompt": prompt, "entry_point": entry, "test": test}
            f.write(json.dumps(rec) + "\n")
    print("Creado:", JSONL_PATH)
else:
    print("Usando existente:", JSONL_PATH)

# --- Función para ejecutar Pytest (la tuya, es perfecta) ---
def run_pytest(code:str, entry:str, tests:str, timeout=10)->bool:
    with tempfile.TemporaryDirectory() as td:
        code_path = os.path.join(td, "user_code.py")
        test_path = os.path.join(td, "test_code.py")
        
        with open(code_path,"w",encoding="utf-8") as f: f.write(code)
        
        with open(test_path,"w",encoding="utf-8") as f:
            f.write(f"from user_code import {entry}\n")
            f.write(tests + "\n")
            # f.write("if __name__ == '__main__':\n    import pytest; pytest.main([__file__,'-q'])\n") # No es necesario
        
        try:
            # Usar pytest directamente es más robusto
            p = subprocess.run(["pytest", test_path], capture_output=True, text=True, timeout=timeout)
            return p.returncode == 0
        except Exception:
            return False

# --- ¡NUEVA! Función para extraer el código ---
def extract_python_code(text, prompt):
    # 1. Buscar bloque ```python ... ```
    pattern = r"```python\n(.*?)\n```"
    match = re.search(pattern, text, re.DOTALL)
    if match:
        return match.group(1).strip()
    
    # 2. Si no, buscar bloque ``` ... ```
    pattern = r"```\n(.*?)\n```"
    match = re.search(pattern, text, re.DOTALL)
    if match:
        return match.group(1).strip()

    # 3. Si no, tomar todo después de [/INST]
    if "[/INST]" in text:
        text_after_inst = text.split("[/INST]", 1)[1].strip()
        # Si el modelo es bueno, puede que solo genere el código.
        # Si repite el prompt, lo quitamos.
        if text_after_inst.startswith(prompt.strip()):
            text_after_inst = text_after_inst[len(prompt.strip()):]
        # Si empieza con la función (completándola), está bien
        if text_after_inst.strip().startswith("def"):
             return text_after_inst.strip()
        # Si el modelo solo genera la *continuación* del prompt
        if not prompt.strip().endswith("\n"):
             prompt += "\n"
        return prompt + text_after_inst # Devolver prompt + continuación
    
    # 4. Fallback: asumir que la respuesta es solo código
    return text.strip()

# --- ¡MODIFICADA! Función de Evaluación ---
def eval_local(model, tokenizer, jsonl_path, results_csv_path):
    ok, total = 0, 0
    rows = []
    
    with open(jsonl_path,encoding="utf-8") as f:
        for line in tqdm(f, desc="Evaluando tareas"):
            t = json.loads(line)
            prompt_file, entry, tests, tid = t["prompt"], t["entry_point"], t["test"], t["task_id"]
            
            # 1. Formatear el prompt para nuestro Chat Model
            prompt_chat = (
                f"[INST] Escribe una función de Python para el siguiente problema. "
                f"Responde SÓLO con el bloque de código Python, sin explicaciones.\n\n"
                f"```python\n{prompt_file}\n```\n[/INST]"
            )
            
            inputs = tokenizer(prompt_chat, return_tensors="pt").to(model.device)
            
            # 2. Generar
            with torch.no_grad():
                out = model.generate(**inputs, **GEN_KW)
            
            gen_text = tokenizer.decode(out[0], skip_special_tokens=True)
            
            # 3. Extraer el código
            code_extracted = extract_python_code(gen_text, prompt_file)
            
            # 4. Probar
            passed = run_pytest(code_extracted, entry, tests)
            
            ok += int(passed); total += 1
            rows.append({"task_id": tid, "passed": int(passed)})
            # print(f"{tid}: {'OK' if passed else 'FAIL'}") # Descomentar para debug
    
    print(f"\nResultados para {model.config._name_or_path}:")
    print(f"pass@1: {ok}/{total} = {ok/total:.2%}")
    
    # 5. Guardar CSV
    with open(results_csv_path,"w",newline="",encoding="utf-8") as cf:
        w = csv.DictWriter(cf, fieldnames=["task_id","passed"]); w.writeheader(); w.writerows(rows)
    print("CSV guardado en:", results_csv_path)
    return ok/total

print("Funciones de HumanEval-Local definidas.")

Usando existente: ./humaneval_local_25.jsonl
Funciones de HumanEval-Local definidas.


In [8]:
import torch
import gc
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from peft import PeftModel

# --- 0. Limpiar Memoria ---
gc.collect()
torch.cuda.empty_cache()

# --- 1. Definir Constantes de Carga ---
MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.3"
ADAPTER_PATH = "./Mistral-7B-Python-Expert-v1-final"
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
tokenizer.pad_token = tokenizer.eos_token

# =========================================================
# 
#              PRUEBA 1: MODELO BASE
# 
# =========================================================
print("\n--- Evaluando Modelo BASE ---")
base_model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=bnb_config,
    device_map="auto"
)
base_model.eval()

# Ejecutar la evaluación
eval_local(base_model, tokenizer, JSONL_PATH, RESULTS_CSV_BASE)

# Limpiar
del base_model
gc.collect()
torch.cuda.empty_cache()
print("--- Modelo Base limpiado ---")


# =========================================================
# 
#             PRUEBA 2: MODELO EXPERTO (AFINADO)
# 
# =========================================================
print("\n--- Evaluando Modelo EXPERTO ---")
# Volver a cargar el modelo base (necesario para aplicar LoRA)
base_model_for_lora = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=bnb_config,
    device_map="auto"
)
# ¡Cargar los adaptadores LoRA encima!
model_experto = PeftModel.from_pretrained(base_model_for_lora, ADAPTER_PATH)
model_experto.eval() # Poner en modo de evaluación

# Ejecutar la evaluación
eval_local(model_experto, tokenizer, JSONL_PATH, RESULTS_CSV_EXPERTO)

# Limpiar
del model_experto
del base_model_for_lora
gc.collect()
torch.cuda.empty_cache()
print("--- Modelo Experto limpiado ---")

print("\n--- ¡Benchmark completado! ---")


--- Evaluando Modelo BASE ---


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Evaluando tareas: 25it [06:34, 15.77s/it]



Resultados para mistralai/Mistral-7B-Instruct-v0.3:
pass@1: 0/25 = 0.00%
CSV guardado en: ./humaneval_local_results_BASE.csv
--- Modelo Base limpiado ---

--- Evaluando Modelo EXPERTO ---


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Evaluando tareas: 25it [08:08, 19.55s/it]


Resultados para mistralai/Mistral-7B-Instruct-v0.3:
pass@1: 0/25 = 0.00%
CSV guardado en: ./humaneval_local_results_EXPERTO.csv
--- Modelo Experto limpiado ---

--- ¡Benchmark completado! ---





## Merge

In [6]:
# === MERGE LoRA → MODELO COMPLETO (Notebook-ready) ===
import os, gc, torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel

# Limpieza básica de memoria
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()

# Rutas
BASE_MODEL_ID    = "mistralai/Mistral-7B-Instruct-v0.3"
ADAPTER_PATH     = "./Mistral-7B-Python-Expert-v1-final"     # carpeta de trainer.save_model(...)
MERGED_MODEL_DIR = "./Code-Specialist-7b"    # salida del modelo fusionado

print("Base:", BASE_MODEL_ID)
print("Adapter:", ADAPTER_PATH)
print("Salida:", MERGED_MODEL_DIR)

Base: mistralai/Mistral-7B-Instruct-v0.3
Adapter: ./Mistral-7B-Python-Expert-v1-final
Salida: ./Code-Specialist-7b


In [7]:
# Cargar base en CPU (sin offload raro)
DTYPE_CPU = torch.bfloat16   
print("Cargando base en CPU...")
base_model = AutoModelForCausalLM.from_pretrained(
    BASE_MODEL_ID,
    torch_dtype=DTYPE_CPU,
    device_map=None,              # CPU
    trust_remote_code=True
)

print("Montando adaptadores LoRA...")
peft_model = PeftModel.from_pretrained(
    base_model,
    ADAPTER_PATH,
    is_trainable=False
)

print("Fusionando (merge_and_unload)...")
merged_model = peft_model.merge_and_unload()
print("Merge completo.")

`torch_dtype` is deprecated! Use `dtype` instead!


Cargando base en CPU...


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Montando adaptadores LoRA...
Fusionando (merge_and_unload)...
Merge completo.


In [8]:
# Asegurar carpeta de salida
os.makedirs(MERGED_MODEL_DIR, exist_ok=True)

# Guardado en safetensors (recomendado)
print("Guardando pesos fusionados...")
merged_model.save_pretrained(MERGED_MODEL_DIR, safe_serialization=True)

# Guardar tokenizer con pad_token configurado
print("Guardando tokenizer...")
tok = AutoTokenizer.from_pretrained(BASE_MODEL_ID, use_fast=True, trust_remote_code=True)
if tok.pad_token is None:
    tok.pad_token = tok.eos_token
tok.save_pretrained(MERGED_MODEL_DIR)

# Persistir dtype preferido en config
try:
    merged_model.config.torch_dtype = str(merged_model.dtype).replace("torch.", "")
    merged_model.config.save_pretrained(MERGED_MODEL_DIR)
except Exception as e:
    print("Aviso: no se pudo persistir torch_dtype en config:", e)

print("Guardado listo en:", MERGED_MODEL_DIR)

Guardando pesos fusionados...
Guardando tokenizer...
Guardado listo en: ./Code-Specialist-7b


#### Sanity check

In [4]:
from transformers import AutoModelForCausalLM, AutoTokenizer

print("Sanity check: cargando modelo merged para inferencia...")
tok = AutoTokenizer.from_pretrained(MERGED_MODEL_DIR, use_fast=True)
if tok.pad_token is None:
    tok.pad_token = tok.eos_token

dtype_infer = torch.bfloat16 if (torch.cuda.is_available() and torch.cuda.is_bf16_supported()) else torch.float16

merged_for_infer = AutoModelForCausalLM.from_pretrained(
    MERGED_MODEL_DIR,
    torch_dtype=dtype_infer,
    device_map="auto",
    trust_remote_code=True
).eval()

prompt = "[INST] Escribe una función Python que sume dos enteros y devuelve solo la función. [/INST]"
batch = tok(prompt, return_tensors="pt").to(merged_for_infer.device)

with torch.no_grad():
    out = merged_for_infer.generate(
        **batch, max_new_tokens=128,
        eos_token_id=tok.eos_token_id, pad_token_id=tok.pad_token_id
    )

print(tok.decode(out[0], skip_special_tokens=True))


Sanity check: cargando modelo merged para inferencia...


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Some parameters are on the meta device because they were offloaded to the cpu.


Escribe una función Python que sume dos enteros y devuelve solo la función.  Aquí tienes una función simple en Python que sume dos enteros:

```python
def sumar(num1, num2):
    return num1 + num2
```

Puedes usar esta función de la siguiente manera:

```python
resultado = sumar(5, 3)
print(resultado)  # Imprime 8
```

Esta función toma dos números enteros como parámetros y devuelve la suma de ambos.


In [7]:
# === Cargar MERGED en 4-bit con fallback robusto ===
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig

MERGED_MODEL_DIR = "./Mistral-7B-Python-Expert-v1-MERGED"

# Tokenizer
tok4 = AutoTokenizer.from_pretrained(MERGED_MODEL_DIR, use_fast=True)
if tok4.pad_token is None:
    tok4.pad_token = tok4.eos_token

def load_4bit_force_gpu():
    """Carga 4-bit forzando TODO a la GPU (sin offload a CPU/disk)."""
    assert torch.cuda.is_available(), "Se requiere GPU para 4-bit."
    gpu_id = torch.cuda.current_device()
    bnb4 = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16,
        bnb_4bit_use_double_quant=True,
    )
    # Fuerza todo en la GPU actual:
    model = AutoModelForCausalLM.from_pretrained(
        MERGED_MODEL_DIR,
        quantization_config=bnb4,
        device_map={"": gpu_id},      # <- evita que "auto" mande algo a CPU/disk
        trust_remote_code=True,
    )
    return model.eval()

def load_8bit_with_cpu_offload():
    """Fallback: 8-bit permite offload FP32 a CPU."""
    from transformers import BitsAndBytesConfig
    bnb8 = BitsAndBytesConfig(
        load_in_8bit=True,
        llm_int8_enable_fp32_cpu_offload=True,  # <- int8 sí permite offload en CPU
    )
    model = AutoModelForCausalLM.from_pretrained(
        MERGED_MODEL_DIR,
        quantization_config=bnb8,
        device_map="auto",             # puede repartir entre GPU/CPU
        trust_remote_code=True,
    )
    return model.eval()

def load_16bit_if_possible():
    """Último recurso: sin cuantización (bf16/fp16), depende de tu VRAM."""
    dtype = torch.bfloat16 if (torch.cuda.is_available() and torch.cuda.is_bf16_supported()) else torch.float16
    model = AutoModelForCausalLM.from_pretrained(
        MERGED_MODEL_DIR,
        torch_dtype=dtype,
        device_map="auto" if torch.cuda.is_available() else None,
        trust_remote_code=True,
    )
    return model.eval()

try:
    print("Intentando 4-bit forzado en GPU...")
    merged_model = load_4bit_force_gpu()
    print("✅ 4-bit en GPU cargado.")
except Exception as e4:
    print("[Aviso] 4-bit en GPU no se pudo cargar:", type(e4).__name__, "-", e4)
    try:
        print("Intentando 8-bit con offload en CPU...")
        merged_model = load_8bit_with_cpu_offload()
        print("✅ 8-bit con offload cargado.")
    except Exception as e8:
        print("[Aviso] 8-bit con offload tampoco se pudo cargar:", type(e8).__name__, "-", e8)
        print("Intentando 16-bit (sin cuantizar)...")
        merged_model = load_16bit_if_possible()
        print("✅ 16-bit cargado (verifica VRAM).")

print("Modelo listo.")


Intentando 4-bit forzado en GPU...


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

✅ 4-bit en GPU cargado.
Modelo listo.
