# Login Llama

In [None]:
!huggingface-cli login

# GSM8K

In [None]:
!pip install datasets

In [None]:
from datasets import load_dataset

# Load the GSM8K benchmark ("main" configuration) via the `datasets` library.
dataset = load_dataset("openai/gsm8k", "main")

# Experiment

In [None]:
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForCausalLM
import time

def load_llama7b_model(model_path, device='cuda'):
    """
    Load a causal language model and its tokenizer for inference.

    Args:
        model_path: Model identifier or path handed to ``from_pretrained``.
        device: Not used by this function; weight placement is delegated to
            ``device_map="auto"``. Kept so existing callers keep working.

    Returns:
        ``(tokenizer, model)`` with the model in half precision and switched
        to evaluation mode.
    """
    # Tokenizer first, then the weights, matching the original load order.
    text_tokenizer = AutoTokenizer.from_pretrained(model_path)

    load_options = {
        "torch_dtype": torch.float16,
        "device_map": "auto",
    }
    causal_lm = AutoModelForCausalLM.from_pretrained(model_path, **load_options)
    causal_lm.eval()

    return text_tokenizer, causal_lm

def check_sentence_end(decoded_text_so_far: str):
    """
    Decide whether the text generated so far ends on a sentence boundary.

    Surrounding whitespace is ignored. The text counts as finished when its
    last character is '?', '!' or '.', except that a '.' directly preceded by
    a digit is treated as a decimal point still being written (e.g. "2." on
    the way to "2.5") and therefore does not end the sentence.

    More elaborate endings ('...', '?!', full-width punctuation, etc.) get no
    special treatment.

    Returns:
        True if the text ends a sentence, False otherwise (including for
        empty or whitespace-only input).
    """
    text = decoded_text_so_far.strip()
    if not text:
        return False

    final_char = text[-1]
    if final_char in ('?', '!'):
        return True
    if final_char != '.':
        return False

    # Trailing period: only a sentence end when it does not follow a digit.
    follows_digit = len(text) >= 2 and text[-2].isdigit()
    return not follows_digit

def sample_single_sentence(
    model,
    tokenizer,
    context_ids,
    top_p=0.9,
    temperature=1.0,
    max_tokens_per_sentence=50,
    device='cuda'
):
    """
    Sample one sentence with temperature scaling and nucleus (top-p) sampling.

    Tokens are drawn one at a time and appended to the context until one of
    the following happens:
      - the EOS token is drawn,
      - the decoded text ends a sentence according to ``check_sentence_end``,
      - ``max_tokens_per_sentence`` tokens have been generated.

    The G-NLL returned is the sum over generated tokens of ``-log p(token)``,
    where ``p`` is the distribution actually sampled from (temperature-scaled,
    truncated to the top-p nucleus and renormalized).

    Args:
        model: Callable as ``model(input_ids=...)`` returning an object whose
            ``.logits`` is indexed as ``[batch, position, vocab]``.
        tokenizer: Provides ``eos_token_id`` and ``decode``.
        context_ids: Token ids of the text so far; the code indexes row 0
            only, i.e. it assumes a batch of one.
        top_p: Cumulative-probability threshold of the nucleus. The token that
            crosses the threshold is kept.
        temperature: Positive divisor applied to the logits before sampling.
        max_tokens_per_sentence: Upper bound on tokens generated per call.
        device: Unused; kept for call compatibility.

    Returns:
        ``(generated_sentence, total_gnll, generated_token_count, new_ids)``
        where ``new_ids`` is ``context_ids`` followed by the new tokens.

    Raises:
        ValueError: If ``temperature`` is not strictly positive.
    """
    if temperature <= 0:
        raise ValueError(f"temperature must be positive, got {temperature}")

    new_ids = context_ids.clone()
    total_gnll = 0.0
    generated_token_count = 0
    eos_token_id = tokenizer.eos_token_id

    # Pure inference: without no_grad every forward pass would record an
    # autograd graph and keep its activations alive.
    with torch.no_grad():
        for _ in range(max_tokens_per_sentence):
            logits = model(input_ids=new_ids).logits

            # Upcast before any probability math: a half-precision cumulative
            # sum over a large vocabulary and log() of half-precision
            # probabilities are both inaccurate.
            scaled_logits = logits[:, -1, :].float() / temperature

            # Top-p filtering on the descending-sorted distribution.
            sorted_logits, sorted_indices = torch.sort(scaled_logits, descending=True)
            cumulative = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)
            remove = cumulative > top_p
            # Shift right by one so the first token that pushes the cumulative
            # mass past top_p stays in the nucleus.
            remove[..., 1:] = remove[..., :-1].clone()
            remove[..., 0] = False
            sorted_logits = sorted_logits.masked_fill(remove, float('-inf'))

            # Put the filtered logits back into vocabulary order.
            filtered_logits = torch.full_like(scaled_logits, float('-inf')).scatter(
                -1, sorted_indices, sorted_logits
            )

            # Sample from the renormalized nucleus.
            log_probs = torch.log_softmax(filtered_logits, dim=-1)
            next_token_id = torch.multinomial(log_probs.exp(), num_samples=1)
            token_id = next_token_id.item()

            # Accumulate G-NLL of the token that was just drawn.
            total_gnll += -log_probs[0, token_id].item()
            generated_token_count += 1

            new_ids = torch.cat([new_ids, next_token_id], dim=1)

            if eos_token_id is not None and token_id == eos_token_id:
                break

            # The whole sequence is decoded (not just the new tokens) so the
            # decimal-point check can see a digit that precedes a new '.'.
            decoded_so_far = tokenizer.decode(new_ids[0], skip_special_tokens=True)
            if check_sentence_end(decoded_so_far):
                break

    # Decode only the newly generated part.
    added_tokens = new_ids[0, context_ids.shape[1]:]
    generated_sentence = tokenizer.decode(added_tokens, skip_special_tokens=True)

    return generated_sentence, total_gnll, generated_token_count, new_ids

