In [None]:
# Load model directly
from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq
import torch.nn as nn
from transformers import AutoTokenizer, AutoModelForCausalLM

processor = AutoProcessor.from_pretrained("openai/whisper-large-v2")
whisper = AutoModelForSpeechSeq2Seq.from_pretrained("openai/whisper-large-v2")
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-1B")
llama = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-3.2-1B")

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
import torch
from copy import deepcopy

def modify_llama_blocks(model, num_blocks_to_keep=2):
    """
    Llama 모델의 시작과 끝 부분에서 각각 지정된 수의 블록만 남기고 중간 블록들을 제거합니다.
    
    Args:
        model: Llama 모델
        num_blocks_to_keep: 시작과 끝에서 각각 유지할 블록의 수
    
    Returns:
        수정된 모델
    """
    # 모델 복사
    modified_model = deepcopy(model)
    
    # decoder layers 가져오기
    layers = modified_model.model.layers
    
    # 전체 블록 수
    total_blocks = len(layers)
    
    # 유지할 블록의 인덱스
    keep_indices = list(range(num_blocks_to_keep)) + list(range(total_blocks - num_blocks_to_keep, total_blocks))
    
    # 새로운 블록 리스트 생성
    new_layers = torch.nn.ModuleList([layers[i] for i in keep_indices])
    
    # 기존 layers를 새로운 것으로 교체
    modified_model.model.layers = new_layers
    
    return modified_model

# 사용 예시
modified_llama = modify_llama_blocks(llama, num_blocks_to_keep=2)


수정된 모델의 총 블록 수: 4


In [9]:
from transformers import AutoTokenizer
import torch

# 토크나이저 로드 및 설정
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-1B")
tokenizer.pad_token = tokenizer.eos_token  # EOS 토큰을 패딩 토큰으로 사용
tokenizer.padding_side = "left"  # 왼쪽에 패딩 추가

def generate_text(model, text, max_length=100):
    """
    주어진 텍스트에 대해 Llama 모델을 사용하여 텍스트를 생성합니다.
    
    Args:
        model: Llama 모델
        text: 입력 텍스트
        max_length: 생성할 최대 토큰 수
    
    Returns:
        생성된 텍스트
    """
    # 입력 텍스트를 토큰화
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    print("input size: ", inputs)
    # GPU가 있다면 GPU로 이동
    if torch.cuda.is_available():
        model = model.cuda()
        inputs = {k: v.cuda() for k, v in inputs.items()}
    
    print(inputs)
    # 모델을 평가 모드로 설정
    model.eval()
    
    # 텍스트 생성
    with torch.no_grad():
        output_sequences = model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_length=max_length,
            temperature=0.7,
            top_p=0.9,
            top_k=50,
            repetition_penalty=1.2,
            do_sample=True,
            num_return_sequences=1,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
            bos_token_id=tokenizer.bos_token_id,  # 시작 토큰 ID 추가
        )
    
    # 생성된 텍스트 디코딩
    generated_text = tokenizer.decode(output_sequences[0], skip_special_tokens=True)
    
    return generated_text

# 모델 사용 전 패딩 토큰 설정
modified_llama.config.pad_token_id = tokenizer.pad_token_id
modified_llama.resize_token_embeddings(len(tokenizer))

# 사용 예시
input_text = "오늘의 날씨는"
generated_text = generate_text(modified_llama, input_text)
print(f"입력: {input_text}")
print(f"출력: {generated_text}")

input size:  {'input_ids': tensor([[128000,  58368, 105622,  21028, 105605, 107497,  16969]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1]])}
{'input_ids': tensor([[128000,  58368, 105622,  21028, 105605, 107497,  16969]],
       device='cuda:0'), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1]], device='cuda:0')}
입력: 오늘의 날씨는
출력: 오늘의 날씨는 “” ”’’jk ‘ '”.uhl),

 [...]



