In [1]:
from collections import defaultdict
import json

# List of the .jsonl result files. Forward slashes are used as the separator:
# they work on Windows as well as on Unix/Linux, whereas a backslash inside a
# normal string literal ("\s") is an invalid escape sequence and produces a
# path that does not exist on Unix-like systems.
jsonl_files = [
    "results/samples_gsm8k_cot_62777.jsonl",
    "results/samples_gsm8k_cot_17456.jsonl",
    "results/samples_gsm8k_cot_46379.jsonl",
    "results/samples_gsm8k_cot_15136.jsonl",
]

# Attempts grouped per problem; missing keys start out as an empty list.
samples_by_doc = defaultdict(list)

  "results\samples_gsm8k_cot_62777.jsonl",
  "results\samples_gsm8k_cot_17456.jsonl",
  "results\samples_gsm8k_cot_46379.jsonl",
  "results\samples_gsm8k_cot_15136.jsonl",


In [2]:
# Read every result file and record, per (doc_id, filter) pair, whether each
# attempt was correct. Attempts are appended in the order the files are listed.
# Note: this appends to the existing samples_by_doc, so running the cell again
# without re-creating that dict adds the same attempts a second time.
for path in jsonl_files:
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            # Skip blank lines (for example a trailing empty line), which
            # json.loads would otherwise reject with a JSONDecodeError.
            if not line.strip():
                continue
            sample = json.loads(line)
            doc_id = sample["doc_id"]
            filter_type = sample.get("filter", "unknown")  # fall back when no filter is recorded
            # An attempt counts as correct only when exact_match equals 1.
            is_correct = sample.get("exact_match", 0) == 1.0
            samples_by_doc[(doc_id, filter_type)].append(is_correct)

In [4]:
# Compute pass@N
def pass_at_n(samples_by_doc, N):
    """Return the fraction of entries with a truthy value among their first N attempts.

    Args:
        samples_by_doc: mapping whose values are sequences of per-attempt outcomes.
        N: number of leading attempts to inspect for each entry.

    Returns:
        The share of entries that pass, or 0.0 when the mapping is empty.
    """
    total = len(samples_by_doc)
    solved = 0
    for attempts in samples_by_doc.values():
        if any(attempts[:N]):
            solved += 1
    if total == 0:
        return 0.0
    return solved / total

In [5]:
# Compute pass@N separately for each filter.
results_by_filter = {}

# Distinct filter names: the second element of every (doc_id, filter) key.
filtros = {filter_name for (_doc, filter_name) in samples_by_doc}

for f in filtros:
    # Restrict the data to the problems recorded under this filter.
    filtered_samples = {
        key: attempts
        for key, attempts in samples_by_doc.items()
        if key[1] == f
    }
    # Largest number of attempts recorded for any problem under this filter.
    max_n = max(map(len, filtered_samples.values()))
    # Only report pass@n for values of n that the data can support.
    results = {
        f"pass@{n}": pass_at_n(filtered_samples, n)
        for n in range(1, 5)
        if n <= max_n
    }
    results.update(total_problems=len(filtered_samples), attempts_per_problem=max_n)
    results_by_filter[f] = results

In [6]:
# Show the per-filter pass@N summary as the cell output.
results_by_filter

{'flexible-extract': {'pass@1': 0.7266666666666667,
  'pass@2': 0.7733333333333333,
  'pass@3': 0.7933333333333333,
  'pass@4': 0.8066666666666666,
  'total_problems': 300,
  'attempts_per_problem': 4},
 'strict-match': {'pass@1': 0.77,
  'pass@2': 0.81,
  'pass@3': 0.83,
  'pass@4': 0.84,
  'total_problems': 300,
  'attempts_per_problem': 4}}

In [1]:
import os
import json
import re
from collections import defaultdict

# Result files to analyse; the digits after "cot_" in each name are extracted
# further down and used as the file's index.
jsonl_files = [
    "results/samples_gsm8k_cot_62777.jsonl",
    "results/samples_gsm8k_cot_17456.jsonl",
    "results/samples_gsm8k_cot_46379.jsonl",
    "results/samples_gsm8k_cot_15136.jsonl",
]