def sample_single_sentence_greedy(
    model,
    tokenizer,
    context_ids,
    max_tokens_per_sentence=50,
    device='cuda'
):
    """
    Generate one sentence with greedy decoding.

    At every step the highest-scoring token (argmax of the last-position
    logits) is appended. Generation stops when the EOS token is chosen, when
    the decoded text ends a sentence according to ``check_sentence_end``, or
    after ``max_tokens_per_sentence`` tokens.

    The G-NLL returned is the sum over generated tokens of ``-log p(token)``
    under the model's unmodified next-token distribution.

    Args:
        model: Callable as ``model(input_ids=...)`` returning an object whose
            ``.logits`` is indexed as ``[batch, position, vocab]``.
        tokenizer: Provides ``eos_token_id`` and ``decode``.
        context_ids: Token ids of the text so far; the code indexes row 0
            only, i.e. it assumes a batch of one.
        max_tokens_per_sentence: Upper bound on tokens generated per call.
        device: Unused; kept for call compatibility.

    Returns:
        ``(generated_sentence, total_gnll, generated_token_count, new_ids)``
        where ``new_ids`` is ``context_ids`` followed by the new tokens.
    """
    new_ids = context_ids.clone()
    total_gnll = 0.0
    generated_token_count = 0
    eos_token_id = tokenizer.eos_token_id

    # Pure inference: without no_grad every forward pass would record an
    # autograd graph and keep its activations alive.
    with torch.no_grad():
        for _ in range(max_tokens_per_sentence):
            last_token_logits = model(input_ids=new_ids).logits[:, -1, :]

            # Greedy choice.
            next_token_id = torch.argmax(last_token_logits, dim=-1, keepdim=True)
            token_id = next_token_id.item()

            # log_softmax in float32 is more accurate than log(softmax(.)) in
            # the model's half precision.
            log_probs = torch.log_softmax(last_token_logits.float(), dim=-1)
            total_gnll += -log_probs[0, token_id].item()
            generated_token_count += 1

            new_ids = torch.cat([new_ids, next_token_id], dim=1)

            if eos_token_id is not None and token_id == eos_token_id:
                break

            # The whole sequence is decoded (not just the new tokens) so the
            # decimal-point check can see a digit that precedes a new '.'.
            decoded_so_far = tokenizer.decode(new_ids[0], skip_special_tokens=True)
            if check_sentence_end(decoded_so_far):
                break

    # Decode only the newly generated part.
    added_tokens = new_ids[0, context_ids.shape[1]:]
    generated_sentence = tokenizer.decode(added_tokens, skip_special_tokens=True)
    return generated_sentence, total_gnll, generated_token_count, new_ids