tr you ### \( $$﻿ _(

 ``` [_...

 ^{}'''">
 `_”’ ’u ^{[ 

﻿
mart[^._

>
 '\use....

 Katz.'''
 жеятьiiihe '' ”blo ""

 ""''ii‬се')]h _(eedeedeemej'^"...limeii'))').'d)”’me "_’s‘ ’bloodmodedsdevdevdevb?[millionmillionmillion billion billion


In [10]:
import torch
import torch.nn as nn

class WhisperLlamaASR(nn.Module):
    def __init__(self, whisper_encoder, llama_decoder):
        super().__init__()
        self.encoder = whisper_encoder
        self.decoder = llama_decoder
        
        self.encoder_dim = 1280
        self.decoder_dim = 2048
        
        self.bridge = nn.Sequential(
            nn.Linear(self.encoder_dim, self.decoder_dim),
            nn.LayerNorm(self.decoder_dim),
            nn.ReLU(),
            nn.Dropout(0.1)
        )
        
        # 모든 컴포넌트를 같은 디바이스로 이동
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.to(device)
        
    def forward(self, input_features, decoder_input_ids, decoder_attention_mask=None):
        # 입력을 모델과 같은 디바이스로 이동
        device = next(self.parameters()).device
        input_features = input_features.to(device)
        decoder_input_ids = decoder_input_ids.to(device)
        if decoder_attention_mask is not None:
            decoder_attention_mask = decoder_attention_mask.to(device)
            
        # Whisper encoder를 통과
        encoder_outputs = self.encoder(
            input_features=input_features,
            return_dict=True
        )
        
        encoder_hidden_states = encoder_outputs.last_hidden_state
        bridge_hidden_states = self.bridge(encoder_hidden_states)
        
        # Llama decoder에 전달
        decoder_outputs = self.decoder(
            input_ids=decoder_input_ids,
            attention_mask=decoder_attention_mask,
            # encoder_hidden_states=bridge_hidden_states,  # cross-attention이 없으므로 제거
            use_cache=False,
            return_dict=True
        )
        
        return decoder_outputs

def create_asr_model(whisper_encoder, llama_decoder):
    # GPU가 있으면 GPU로 이동
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    whisper_encoder = whisper_encoder.to(device)
    llama_decoder = llama_decoder.to(device)
    
    model = WhisperLlamaASR(whisper_encoder, llama_decoder)
    return model.to(device)

def process_batch(model, input_features, decoder_input_ids, decoder_attention_mask=None):
    # 모델의 디바이스 확인
    device = next(model.parameters()).device
    
    # 입력 데이터를 모델과 같은 디바이스로 이동
    input_features = input_features.to(device)
    decoder_input_ids = decoder_input_ids.to(device)
    if decoder_attention_mask is not None:
        decoder_attention_mask = decoder_attention_mask.to(device)
    
    outputs = model(
        input_features=input_features,
        decoder_input_ids=decoder_input_ids,
        decoder_attention_mask=decoder_attention_mask
    )
    return outputs

# 사용 예시
whisper = AutoModelForSpeechSeq2Seq.from_pretrained("openai/whisper-large-v2")
whisper_encoder = whisper.model.encoder
asr_model = create_asr_model(whisper_encoder, modified_llama)

# 예시 입력 데이터 생성 (CPU에서 생성)
input_features = torch.randn(1, 80, 3000)
decoder_input_ids = torch.randint(0, 32000, (1, 100))

# 모델 실행 (process_batch 내에서 GPU로 이동)
outputs = process_batch(asr_model, input_features, decoder_input_ids)

In [11]:
from transformers import AutoTokenizer
import torch
import torch.nn.functional as F

def decode_asr_output(outputs, tokenizer, skip_special_tokens=True):
    """
    ASR 모델의 출력을 텍스트로 변환합니다.
    
    Args:
        outputs: 모델의 출력 (CausalLMOutputWithPast)
        tokenizer: Llama 토크나이저
        skip_special_tokens: 특수 토큰 스킵 여부
    
    Returns:
        str: 디코딩된 텍스트
    """
    # logits에서 가장 높은 확률을 가진 토큰 인덱스 선택
    predictions = torch.argmax(outputs.logits, dim=-1)
    
    # 토큰을 텍스트로 디코딩
    decoded_text = tokenizer.decode(predictions[0], skip_special_tokens=skip_special_tokens)
    
    return decoded_text


tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-1B")
tokenizer.pad_token = tokenizer.eos_token

# 직접 디코딩
text = decode_asr_output(outputs, tokenizer)
print("디코딩된 텍스트:", text)


디코딩된 텍스트:  of\\\\ mir _Trat atroySw Securitiesas Outof _of ThB "_"kyhEcr.svg " " Seymouryas sPR" " acido SE" mir"es fuel Tower 'k'’ 'k "”u ‘”” " " " "" ”” " "””"”, " ”...

deskoub...

'd " “ audiences  "jose uvelu " #####kj’ sek “ "  "neenc kes “


In [2]:
import torch
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from transformers import WhisperProcessor
import torch.optim as optim
from tqdm import tqdm
import wandb
import numpy as np

class LibriSpeechDataset(Dataset):
    def __init__(self, split="train.clean.100", processor=None):
        self.dataset = load_dataset("librispeech_asr", split=split)
        self.processor = processor
    
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        # 오디오 데이터 로드
        audio = self.dataset[idx]["audio"]
        text = self.dataset[idx]["text"]
        
        # Whisper 프로세서로 오디오 특성 추출
        input_features = self.processor(
            audio["array"],
            sampling_rate=16000,
            return_tensors="pt"
        ).input_features.squeeze(0)
        
        # 텍스트 토큰화
        labels = self.processor.tokenizer(
            text,
            padding="max_length",
            max_length=128,
            truncation=True,
            return_tensors="pt"
        )
        
        return {
            "input_features": input_features,
            "decoder_input_ids": labels["input_ids"].squeeze(0),
            "decoder_attention_mask": labels["attention_mask"].squeeze(0),
            "labels": labels["input_ids"].squeeze(0)
        }

def collate_fn(batch):
    # 배치 내의 최대 길이에 맞춰 패딩
    max_audio_len = max(x["input_features"].shape[1] for x in batch)
    max_text_len = max(x["decoder_input_ids"].shape[0] for x in batch)
    
    input_features = []
    decoder_input_ids = []
    decoder_attention_mask = []
    labels = []
    
    for item in batch:
        # 오디오 특성 패딩
        padded_audio = torch.nn.functional.pad(
            item["input_features"],
            (0, max_audio_len - item["input_features"].shape[1])
        )
        input_features.append(padded_audio)
        
        # 텍스트 패딩
        padded_text = torch.nn.functional.pad(
            item["decoder_input_ids"],
            (0, max_text_len - item["decoder_input_ids"].shape[0]),
            value=processor.tokenizer.pad_token_id
        )
        decoder_input_ids.append(padded_text)
        
        # 어텐션 마스크 패딩
        padded_mask = torch.nn.functional.pad(
            item["decoder_attention_mask"],
            (0, max_text_len - item["decoder_attention_mask"].shape[0])
        )
        decoder_attention_mask.append(padded_mask)
        
        # 라벨 패딩
        padded_labels = torch.nn.functional.pad(
            item["labels"],
            (0, max_text_len - item["labels"].shape[0]),
            value=-100  # loss 계산 시 무시될 패딩 값
        )
        labels.append(padded_labels)
    
    return {
        "input_features": torch.stack(input_features),
        "decoder_input_ids": torch.stack(decoder_input_ids),
        "decoder_attention_mask": torch.stack(decoder_attention_mask),
        "labels": torch.stack(labels)
    }

def train_epoch(model, train_loader, optimizer, device, epoch):
    model.train()
    total_loss = 0
    
    progress_bar = tqdm(train_loader, desc=f'Epoch {epoch}')
    for batch in progress_bar:
        optimizer.zero_grad()
        
        # 데이터를 디바이스로 이동
        input_features = batch["input_features"].to(device)
        decoder_input_ids = batch["decoder_input_ids"].to(device)
        decoder_attention_mask = batch["decoder_attention_mask"].to(device)
        labels = batch["labels"].to(device)
        
        # 모델 forward pass
        outputs = model(
            input_features=input_features,
            decoder_input_ids=decoder_input_ids,
            decoder_attention_mask=decoder_attention_mask
        )
        
        # Loss 계산
        loss = F.cross_entropy(
            outputs.logits.view(-1, outputs.logits.size(-1)),
            labels.view(-1),
            ignore_index=-100
        )
        
        # Backward pass 및 최적화
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        progress_bar.set_postfix({'loss': loss.item()})
        
        # WandB에 로그 기록
        wandb.log({
            "batch_loss": loss.item(),
            "epoch": epoch
        })
    
    avg_loss = total_loss / len(train_loader)
    return avg_loss

def evaluate(model, eval_loader, device):
    model.eval()
    total_loss = 0
    
    with torch.no_grad():
        for batch in tqdm(eval_loader, desc='Evaluating'):
            input_features = batch["input_features"].to(device)
            decoder_input_ids = batch["decoder_input_ids"].to(device)
            decoder_attention_mask = batch["decoder_attention_mask"].to(device)
            labels = batch["labels"].to(device)
            
            outputs = model(
                input_features=input_features,
                decoder_input_ids=decoder_input_ids,
                decoder_attention_mask=decoder_attention_mask
            )
            
            loss = F.cross_entropy(
                outputs.logits.view(-1, outputs.logits.size(-1)),
                labels.view(-1),
                ignore_index=-100
            )
            
            total_loss += loss.item()
    
    avg_loss = total_loss / len(eval_loader)
    return avg_loss

def main():
    # WandB 초기화
    wandb.init(project="whisper-llama-asr")
    
    # 모델과 프로세서 준비
    processor = WhisperProcessor.from_pretrained("openai/whisper-large-v2")
    processor.tokenizer.pad_token = processor.tokenizer.eos_token
    
    """# 데이터셋 및 데이터로더 준비
    train_dataset = LibriSpeechDataset(split="train.clean.10", processor=processor)
    eval_dataset = LibriSpeechDataset(split="validation", processor=processor)
    
    train_loader = DataLoader(
        train_dataset,
        batch_size=8,
        shuffle=True,
        collate_fn=collate_fn,
        num_workers=4
    )
    
    eval_loader = DataLoader(
        eval_dataset,
        batch_size=8,
        shuffle=False,
        collate_fn=collate_fn,
        num_workers=4
    )"""
    
    # 모델 준비
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = create_asr_model(whisper_encoder, modified_llama)
    model = model.to(device)
    
    # 옵티마이저 설정
    optimizer = optim.AdamW(model.parameters(), lr=2e-5)
    
    # 학습 설정
    num_epochs = 10
    best_eval_loss = float('inf')
    
    # 학습 루프
    for epoch in range(num_epochs):
        # 훈련
        train_loss = train_epoch(model, train_loader, optimizer, device, epoch)
        
        # 평가
        eval_loss = evaluate(model, eval_loader, device)
        
        # WandB에 로그 기록
        wandb.log({
            "train_loss": train_loss,
            "eval_loss": eval_loss,
            "epoch": epoch
        })
        
        # 모델 저장
        if eval_loss < best_eval_loss:
            best_eval_loss = eval_loss
            torch.save(model.state_dict(), f'best_model_epoch_{epoch}.pt')
        
        print(f'Epoch {epoch}: Train Loss = {train_loss:.4f}, Eval Loss = {eval_loss:.4f}')

if __name__ == "__main__":
    main()

NameError: name 'create_asr_model' is not defined