# Extract the index from a file path with a regex.
def extract_index(path):
    """Return the first run of digits that follows ``cot_`` in ``path``.

    Args:
        path: file path (or any string) to search.

    Returns:
        The matched digits as a string, or ``"unknown"`` when there is no match.
    """
    found = re.search(r"cot_(\d+)", path)
    if found is None:
        return "unknown"
    return found.group(1)

# Collect the samples as samples[(index, filter)][doc_id] -> list of per-attempt
# booleans, where index is the number extracted from the file name.
samples = defaultdict(lambda: defaultdict(list))

for path in jsonl_files:
    index = extract_index(path)
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            # Skip blank lines (for example a trailing empty line), which
            # json.loads would otherwise reject with a JSONDecodeError.
            if not line.strip():
                continue
            sample = json.loads(line)
            doc_id = sample["doc_id"]
            filter_type = sample.get("filter", "unknown")
            # An attempt counts as correct only when exact_match equals 1.
            is_correct = sample.get("exact_match", 0) == 1.0
            samples[(index, filter_type)][doc_id].append(is_correct)

# pass@N helper
def pass_at_n(samples_by_doc, N):
    """Return the share of entries whose first N attempts contain a truthy value.

    Args:
        samples_by_doc: mapping whose values are sequences of per-attempt outcomes.
        N: number of leading attempts considered for each entry.

    Returns:
        Passed entries divided by total entries, or 0.0 for an empty mapping.
    """
    total = len(samples_by_doc)
    solved = [1 for attempts in samples_by_doc.values() if any(attempts[:N])]
    if total == 0:
        return 0.0
    return len(solved) / total

# Results per (index, filter) pair.
results_all = {}

for (index, filter_type), doc_dict in samples.items():
    # Largest number of attempts recorded for any problem in this group.
    max_n = max(map(len, doc_dict.values()))
    # NOTE(review): the candidate N values are 1, 2, 3 and 5 (no 4) — confirm
    # that this is intended.
    result = {
        f"pass@{n}": pass_at_n(doc_dict, n)
        for n in (1, 2, 3, 5)
        if n <= max_n
    }
    result.update(total_problems=len(doc_dict), attempts_per_problem=max_n)
    results_all[index, filter_type] = result

# Show the organised results.
from pprint import pprint
pprint(results_all)

# Now find, for each filter, the index with the highest pass@1.
best_by_filter = {}

for filter_type in {name for (_idx, name) in results_all}:
    # pass@1 of every index evaluated under this filter.
    filtered = {
        key[0]: metrics["pass@1"]
        for key, metrics in results_all.items()
        if key[1] == filter_type and "pass@1" in metrics
    }
    # Falls back to a placeholder pair when no index has a pass@1 entry.
    best = max(
        filtered.items(),
        key=lambda item: item[1],
        default=("none", 0.0),
    )
    best_by_filter[filter_type] = {
        "best_index": best[0],
        "pass@1": best[1],
    }

print("\n🏆 Melhores índices por filtro (baseado em pass@1):")
pprint(best_by_filter)