def sample_sentence_candidates_and_pick_best(
    model,
    tokenizer,
    context_ids,
    n_candidates=3,
    uncertainty_method="total",
    top_p=0.9,
    temperature=1.0,
    max_tokens_per_sentence=50,
    device='cuda',
    min_or_max="max"
):
    """
    Sample ``n_candidates`` sentences from the same context and keep one.

    Each candidate is scored with either
      - its total G-NLL (``uncertainty_method == "total"``), or
      - its G-NLL divided by its token count (any other value, intended to be
        "normalized").
    The candidate with the highest score is kept when ``min_or_max == "max"``;
    for any other value the lowest-scoring one is kept. Ties go to the
    candidate sampled first.

    Returns:
        ``(best_sentence, best_gnll, best_ids)`` for the selected candidate,
        where ``best_gnll`` is always the un-normalized total.
    """
    use_total = uncertainty_method == "total"

    scored = []
    for _ in range(n_candidates):
        sentence, nll, n_tokens, ids = sample_single_sentence(
            model=model,
            tokenizer=tokenizer,
            context_ids=context_ids,
            top_p=top_p,
            temperature=temperature,
            max_tokens_per_sentence=max_tokens_per_sentence,
            device=device
        )
        # Guard the division in case a candidate has no tokens.
        divisor = n_tokens if n_tokens > 0 else 1
        score = nll if use_total else nll / divisor
        scored.append((score, sentence, nll, ids))

    select = max if min_or_max == "max" else min
    _, best_sentence, best_gnll, best_ids = select(scored, key=lambda entry: entry[0])
    return best_sentence, best_gnll, best_ids

def generate_text_sentence_by_sentence(
    model,
    tokenizer,
    prompt,
    max_sentences=5,
    n_candidates=3,
    uncertainty_method="total",
    top_p=0.9,
    temperature=1.0,
    max_tokens_per_sentence=50,
    device='cuda',
    min_or_max="max"
):
    """
    Generate text one sentence at a time using sampled candidates.

    For every sentence, ``n_candidates`` continuations are sampled (top-p and
    temperature) and one is selected by
    ``sample_sentence_candidates_and_pick_best`` according to
    ``uncertainty_method`` ("total" or "normalized") and ``min_or_max``.

    Generation stops after ``max_sentences`` sentences, when the last token is
    the tokenizer's EOS token, or as soon as the generated continuation
    contains one of the answer phrases "the answer is:", "The answer is:" or
    "The final answer is:".

    Args:
        model: Language model passed through to the sampling helper.
        tokenizer: Callable tokenizer that also provides ``decode`` and
            ``eos_token_id``.
        prompt: Text to continue.
        max_sentences: Maximum number of sentences to append.
        n_candidates, uncertainty_method, top_p, temperature,
        max_tokens_per_sentence, min_or_max: Forwarded to
            ``sample_sentence_candidates_and_pick_best``.
        device: Device the prompt token ids are moved to.

    Returns:
        The decoded prompt followed by every selected sentence.
    """
    context_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
    final_text = tokenizer.decode(context_ids[0], skip_special_tokens=True)

    stop_patterns = ("the answer is:", "The answer is:", "The final answer is:")
    # Occurrences already present in the prompt must not trigger a stop, so
    # each pattern is compared against its own count in the prompt. Using
    # ">" rather than an exact count also catches a sentence that contains a
    # pattern more than once.
    prompt_counts = {pattern: final_text.count(pattern) for pattern in stop_patterns}
    eos_token_id = tokenizer.eos_token_id

    for _ in range(max_sentences):
        best_sentence, best_gnll, best_ids = sample_sentence_candidates_and_pick_best(
            model=model,
            tokenizer=tokenizer,
            context_ids=context_ids,
            n_candidates=n_candidates,
            uncertainty_method=uncertainty_method,
            top_p=top_p,
            temperature=temperature,
            max_tokens_per_sentence=max_tokens_per_sentence,
            device=device,
            min_or_max=min_or_max
        )
        context_ids = best_ids
        final_text += best_sentence

        # Stop once the model has produced an answer phrase.
        if any(final_text.count(pattern) > prompt_counts[pattern]
               for pattern in stop_patterns):
            break

        # Stop at end of sequence.
        if eos_token_id is not None and context_ids[0, -1].item() == eos_token_id:
            break

    return final_text

def generate_text_sentence_by_sentence_greedy(
    model,
    tokenizer,
    prompt,
    max_sentences=5,
    max_tokens_per_sentence=50,
    device='cuda'
):
    """
    Generate text one sentence at a time using greedy decoding.

    Each sentence is produced by ``sample_single_sentence_greedy``.
    Generation stops after ``max_sentences`` sentences, when the last token is
    the tokenizer's EOS token, or as soon as the generated continuation
    contains one of the answer phrases "the answer is:", "The answer is:" or
    "The final answer is:".

    Args:
        model: Language model passed through to the greedy helper.
        tokenizer: Callable tokenizer that also provides ``decode`` and
            ``eos_token_id``.
        prompt: Text to continue.
        max_sentences: Maximum number of sentences to append.
        max_tokens_per_sentence: Forwarded to the greedy helper.
        device: Device the prompt token ids are moved to.

    Returns:
        The decoded prompt followed by every generated sentence.
    """
    context_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
    final_text = tokenizer.decode(context_ids[0], skip_special_tokens=True)

    stop_patterns = ("the answer is:", "The answer is:", "The final answer is:")
    # Occurrences already present in the prompt must not trigger a stop, so
    # each pattern is compared against its own count in the prompt. Using
    # ">" rather than an exact count also catches a sentence that contains a
    # pattern more than once.
    prompt_counts = {pattern: final_text.count(pattern) for pattern in stop_patterns}
    eos_token_id = tokenizer.eos_token_id

    for _ in range(max_sentences):
        gen_sentence, gnll, token_count, new_ids = sample_single_sentence_greedy(
            model=model,
            tokenizer=tokenizer,
            context_ids=context_ids,
            max_tokens_per_sentence=max_tokens_per_sentence,
            device=device
        )
        context_ids = new_ids
        final_text += gen_sentence

        # Stop once the model has produced an answer phrase.
        if any(final_text.count(pattern) > prompt_counts[pattern]
               for pattern in stop_patterns):
            break

        # Stop at end of sequence.
        if eos_token_id is not None and context_ids[0, -1].item() == eos_token_id:
            break

    return final_text