{('15136', 'flexible-extract'): {'attempts_per_problem': 1,
                                 'pass@1': 0.7133333333333334,
                                 'total_problems': 300},
 ('15136', 'strict-match'): {'attempts_per_problem': 1,
                             'pass@1': 0.7733333333333333,
                             'total_problems': 300},
 ('17456', 'flexible-extract'): {'attempts_per_problem': 1,
                                 'pass@1': 0.6966666666666667,
                                 'total_problems': 300},
 ('17456', 'strict-match'): {'attempts_per_problem': 1,
                             'pass@1': 0.7766666666666666,
                             'total_problems': 300},
 ('46379', 'flexible-extract'): {'attempts_per_problem': 1,
                                 'pass@1': 0.7,
                                 'total_problems': 300},
 ('46379', 'strict-match'): {'attempts_per_problem': 1,
                             'pass@1': 0.7866666666666666,
                        

In [7]:
from collections import defaultdict
import json

# List of the .jsonl result files. Forward slashes are used as the separator:
# they work on Windows as well as on Unix/Linux, whereas a backslash inside a
# normal string literal ("\s") is an invalid escape sequence and produces a
# path that does not exist on Unix-like systems.
jsonl_files = [
    "results/samples_gsm8k_cot_normal1.jsonl",
    "results/samples_gsm8k_cot_normal2.jsonl",
    "results/samples_gsm8k_cot_normal3.jsonl",
    "results/samples_gsm8k_cot_normal4.jsonl",
]

# Attempts grouped per problem; missing keys start out as an empty list.
samples_by_doc = defaultdict(list)

# Read every result file and record, per (doc_id, filter) pair, whether each
# attempt was correct. Attempts are appended in the order the files are listed.
for path in jsonl_files:
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            # Skip blank lines (for example a trailing empty line), which
            # json.loads would otherwise reject with a JSONDecodeError.
            if not line.strip():
                continue
            sample = json.loads(line)
            doc_id = sample["doc_id"]
            filter_type = sample.get("filter", "unknown")  # fall back when no filter is recorded
            # An attempt counts as correct only when exact_match equals 1.
            is_correct = sample.get("exact_match", 0) == 1.0
            samples_by_doc[(doc_id, filter_type)].append(is_correct)

# Compute pass@N
def pass_at_n(samples_by_doc, N):
    """Return the fraction of entries with a truthy value among their first N attempts.

    Args:
        samples_by_doc: mapping whose values are sequences of per-attempt outcomes.
        N: number of leading attempts to inspect for each entry.

    Returns:
        The share of entries that pass, or 0.0 when the mapping is empty.
    """
    total = len(samples_by_doc)
    solved = 0
    for attempts in samples_by_doc.values():
        if any(attempts[:N]):
            solved += 1
    if total == 0:
        return 0.0
    return solved / total

# Compute pass@N separately for each filter.
results_by_filter = {}

# Distinct filter names: the second element of every (doc_id, filter) key.
filtros = {filter_name for (_doc, filter_name) in samples_by_doc}

for f in filtros:
    # Restrict the data to the problems recorded under this filter.
    filtered_samples = {
        key: attempts
        for key, attempts in samples_by_doc.items()
        if key[1] == f
    }
    # Largest number of attempts recorded for any problem under this filter.
    max_n = max(map(len, filtered_samples.values()))
    # Only report pass@n for values of n that the data can support.
    results = {
        f"pass@{n}": pass_at_n(filtered_samples, n)
        for n in range(1, 5)
        if n <= max_n
    }
    results.update(total_problems=len(filtered_samples), attempts_per_problem=max_n)
    results_by_filter[f] = results

  "results\samples_gsm8k_cot_normal1.jsonl",
  "results\samples_gsm8k_cot_normal2.jsonl",
  "results\samples_gsm8k_cot_normal3.jsonl",
  "results\samples_gsm8k_cot_normal4.jsonl",


In [8]:
# Show the per-filter pass@N summary as the cell output.
results_by_filter

{'flexible-extract': {'pass@1': 0.7266666666666667,
  'pass@2': 0.7266666666666667,
  'pass@3': 0.7266666666666667,
  'pass@4': 0.7266666666666667,
  'total_problems': 300,
  'attempts_per_problem': 4},
 'strict-match': {'pass@1': 0.7666666666666667,
  'pass@2': 0.7666666666666667,
  'pass@3': 0.7666666666666667,
  'pass@4': 0.7666666666666667,
  'total_problems': 300,
  'attempts_per_problem': 4}}

In [5]:
# Use the %pip magic rather than a shell call so that the install targets the
# environment of the running kernel.
%pip install --upgrade pip

Defaulting to user installation because normal site-packages is not writeable


In [9]:
# Use the %pip magic rather than a shell call so that the install targets the
# environment of the running kernel.
%pip install --upgrade transformers

Defaulting to user installation because normal site-packages is not writeable


In [1]:
import torch
from torch import nn, Tensor
from transformers import AutoModelForCausalLM, AutoTokenizer
from contextlib import contextmanager
from sae_lens import SAE  # pip install sae-lens
from typing import Callable, Generator

# ====== USER-DEFINED SETTINGS ======
model_name = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"  # or another model
hook_name = "model.layers.19"                            # e.g. "model.layers.20"
# Row of the SAE's W_dec that is used as the steering direction.
feature_index = 62777
# Multiplier applied to the steering vector before it is added to the activations.
steering_coefficient = 4.0
sae_id = "blocks.19.hook_resid_post"
sae_repo = "andreuka18/deepseek-r1-distill-llama-8b-lmsys-openthoughts"  # e.g.
# Device that the model, the SAE and the tokenized inputs are moved to.
device = "cuda"

# ====== LOAD MODEL AND TOKENIZER ======
# Load the causal LM in float16 and move it to `device`.
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# ====== LOAD SAE AND STEERING VECTOR ======
# NOTE(review): only the first element of from_pretrained's return value is
# kept — presumably the SAE object itself; confirm against the installed
# sae-lens version.
sae = SAE.from_pretrained(sae_repo, sae_id)[0]
sae.to(device)

# The steering direction is row `feature_index` of the SAE's W_dec.
steering_vector = sae.W_dec[feature_index].to(device)
# NOTE(review): `bias` is not referenced anywhere else in the visible code.
bias = sae.b_dec.to(device)
# Dtype that the steering vector is cast to before being added to the activations.
hook_dtype = model.dtype

# Steering function: add the scaled steering vector to the residual activations.
def steer_fn(acts: Tensor) -> Tensor:
    """Return ``acts`` shifted by ``steering_coefficient`` times the steering vector.

    The steering vector is cast to ``hook_dtype`` before it is scaled and added.
    """
    direction = steering_vector.to(hook_dtype)
    return acts + steering_coefficient * direction

# ====== CONTEXT MANAGER FOR FEATURE STEERING ======
@contextmanager
def steer(model: nn.Module, hook_name: str, hook_fn: Callable[[Tensor], Tensor]) -> Generator[None, None, None]:
    """Temporarily rewrite the output of the submodule named ``hook_name``.

    While the context is active, a forward hook passes the submodule's output
    through ``hook_fn``. When the output is a tuple, only its first element is
    transformed and the remaining elements are kept as they are. The hook is
    removed when the context exits, including on error.

    Args:
        model: module whose ``named_modules()`` are searched.
        hook_name: exact name of the submodule to hook.
        hook_fn: function applied to the output tensor.

    Raises:
        ValueError: if no submodule is named ``hook_name``.
    """

    def _rewrite_output(_module, _inputs, output):
        # Returning a value from a forward hook replaces the module's output.
        if isinstance(output, tuple):
            return (hook_fn(output[0]), *output[1:])
        return hook_fn(output)

    # First submodule whose name matches exactly, or None when there is none.
    target = next(
        (module for name, module in model.named_modules() if name == hook_name),
        None,
    )
    if target is None:
        raise ValueError(f"Hookpoint '{hook_name}' não encontrado no modelo.")

    handle = target.register_forward_hook(_rewrite_output)
    try:
        yield
    finally:
        handle.remove()

def generate(prompt: str, do_steer: bool = False, temperature: float = 1.0, top_p: float = 0.9):
    """Sample a completion for ``prompt`` and print the decoded text.

    Args:
        prompt: text to continue.
        do_steer: when True, generation runs inside the ``steer`` context so
            that ``steer_fn`` is applied at ``hook_name``.
        temperature: sampling temperature passed to ``model.generate``.
        top_p: nucleus-sampling threshold passed to ``model.generate``.

    Returns:
        None. The decoded output, which includes the prompt, is printed.

    Note:
        ``temperature`` and ``top_p`` used to be accepted but ignored
        (temperature was fixed at 0.1 and ``top_p`` was never passed). They are
        now honoured, so pass ``temperature=0.1`` explicitly to reproduce the
        earlier sampling temperature.
    """
    # The tokenizer output carries both the input ids and the attention mask.
    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    gen_kwargs = dict(
        input_ids=inputs.input_ids,
        attention_mask=inputs.attention_mask,
        max_new_tokens=512,
        temperature=temperature,
        top_p=top_p,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id,
    )

    # Generate with or without steering.
    if do_steer:
        with steer(model, hook_name, steer_fn):
            output = model.generate(**gen_kwargs)
    else:
        output = model.generate(**gen_kwargs)

    print(tokenizer.decode(output[0], skip_special_tokens=True))

ModuleNotFoundError: Could not import module 'BertForPreTraining'. Are this object's requirements defined correctly?