In [None]:
import os
import time

if __name__ == "__main__":
    # -------------------------------------------------
    # 1) Load the model
    # -------------------------------------------------
    model_path = "meta-llama/Llama-3.1-8B-Instruct"
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tokenizer, model = load_llama7b_model(model_path, device=device)

    # Where results are written (appended to across samples and reruns).
    output_dir = "Your Path"
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, "inference_results.txt")

    # System instruction prepended to every question.
    # NOTE(review): there is no separator between "...beyond that phrase." and
    # "Let's think..."; left untouched because changing the prompt changes the
    # experiment.
    system_instruction = (
        "You are a helpful assistant. When you generate an answer, "
        "please start it with the following phrase: 'The final answer is: ' "
        "and then provide your solution.\n"
        "If you have finished your reasoning, do not continue beyond that phrase."
        "Let's think step by step. Reason carefully then answer."
    )

    # -------------------------------------------------
    # 2) Experiment table
    # -------------------------------------------------
    # Limits shared by every decoding strategy.
    shared_limits = {
        "max_sentences": 10,
        "max_tokens_per_sentence": 100,
        "device": device,
    }
    # Extra settings shared by the four sampling-based strategies.
    sampling_settings = {
        **shared_limits,
        "n_candidates": 10,
        "top_p": 0.9,
        "temperature": 1.0,
    }
    # (label, generation function, keyword arguments), in execution order.
    experiments = [
        (
            "[1] total G-NLL + max",
            generate_text_sentence_by_sentence,
            {**sampling_settings, "uncertainty_method": "total", "min_or_max": "max"},
        ),
        (
            "[2] normalized G-NLL + max",
            generate_text_sentence_by_sentence,
            {**sampling_settings, "uncertainty_method": "normalized", "min_or_max": "max"},
        ),
        (
            "[3] total G-NLL + min",
            generate_text_sentence_by_sentence,
            {**sampling_settings, "uncertainty_method": "total", "min_or_max": "min"},
        ),
        (
            "[4] normalized G-NLL + min",
            generate_text_sentence_by_sentence,
            {**sampling_settings, "uncertainty_method": "normalized", "min_or_max": "min"},
        ),
        (
            "[5] Greedy Decoding",
            generate_text_sentence_by_sentence_greedy,
            shared_limits,
        ),
    ]

    def _report(line, log_lines):
        """Print a line and queue the same line for the results file."""
        print(line)
        log_lines.append(line)

    # -------------------------------------------------
    # 3) Run every strategy on every test question
    # -------------------------------------------------
    for i, question_text in enumerate(dataset['test']['question']):
        log_str_list = []
        _report("\n============================", log_str_list)
        _report(f"Test sample #{i+1}", log_str_list)
        _report(f"Question: {question_text}", log_str_list)

        # Build the prompt.
        prompt = system_instruction + "\n\n" + question_text

        for label, generate_fn, generate_kwargs in experiments:
            start_t = time.time()
            answer = generate_fn(
                model=model,
                tokenizer=tokenizer,
                prompt=prompt,
                **generate_kwargs
            )
            elapsed = time.time() - start_t

            _report(f"\n{label}:", log_str_list)
            _report(answer, log_str_list)
            _report(f"Elapsed time: {elapsed:.2f} seconds", log_str_list)

        # Closing separator; the console and file variants differ by a
        # leading newline, as in the original output.
        print("============================\n")
        log_str_list.append("\n============================\n")

        # ---------------------------------------------------------
        # 4) Append this sample's results to the output file
        # ---------------------------------------------------------
        with open(output_file, "a", encoding="utf-8") as f:
            f.writelines(f"{line}\n" for line in log_str_list)
