<a href="https://colab.research.google.com/github/YoungsikMoon/FORS/blob/main/Whisper_Fine_Thank_you.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Whisper Fine Tuning

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [None]:
# 압축 풀 경로로 이동
%cd /content/drive/MyDrive/whisper/[원천]3.스튜디오
!pwd

In [None]:
# 해당 경로에 압축 풀기 (압축파일 경로 찍어줌)
# 압축파일을 먼저 올려놓고 여기서 풀어주는게 훨씬 빠름

# 잘 못 누르면 데이터 꼬이니까 꼭 주석처리 해둘 것
# !unzip -qq "/content/drive/MyDrive/whisper/[원천]3.스튜디오.zip"

In [None]:
# 필수 패키지 설치

!pip install datasets>=2.6.1
!pip install git+https://github.com/huggingface/transformers
!pip install evaluate>=0.30
!pip install jiwer
!pip install accelerate -U
!pip install transformers[torch]

## 오디오 데이터셋 베이스 정보 리스트 만들기


- 필수 정보 : 오디오 데이터 파일 경로, 발화 텍스트


---


- 추가 정보 : json 파일 경로, 오디오 시간 정보
- 최종 데이터셋으로 만들기 전 단계
- 데이터 검증 및 필수 정보를 담은 리스트 추출 단계
- json 파일 열어서 내용 가져오는게 너무 오래 걸려서 로컬에서 만들어와서 편집하기로함


In [None]:
# 오디오파일 경로 + 텍스트 매칭한 리스트 만들기

# 폴더와 파일 리스트를 뽑는데 유용한 라이브러리
# os 패키지로도 동일하게 수행 가능하지만 파일 리스트를 편하게 뽑는데 좀 더 특화돼있음
# 특히 glob 패키지의 glob 함수는 하위 폴더명이 아닌 하위 폴더의 전체 경로를 리스트로 반환해줘서 아주 편함
from glob import glob
from tqdm import tqdm
import json


# 문제는 폴더가 너무 많고 파일명이 복잡하다는 점
# 오디오파일과 json 파일 매칭이 제대로 되지 않을 경우 학습의미가 없음
    # ==> 데이터 프레임 만들 때 정확히 파일 이름이 일치하는 경우에만 매칭되도록 전처리 필요

# 오디오 파일 최상위 경로
# 폴더명, 파일명에 대괄호 들어있으면 인식 못함.. 수정할 것(수정 어려우면 os.listdir 사용)
audio_file_path = "/content/drive/MyDrive/whisper/원천_3_스튜디오/*"

# json 파일 최상위 경로
json_file_path = "/content/drive/MyDrive/whisper/라벨_3_스튜디오/*"


# 오디오 파일 구조 : [원천]3.스튜디오(1ea) - 노인남여_xxx폴더s(16ea) - 노인남녀_xxx.wav(다수)
# json 파일 구조 : [라벨]3.스튜디오(1ea) - 노인남여_xxx폴더s(16ea) - 노인남녀_xxx.json(다수)

# 원천, 라벨의 모든 폴더와 파일 이름이 같고, 같은 순서로 정렬됐는지 무결성 검사를 먼저 진행
# 진행하면서 파일 경로 리스트를 먼저 만들어둠

# 폴더 경로 리스트 정렬
audio_folder_path_list = sorted(glob(audio_file_path))
json_folder_path_list = sorted(glob(json_file_path))

# 폴더, 파일 갯수 매칭 확인부터. 갯수 다르면 다른 작업 시작할 필요가 없음.
# 갯수 하나라도 다르면 데이터 사용 불가.
if len(audio_folder_path_list) == len(json_folder_path_list):
    for audio_folder_path, json_folder_path in zip(audio_folder_path_list, json_folder_path_list):

        # 해당 폴더에 대한 파일 갯수가 같다면 계속 진행
        if len(glob(audio_folder_path + "/*")) == len(glob(json_folder_path + "/*")):
            continue

        # 다르면 에러 발생
        else:
            print(f"파일 갯수 다름 : {audio_folder_path}, {json_folder_path}")
            raise Exception("파일 무결성 검증 실패")

    print("데이터 갯수 무결성 검증 완료")

else:
    print("폴더 갯수 다름")
    raise Exception("파일 무결성 검증 실패")

# 갯수 검증 끝났으면 다음 단계 (폴더, 파일 이름 확인 후 경로 정보 리스트로 추출)
# 오디오, json 파일 경로 리스트
audio_file_path_list = []
json_file_path_list = []


# 첫 번째 루프 : 각 폴더 경로 꺼내옴
for audio_folder_path, json_folder_path in zip(audio_folder_path_list, json_folder_path_list):

    # 정렬된 파일 리스트 만들기
    audio_file_path_list_sorted = sorted(glob(audio_folder_path + "/*"))
    json_file_path_list_sorted = sorted(glob(json_folder_path + "/*"))

    # 두 번째 루프 : 각 파일
    for audio_file_path, json_file_path in zip(audio_file_path_list_sorted, json_file_path_list_sorted):

        # 확장자 제외 파일 이름이 같은지 확인
        if audio_file_path.split("/")[-1].split(".")[0] == json_file_path.split("/")[-1].split(".")[0]:

            # 이름 같으면 해당 파일 경로를 각 리스트에 넣어줌
            audio_file_path_list.append(audio_file_path)
            json_file_path_list.append(json_file_path)

        # 이름이 다르다면 데이터 매칭이 꼬여있다는 의미이므로 에러 발생 후 데이터 확인 필요
        else:
            print(f"파일 이름 불일치 : {audio_file_path}, {json_file_path}")
            raise Exception("파일 무결성 검증 실패")

print(f"폴더 및 파일 이름 매칭 검증 및 리스트 생성 완료, 갯수 : {len(audio_file_path_list)} : {len(json_file_path_list)}")


# 완성된 리스트에서 json 파일 안에 text 정보만 빼서 새로운 리스트 생성
# 녹음시간 통계를 보고 싶어서 시간정보도 추출
text_list = []
time_list = []

for json_file_path in json_file_path_list:
    with open (json_file_path, "r") as f:
        dict = json.load(f)
        text_list.append(dict["발화정보"]["stt"])
        time_list.append(dict["발화정보"]["recrdTime"])

# 갯수 확인
print(len(text_list), len(time_list))

#### - 최종 데이터 프레임 생성

- 코랩 json 읽기 느려서 로컬에서 작업한 파일 가져와서 편집
- 오디오 경로, json 경로, 발화텍스트, 녹음시간

In [None]:
# 로컬에서 작업한 내용 코랩용으로 만들어서 다시 저장해주기
import pandas as pd
df = pd.read_csv("/content/drive/MyDrive/whisper/audio_path.csv", encoding = "utf-8")

# 루트 위치 바꿔주기
df["audio_path"] = df["audio_path"].str.replace("E:\\02.공부\\02.코딩\\01.Python\\01.Alphaco\\02.코드\\03.프로젝트\\02.자연어처리(음성인식)\\02.위스퍼파인튜닝\\01.data\\01.노인발화자유음성(스튜디오3)\\", "/content/drive/MyDrive/whisper/")
# 로컬에서는 \\, 코랩에서는 /로 구분. 바꿔주기
df["audio_path"] = df["audio_path"].str.replace("\\", "/")

# 새로 저장
df.to_csv("/content/drive/MyDrive/whisper/audio_path_colab.csv", index = False)

#### - 데이터 프레임 불러오기 및 녹음파일 테스트

In [None]:
import pandas as pd
df = pd.read_csv("/content/drive/MyDrive/whisper/audio_path_colab.csv", encoding = "utf-8")

# 경로 잘 되는지 테스트
from IPython.display import Audio
Audio(df.iloc[0, 0], rate=16000)

## DatasetDict 데이터셋 만들기



- 위에서 만들어진 "오디오파일 경로", "발화 텍스트" 리스트를 이용해 최종 자료형 생성

- DatasetDict 자료형이 위스퍼 최종 학습을 위한 자료형으로 사용됨
- 위스퍼 인풋 음성 파일의 Sample rate는 16khz로 고정
- Train, Vaildation, Test 세 개로 나눠진 최종 DatasetDict 자료형 생성


In [None]:
# 이제부터 시작..

# 뽑은 파일 경로 데이터를 위스퍼가 요구하는 데이터셋 형태로 변환
# 최종적으로 DatasetDict 객체로 만들어줘야함
from datasets import Dataset, DatasetDict

# 오디오 파일의 sampling rate를 조절해줌
# 위스퍼에서는 무조건 16khz(16,000hz)로 맞춰줘야함.
from datasets import Audio

# 오디오파일 경로와 텍스트를 딕셔너리 형태로 넣어줌
# Dataset에서 제공하는 cast_column 함수에서 audio의 value값에 Audio함수를 적용해 sampling_rate를 조절해줌 (cast_column()은 일종의 map 같은 함수인듯)
ds = Dataset.from_dict({"audio" : df["audio_path"].tolist(), "transcripts" : df["text"].tolist()}).cast_column("audio", Audio(sampling_rate = 16000))

# 위의 ds는 전체 데이터
# 전체 중에서 validation 데이터와 test 데이터를 분할해줌

# 훈련데이터(0.8), validation(0.2) 비율로 나눠줌
ds_train_valid = ds.train_test_split(test_size = 0.2)

# validation 중에서 절반을 다시 테스트 데이터로 나눠줌
# 데이터셋은 딕셔너리 형태로 데이터를 쪼개서 보관하니까 컬럼명으로 찾아주면 됨 (train, test)
ds_test = ds_train_valid["test"].train_test_split(test_size = 0.5)

# train, validation, test 세 가지로 쪼개진 데이터를 DatasetDict 형태로 한 번에 묶어주면 데이터 가공 끝
# 구조가 약간 헷갈릴 수 있음
    # 처음 만든 ds_train_valid의 ["train"]은 0.8의 훈련 데이터를 의미
    # ds_train_valid의 0.2인 ["test"]를 다시 반반씩 쪼개서 하나는 validation, 하나는 test로 사용하기로 함
        # 따라서 ds_test의 ["train"]과 ["test"]가 각각 validation, test 자료가 되는 구조 (비율이 반반이니까 순서는 상관없음)
datasets = DatasetDict({"train" : ds_train_valid["train"], "valid" : ds_test["train"], "test" : ds_test["test"]})

# 허깅페이스에 업로드해서 보관하는 데이터셋은 위의 datasets임.
# 구글드라이브에는 이 객체 자체를 저장 못하니까 허깅페이스에서 지원해주는듯
# 나중에 체크포인트 등도 지원해주니까 업로드해서 쓰는게 좋을 것 같음

datasets["train"]

#### 전처리 완료된 최종 학습용 datasetDict 만들기


- 패딩, mel-log로 변환된 input_features 컬럼 / 인덱스로 변환된 labels 컬럼


---

- 기존 컬럼 (audio, transcripts)는 버리고 전처리 완료된 datasetDict 객체를 생성
- 전처리 작업 약 40분 넘게 소요 예정. 객체 저장 필수
- 허깅 페이스는 매 번 토큰 입력해줘야함으로 구글 드라이브에 저장
- 코랩 메모리 아웃나서 로컬에서 진행..=ㅅ=
    - 로컬에서는 10분 걸림..-ㅅ-
    - 코랩프로 결제해도 2시간 걸리는데..? 이게 맞는거임..???
    - 아무래도 구글 드라이브를 쓰다보니 I/O가 엄청나게 느린듯....
    - 전처리는 로컬이 짱인듯.. 근데 이건 또 드라이브에 업로드하는 시간이.....
    - GPU 빵빵하게 달린 서버 한 대 쓰는게 최고다.. 코랩프로+도 그닥 소용없을 듯..

In [None]:
# 오디오파일 전처리(학습전), 후처리(학습후) 해주는 클래스
    # WhisperFeatureExtractor 클래스와 WhisperTokenizer 클래스를 상속받아 하나로 쓸 수 있도록 합쳐둔 클래스
    # 위 두 개 클래스는 따로 import 하지 않아도 됨
from transformers import WhisperProcessor

# 객체 생성
# 파라미터 : 베이스 모델(tiny, small, medium 등), 언어종류, 태스크
    # transcribe는 해당 언어로 전사를 뜻함. 바로 영어로 번역하려면 다른 태스크 모드(translate) 사용
processor = WhisperProcessor.from_pretrained("openai/whisper-base", language="Korean", task="transcribe")


# 데이터 전처리 작업을 위한 함수
def prepare_dataset(batch):
    # 오디오 파일을 16kHz로 로드
    audio = batch["audio"]

    # input audio array로부터 sample_rate값 적용해서 log-Mel spectrogram 변환 및 30초로 패딩작업
    # 오디오가 다 짧은데 꼭 30초로 패딩해야하는가에 대한 의문. 줄이는 방법이 안보임. 못줄인다네..-ㅅ-
    # input_features[0] : input_features, input_features[1] : attention_mask (위스퍼에서는 30초로 일괄 패딩해버리니까 불필요)
    batch["input_features"] = processor.feature_extractor(audio["array"], sampling_rate = audio["sampling_rate"]).input_features[0]

    # target text를 label ids로 변환
    batch["labels"] = processor.tokenizer(batch["transcripts"]).input_ids
    return batch


# 데이터 전처리 함수를 데이터셋 전체에 적용
# num_proc : 몇 개의 CPU 코어를 사용할 지 결정 (2개부터 병렬처리)
    # 코랩 CPU 코어 확인 : !grep "cpu cores" /proc/cpuinfo | tail -1 ==> 1코어밖에 안줌..

# remove_columns는 반환할 때 해당 컬럼은 제외한 객체를 반환한다는 의미
    # datasets.column_names["train"] : ["audio", "transcripts"] (오디오, 발화텍스트)
    # 즉, "audio"에 들어있는 path, array, sampling_rate 정보를 써서 실제 log-Mel로 전처리된 "input_features"를 생성
    # "transcripts"에 들어있는 한국어는 인덱스로 변환시켜 전처리된 "labels"를 생성
        # 전처리된 데이터를 가진 "input_features"와 "labels"만 가진 새로운 datasetDict 타입의 객체를 반환 (나머지 컬럼은 제거된)
        # 굳이 remove_columns() 옵션을 사용하지 않고 그냥 함수 내에서 작업 후 해당 컬럼을 지워버려도 무방할 듯

low_call_voices = datasets.map(prepare_dataset, remove_columns = datasets.column_names["train"], num_proc=1)

#### - 최종 전처리된 datasetDict 객체 저장하고 불러오기



- 이제 데이터 전처리는 끝
- 이 객체만 있으면 코랩 말고 다른 곳에서도 학습시킬 수 있음
- 이후 프로세스는 TPU 또는 GPU가 필요
    - 근데 GPU가 없네.. GPU 내놔라...=ㅅ=

- 문제는 최종 전처리된 객체가 3G -> 16G로 용량이 늘어났다는 것..
    - 5~10초짜리 음성파일이 대부분인데 30초로 일괄 패딩해버린 탓..
    - 패딩 길이 조절하는 방법은 없다고함
        - 그냥 30초 내외 음성파일 쓰는게 가장 효율적일 듯

In [None]:
# 전처리 완료된 datasetDict 객체 디스크(구글 드라이브)에 저장하기 (폴더로 저장됨)
    # 드라이브에 15기가 공간 없음. 그냥 content에 저장..
# 실제로는 수행안함. 로컬에서 해서 드라이브에 가져다둠. 딥빡..
low_call_voices.save_to_disk("/content/drive/MyDrive/whisper/low_call_voice")

In [None]:
# 전처리 완료된 datasetDict 객체 디스크(구글 드라이브)에서 불러오기
from datasets import load_from_disk
from datasets import Dataset, DatasetDict
from transformers import WhisperProcessor

# 여기서부터 새로 시작.
processor = WhisperProcessor.from_pretrained("openai/whisper-base", language="Korean", task="transcribe")
low_call_voices_prepreocessed = load_from_disk("/content/drive/MyDrive/whisper/low_call_voice")
low_call_voices_prepreocessed

## 데이터 학습 (Fine-Tuning) 진행


In [None]:
- 전처리된 데이터로 학습진행 시작
- 여기서부터 GPU/TPU 필수
- GPU/TPU 주는 캐글 노트북으로 건너가서 진행
    - 아래 코드는 코랩에서는 캐글에서 수행할 것..
    - 근데 캐글에 16기가 파일 안올라가서 실패..

#### 1. Data Collator


- 전처리한 데이터를 모델에 입력할 수 있는 PyTorch 텐서 형태로 변환하는 작업
- 이 때 패딩이 되지 않은 타겟(labels = 발화데이터)도 같이 패딩처리해줌

In [None]:
import torch
from dataclasses import dataclass
from typing import Any, Dict, List, Union
from transformers import WhisperProcessor

# 전처리된 데이터를 텐서 형태로 변환
# 제일 이해 안가는 코드..

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:

    processor: Any

    def __call__(self, features : List[Dict[str, Union[List[int], torch.Tednsor]]]) -> Dict[str, torch.Tensor]:

        # featurs는 최종 datasetDict 자료 (3개로 이뤄져있음)
            # train : input_features, labels
            # valid : input_features, labels
            # test : input_features, labels

        # 인풋 데이터(mel-log로 변환된 오디오 파일)를 토치 텐서로 변환
        input_features = [{"input_features": feature["input_features"]} for feature in features]

        # input_feaures를 패딩. 이미 패딩 돼 있는데 왜 또하는지는 잘 모르겠음.
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # 라벨 데이터 (정수 인코딩된 발화데이터)를 토치 텐서로 변환
        # ex) train이 가진 ["input_features"] 컬럼을 딕셔너리 구조를 가진 리스트 안에다가 넣어줌
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        # 라벨 데이터 패딩 적용
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # 패딩 토큰을 -100으로 치환해 loss 계산 과정에서 무시되도록 함
        # 이미 패딩 토큰이라는 것 자체가 무시되도록 돼 있을텐데 왜 하는지를 잘 모르겠음
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # 이전 토크나이즈 과정에서 bos 토큰이 추가되었다면 bos 토큰을 잘라냄
        # 이 코드는 뭔지 잘 모르겠음..
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch


# 데이터 콜레이터 초기화
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

#### 2. Evaluaion Metrics


- 검증 데이터셋에 사용할 검증 매트릭스 정의
- 한국어의 경우 WER 보다 CER이 더 적합

In [None]:
import evaluate

metric = evaluate.load('cer')

def compute_metrics(pred):

    # 예측값
    pred_ids = pred.predictions

    # 실제값
    label_ids = pred.label_ids

    # 패딩된 토큰을 올바르게 무시하기 위해 -100을 다시 pad_token으로 치환
    # 밑에 kip_special_tokens=True가 적용될 수 있도록 함
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id

    # 정수 인덱스를 실제 문자열로 디코딩
    # metrics 계산 시 special token들을 빼고 계산하도록 설정
    pred_str = processor.tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = processor.tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    # CER 계산
    cer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"cer": cer}

#### 3. Load a Pre-Trained Checkpoint


- pre_trained model : 이미 학습된 위스퍼의 기본 모델을 뜻함
- tiny, small, medium, large 등의 학습된 모델 종류를 지정해서 모델을 로드

In [None]:
from transformers import WhisperForConditionalGeneration

model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-base")

# 한국어 고정이기 때문에 언어를 고정해주는 것이 좋음
model.generation_config.language = "korean"

# 한국어 전사 태스크임을 명시
# 프로세서에서 명시해줬지만 모델에서도 다시 명시해주는게 좋은 듯
model.generation_config.task = "transcribe"

# 디폴트값인 any는 디코더가 다국어에 맞는 토큰을 자동으로 찾도록 함
# 한국어만 보면 되므로 None으로 지정해서 좀 더 정확성을 올려줌
model.config.forced_decoder_ids = None

# 문장 생성 중 억제되는 토큰이 없도록 리스트를 비워줌
# 정확한 의미는 모르겠음..
# suppress_tokens는 일종의 불용어 사전 같은 용도
model.config.suppress_tokens = []

####4. Define the Training Arguments

- 최종 학습을 위한 파라미터 설정
- 에포크 횟수, 모델 저장 경로 등등

In [None]:
from transformers import Seq2SeqTrainingArguments
from transformers import Seq2SeqTrainer


# 파라미터 설정
# 너무 많다..
training_args = Seq2SeqTrainingArguments(
    output_dir="/content/drive/MyDrive/whisper/save_model",  # 모델의 최종 결과(가중치 테이블)이 저장될 곳
    per_device_train_batch_size=16,
    gradient_accumulation_steps=1,  # 배치 크기가 2배 감소할 때마다 2배씩 증가
    learning_rate=1e-5,
    warmup_steps=500,
    num_train_epochs = 10,
    # max_steps=4000,  # epoch 대신 설정, Test할 때만 사용하는 파라미터
    gradient_checkpointing=True,
    fp16=True, # Cuda 사용 못하면 False로 둬야함
    evaluation_strategy="steps",
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    generation_max_length=225,
    save_steps=1000,
    eval_steps=1000,
    logging_steps=25,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="cer",  # 한국어의 경우 'wer'보다는 'cer'이 더 적합
    greater_is_better=False,
    push_to_hub=False, # 허브에 업로드할지 여부
)

# 트레이너 설정
# 위에서 나온 모든 객체들을 이용해서 트레이닝
trainer = Seq2SeqTrainer(
    args=training_args, # 위에서 설정한 파라미터 적용
    model=model,
    train_dataset=low_call_voices_prepreocessed["train"],
    eval_dataset=low_call_voices_prepreocessed["valid"],  # or "test"
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)

####5. 학습 진행

- 학습 진행
- 진행 후 바로 저장
- 리소스가 있어야 하지..

In [None]:
# GPU 탕진잼!!
trainer.train()

# 모델, 프로세서 저장 (토크나이저는 프로세서 안에 있으니까 따로 저장안함)
trainer.save_pretrained("/content/drive/MyDrive/whisper/save_model/")
processor.save_pretrained("/content/drive/MyDrive/whisper/save_model/")

## 평가진행



- 위 과정은 train, vaild를 이용한 훈련과 검증 과정
- 평가는 Test를 사용해 진행

In [None]:
from transformers import Seq2SeqTrainer

# 저장한 모델과 프로세서 로드
model = WhisperForConditionalGeneration.from_pretrained("/content/drive/MyDrive/whisper/save_model/")
processor = WhisperProcessor.from_pretrained("/content/drive/MyDrive/whisper/save_model/")

In [None]:
# 트레이너 셋팅. training_args는 훈련에서 사용한 것과 동일하게 사용
# 여기서 train은 왜 집어넣는걸까..? 필요없는데..
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=low_call_voices_prepreocessed["train"],
    eval_dataset=low_call_voices_prepreocessed["test"],  # for evaluation(not validation)
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)

# 평가 진행
trainer.evaluate()

In [None]:
from transformers import pipeline
import gradio as gr

# 저장된 모델 불러와서 파이프라인에 넣어주면 됨
pipe = pipeline(model="")

# 실제 사용 메소드는 이거 3줄이면 됨. 물론 오디오 전처리 코드는 따로.
def transcribe(audio):

    # 파이프라인에 오디오파일 넣어주면 모델이 텍스트로 변환해서 반환해줌
    text = pipe(audio)["text"]
    return text

# 녹음하자마자 음성 파일로 따서 모델에 전달하고 싶다면 그라디오 사용
iface = gr.Interface(
    fn=transcribe,
    inputs=gr.Audio(source="microphone", type="filepath"),
    outputs="text",
    title="Whisper Small Hindi",
    description="Realtime demo for Hindi speech recognition using a fine-tuned Whisper small model.",
)

iface.launch()

# Faster Whisper Fine Tuning

Whisper 모델을 더 빠르고 효율적으로 사용하기 위한 기법.


---

Parameter Efficient Fine-Tuning (PEFT)
* 정의: PEFT는 모델의 일부 파라미터만 fine-tuning하는 기법입니다.
* 목적: 적은 수의 파라미터만 fine-tuning하여 모델 성능을 유지하면서도 메모리와 계산 비용을 줄이는 것이 목적입니다.
* 장점:
적은 수의 파라미터 fine-tuning: 전체 모델을 fine-tuning하는 것보다 적은 수의 파라미터만 fine-tuning합니다.
* 메모리 및 계산 비용 감소: 적은 수의 파라미터만 fine-tuning하므로 메모리와 계산 비용이 줄어듭니다.
* 기법:
Prompt Tuning
LoRA (Low-Rank Adaptation)
Prefix Tuning
Adapter Tuning

---



LoRA (Low Rank Adaptation)
* LoRA는 모델의 전체 파라미터를 fine-tuning하는 대신, 일부 파라미터만 업데이트하는 기법입니다.
* 이를 통해 모델 크기를 크게 줄이면서도 성능 저하를 최소화할 수 있습니다.
* LoRA는 GPT-3 175B 모델 대비 약 10,000배 적은 파라미터로 fine-tuning이 가능하다고 합니다.

---




Quantization (양자화)
* 정의: 양자화는 모델의 가중치와 활성화 함수 값을 낮은 비트 수로 표현하는 기술입니다.
* 목적: 모델 크기를 줄이고 추론 속도를 높이는 것이 목적입니다.
* 장점:
 * 모델 크기 감소: 낮은 비트 수로 표현하면 모델 크기가 줄어듭니다.
 * 추론 속도 향상: 낮은 비트 수 연산이 빠르기 때문에 추론 속도가 빨라집니다.
* 종류:
정적 양자화: 모델 학습 후 가중치를 양자화하는 방식
* 동적 양자화: 추론 시 입력 데이터에 따라 동적으로 양자화하는 방식

---

### 왜 Parameter Efficient Fine Tuning [PEFT](https://github.com/huggingface/peft)를 사용해야 되는가?


* PEFT
    * 효과적으로 parameter를 줄여서 fine tuning 속도 개선
    * 목적: 병목 현상을 해결
    * 접근법(예: 저수준 적응): 사전 훈련된 모델의 대부분의 매개변수를 동결시키면서 추가 모델 매개변수의 일부만 미세 조정하여 계산 및 저장 비용 크게 줄임
        * 대규모 모델의 전체 미세 조정 중 관찰되는 catastrophic forgetting 문제를 극복할 수 있음


---



* 모델 사이즈 증가로 fine tuning하는 것이 계산 복잡성 증가와 메모리 사용량 증가
    * 예를 들어, Whisper-large-v2 모델을 완전한 미세 조정을 위해 약 24GB의 GPU VRAM이 필요하며, 각 미세 조정된 모델은 약 7GB의 저장 공간을 필요함

    * 제한적인 환경에서 bottleneck 발생하고 원하는 결과를 얻기 힘듦



---



#### LoRA가 무엇인가?

* PEFT에서 여러 매개변수 효율적인 기술을 기본으로 제공
    * 그 중 하나인 Low Rank Adaptation (LoRA)
        * 사전 훈련된 모델 가중치를 동결하고 Transformer 아키텍처의 각 레이어에 훈련 가능한 랭크 분해 행렬을 삽입 (High Rank 즉 많은 연결이 되어 있는 것들보다 연결이 적은 Low Rank로 만들어서 계산량을 줄임)
            * Downstream 작업에 대한 훈련 가능한 매개변수 수가 크게 감소

---



#### 통계로 보는 PEFT 효과


* Full fine-tuning of Whisper-large-v2 checkpoint Vs. PEFT 적용 모델

    1. GPU VRAM이 8GB 미만인 환경에서 16억 개의 매개변수를 가진 모델을 미세 조정 🤯
    2. 훨씬 적은 수의 훈련 가능한 매개변수를 사용하여 거의 5배 더 큰 배치 크기를 사용 가능 📈
    3. 생성된 체크포인트는 원본 모델의 크기의 1%인 약 60MB 🚀

* 기존 🤗 transformers Whisper에서 변형이 많이 되지 않았음


---


    



### 환경설정

우리는 Whisper 모델을 미세 조정하기 위해 몇 가지 인기 있는 Python 패키지를 사용할 것입니다.

데이터 세트를 사용하여 훈련 데이터를 다운로드하고 준비하며 변환기를 사용하여 Whisper 모델을 로드하고 훈련할 것입니다.

또한 오디오 파일을 전처리하고 모델 성능을 평가하기 위해 librosa 패키지가 필요합니다.

마지막으로 PEFT, 비트앤바이트, 가속을 사용하여 LoRA로 모델을 준비하고 미세 조정합니다.

In [None]:
# PEFT 라이브러리 설치
!pip install -q git+https://github.com/huggingface/peft.git@main
# peft: 허깅페이스에서 개발한 Parameter Efficient Fine-Tuning 라이브러리
# 이 라이브러리를 통해 모델의 일부 파라미터만 fine-tuning할 수 있어 메모리와 계산 비용을 줄일 수 있습니다.

# 이렇게 필요한 라이브러리들을 설치하면 LoRA와 PEFT를 사용하여 Whisper 모델을 효율적으로 fine-tuning할 수 있습니다.
# 이를 통해 모델 크기를 줄이고, 추론 속도를 높이며, 메모리와 계산 비용을 절감할 수 있습니다.

In [None]:
# 필요한 라이브러리 설치
!pip install -q transformers datasets librosa evaluate jiwer gradio bitsandbytes==0.37 accelerate
# transformers: 허깅페이스의 언어 모델 라이브러리
# datasets: 허깅페이스의 데이터셋 라이브러리
# librosa: 오디오 처리 라이브러리
# evaluate: 모델 평가 도구
# jiwer: 음성 인식 성능 평가 지표
# gradio: 웹 기반 UI 라이브러리
# bitsandbytes==0.37: 비트 연산 가속화 라이브러리, 버전 0.37 사용
# accelerate: 모델 학습 가속화 라이브러리

이제 환경이 설정되었으므로 Colab에 적합한 GPU를 확보해 보겠습니다!

안타깝게도 Google Colab 무료 버전으로 좋은 GPU에 액세스하는 것이 훨씬 더 어려워지고 있습니다.

그러나 Google Colab Pro를 사용하면 V100 또는 P100 GPU를 할당하는 데 문제가 없습니다.

GPU를 얻으려면 런타임 -> 런타임 유형 변경을 클릭한 다음 하드웨어 가속기를 없음에서 GPU로 변경하세요.

GPU가 할당되었는지 확인하고 해당 사양을 볼 수 있습니다.

In [None]:
# NVIDIA GPU 정보를 확인하는 명령어 실행
gpu_info = !nvidia-smi

# 명령어 실행 결과를 문자열로 변환
gpu_info = '\n'.join(gpu_info)

# GPU 정보에 'failed'가 포함되어 있는지 확인
if gpu_info.find('failed') >= 0:
  # GPU에 연결되어 있지 않은 경우 출력
  print('Not connected to a GPU')
else:
  # GPU 정보 출력
  print(gpu_info)


# !nvidia-smi 명령어를 실행하여 NVIDIA GPU 정보를 확인합니다.
# 이 명령어는 GPU 드라이버 버전, CUDA 버전, GPU 온도, 팬 속도 등 다양한 정보를 제공합니다.
# gpu_info = '\n'.join(gpu_info) 코드를 통해 명령어 실행 결과를 문자열로 변환합니다.
# if gpu_info.find('failed') >= 0: 코드에서는 GPU 정보 문자열에 'failed'가 포함되어 있는지 확인합니다. 이는 GPU에 연결되어 있지 않은 경우 발생할 수 있습니다.
# 'failed'가 포함되어 있는 경우 "Not connected to a GPU"라는 메시지를 출력합니다.
# 'failed'가 포함되어 있지 않은 경우 GPU 정보를 출력합니다.

# 이 코드를 통해 사용자는 자신의 시스템에 NVIDIA GPU가 연결되어 있는지, 그리고 GPU 정보를 확인할 수 있습니다.
# 이는 딥러닝 모델 학습 및 추론 시 GPU 활용을 위해 필수적인 정보입니다.

Colab에서 제공한 GPU를 사용하도록 환경을 구성해 보겠습니다.

In [None]:
import os

# CUDA_VISIBLE_DEVICES 환경 변수를 "0"으로 설정
# 이는 CUDA를 사용하는 프로그램이 첫 번째 GPU(일반적으로 0부터 시작)만 사용하도록 지정하는 것입니다.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"


# import os를 통해 운영 체제와 상호 작용하기 위한 os 모듈을 가져옵니다.
# os.environ["CUDA_VISIBLE_DEVICES"] = "0" 코드를 통해 CUDA_VISIBLE_DEVICES 환경 변수를 0으로 설정합니다.
# 이는 CUDA를 사용하는 프로그램이 첫 번째 GPU(일반적으로 0부터 시작)만 사용하도록 지정하는 것입니다.

# 이 코드를 통해 사용자는 CUDA를 사용하는 프로그램이 특정 GPU만을 활용하도록 설정할 수 있습니다.
# GPU가 여러 개인 경우, 이러한 설정을 통해 각 프로그램이 특정 GPU를 사용하도록 제어할 수 있습니다.

[Hugging Face Hub](https://huggingface.co/)에 모델 체크포인트를 직접 업로드하는 것이 좋습니다.
훈련하는 동안. 허브는 다음을 제공합니다.
- 통합 버전 제어: 학습 중에 모델 체크포인트가 손실되지 않도록 할 수 있습니다.
- Tensorboard 로그: 훈련 과정에서 중요한 측정항목을 추적합니다.
- 모델 카드: 모델이 수행하는 작업과 의도된 사용 사례를 문서화합니다.
- 커뮤니티: 커뮤니티와 쉽게 공유하고 협업할 수 있는 방법입니다!

노트북을 허브에 연결하는 것은 간단합니다.

메시지가 표시되면 허브 인증 토큰을 입력하기만 하면 됩니다.

Hub 인증 토큰을 [여기](https://huggingface.co/settings/tokens)에서 찾으세요.

In [None]:
# huggingface_hub 라이브러리에서 notebook_login 함수를 가져옵니다.
from huggingface_hub import notebook_login

# notebook_login() 함수를 호출하여 Hugging Face Hub에 로그인합니다.
# 이 함수는 로그인 토큰을 입력하는 팝업 창을 띄워줍니다.
notebook_login()

# from huggingface_hub import notebook_login을 통해 Hugging Face Hub 라이브러리에서 notebook_login 함수를 가져옵니다.
# notebook_login() 함수를 호출하면 Hugging Face Hub에 로그인할 수 있습니다.
# 이 함수는 로그인 토큰을 입력하는 팝업 창을 띄워줍니다.

# 이 코드를 실행하면 Hugging Face Hub에 로그인할 수 있습니다.
# 로그인 토큰을 직접 입력하는 방법 외에도 huggingface_hub 라이브러리를 사용하여 로그인할 수 있습니다.
# 이를 통해 Hugging Face Hub의 다양한 기능을 사용할 수 있습니다.

다음으로 Whisper 모델 체크포인트와 작업 세부정보를 정의합니다.

In [None]:
# Whisper 모델의 이름 또는 경로를 지정합니다.
model_name_or_path = "openai/whisper-large-v2"

# 수행할 작업을 "transcribe"로 설정합니다.
# 이는 오디오 파일을 텍스트로 전사하는 작업을 의미합니다.
task = "transcribe"


# model_name_or_path = "openai/whisper-large-v2"에서는 사용할 Whisper 모델의 이름 또는 경로를 지정합니다.
# 이 경우 "openai/whisper-large-v2" 모델을 사용합니다.
# task = "transcribe"에서는 수행할 작업을 "transcribe"로 설정합니다.
# 이는 오디오 파일을 텍스트로 전사하는 작업을 의미합니다.

# 이 코드는 Whisper 모델을 사용하여 오디오 파일을 텍스트로 전사하는 작업을 수행할 수 있도록 설정합니다.
#  이를 통해 음성 인식 및 자동 전사 기능을 구현할 수 있습니다.

마지막으로 Whisper를 미세 조정하려는 언어를 포함하여 데이터 세트 세부 사항을 정의합니다.

In [None]:
# Common Voice 데이터셋의 이름을 지정합니다.
dataset_name = "mozilla-foundation/common_voice_13_0"

# 사용할 언어를 한국어로 설정합니다.
language = "korean"

# 한국어의 ISO 639-1 언어 코드인 "ko"를 사용합니다.
language_abbr = "ko" # Short hand code for the language we want to fine-tune


# dataset_name = "mozilla-foundation/common_voice_13_0"에서는 사용할 Common Voice 데이터셋의 이름을 지정합니다.
# 이 경우 "mozilla-foundation/common_voice_13_0" 데이터셋을 사용합니다.
# language = "korean"에서는 사용할 언어를 한국어로 설정합니다.
# language_abbr = "ko"에서는 한국어의 ISO 639-1 언어 코드인 "ko"를 사용합니다.
# 이는 데이터셋에서 한국어 데이터를 선택하는 데 사용됩니다.

### 데이터셋 로드


🤗 데이터 세트를 사용하면 데이터를 다운로드하고 준비하는 것이 매우 간단합니다.

단 한 줄의 코드로 Common Voice 스플릿을 다운로드하고 준비할 수 있습니다.

먼저 Hugging Face Hub(mozilla-foundation/common_voice_13_0)의 사용 약관에 동의했는지 확인하세요.

약관에 동의하면 데이터 세트에 대한 전체 액세스 권한을 갖고 데이터를 로컬로 다운로드할 수 있습니다.

In [None]:
from datasets import load_dataset, DatasetDict

# Common Voice 데이터셋을 로드하기 위한 DatasetDict 객체를 생성합니다.
common_voice = DatasetDict()

# load_dataset() 함수를 사용하여 Common Voice 데이터셋의 "train+validation" 데이터를 로드합니다.
# 이때 language_abbr 변수를 사용하여 한국어 데이터를 선택합니다.
# use_auth_token=True를 설정하여 Hugging Face API 토큰을 사용합니다.
common_voice["train"] = load_dataset(dataset_name, language_abbr, split="train+validation", use_auth_token=True)

# load_dataset() 함수를 사용하여 Common Voice 데이터셋의 "test" 데이터를 로드합니다.
# 이때 language_abbr 변수를 사용하여 한국어 데이터를 선택합니다.
# use_auth_token=True를 설정하여 Hugging Face API 토큰을 사용합니다.
common_voice["test"] = load_dataset(dataset_name, language_abbr, split="test", use_auth_token=True)

# 로드된 데이터셋을 출력합니다.
print(common_voice)



# from datasets import load_dataset, DatasetDict에서는 Common Voice 데이터셋을 로드하기 위해 필요한 함수와 클래스를 가져옵니다.
# common_voice = DatasetDict()에서는 Common Voice 데이터셋을 저장할 DatasetDict 객체를 생성합니다.
# common_voice["train"] = load_dataset(dataset_name, language_abbr, split="train+validation", use_auth_token=True)에서는 load_dataset() 함수를 사용하여 Common Voice 데이터셋의 "train+validation" 데이터를 로드합니다.
# language_abbr 변수를 사용하여 한국어 데이터를 선택하며, use_auth_token=True를 설정하여 Hugging Face API 토큰을 사용합니다.
# common_voice["test"] = load_dataset(dataset_name, language_abbr, split="test", use_auth_token=True)에서는 load_dataset() 함수를 사용하여 Common Voice 데이터셋의 "test" 데이터를 로드합니다.
# 마찬가지로 language_abbr 변수를 사용하여 한국어 데이터를 선택하며, use_auth_token=True를 설정하여 Hugging Face API 토큰을 사용합니다.
# print(common_voice)에서는 로드된 데이터셋을 출력합니다.

# 이 코드를 통해 Common Voice 데이터셋의 한국어 데이터를 "train+validation"과 "test" 데이터로 분리하여 로드할 수 있습니다.
#  이후 이 데이터를 사용하여 한국어 음성 인식 모델을 fine-tuning할 수 있습니다.


* 일반적인 ASR(음성 인식) 데이터셋
    * 입력 오디오 샘플(오디오)과 해당되는 텍스트(문장)만 제공
* Common Voice
    * ASR에는 필요하지 않은 악센트와 로케일과 같은 추가 메타데이터 정보가 포함
    * 일반적인 용도로 사용하고 미세 조정을 고려하기 위해 메타데이터 정보 무시

In [None]:
common_voice = common_voice.remove_columns( #필요 없는 컬럼 제거
    ["accent", "age", "client_id", "down_votes", "gender", "locale", "path", "segment", "up_votes", "variant"]
)

print(common_voice)

#### 특성 추출기(Feature Extractor), 토크나이저(Tokenizer), 그리고 데이터준비



ASR 파이프라인은 세 단계로 분해될 수 있습니다.

1. 원시 오디오 입력을 전처리하는 특징 추출기
2. 시퀀스 간 매핑을 수행하는 모델
3. 모델 출력을 텍스트 형식으로 후처리하는 토크나이저


* Whisper
    * [WhisperFeatureExtractor](https://huggingface.co/docs/transformers/main/model_doc/whisper#transformers.WhisperFeatureExtractor)와 [WhisperTokenizer](https://huggingface.co/docs/transformers/main/model_doc/whisper#transformers.WhisperTokenizer)로 구성



In [None]:
from transformers import WhisperFeatureExtractor
# WhisperFeatureExtractor 클래스를 사용하여 Whisper 모델에 필요한 feature extractor를 로드합니다.
# from_pretrained() 메서드를 사용하여 사전 학습된 feature extractor를 가져옵니다.
# 이때 model_name_or_path 변수에는 Whisper 모델의 이름 또는 경로를 지정합니다.
feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name_or_path)


# from transformers import WhisperFeatureExtractor에서는 Whisper 모델에 필요한 feature extractor를 가져옵니다.
# feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name_or_path)에서는 WhisperFeatureExtractor 클래스의 from_pretrained() 메서드를 사용하여 사전 학습된 feature extractor를 로드합니다.
# model_name_or_path 변수에는 Whisper 모델의 이름 또는 경로를 지정합니다.

# 이 코드를 통해 Whisper 모델을 사용하기 위해 필요한 feature extractor를 로드할 수 있습니다.
# 이후 이 feature extractor를 사용하여 음성 데이터를 Whisper 모델의 입력 형식으로 변환할 수 있습니다.

In [None]:
from transformers import WhisperTokenizer
# WhisperTokenizer 클래스를 사용하여 Whisper 모델에 필요한 tokenizer를 로드합니다.
# from_pretrained() 메서드를 사용하여 사전 학습된 tokenizer를 가져옵니다.
# 이때 model_name_or_path 변수에는 Whisper 모델의 이름 또는 경로를 지정합니다.
# language 변수에는 사용할 언어를 지정하고, task 변수에는 수행할 작업(예: 음성 인식, 텍스트 생성 등)을 지정합니다.
tokenizer = WhisperTokenizer.from_pretrained(model_name_or_path, language=language, task=task)


# from transformers import WhisperTokenizer에서는 Whisper 모델에 필요한 tokenizer를 가져옵니다.
# tokenizer = WhisperTokenizer.from_pretrained(model_name_or_path, language=language, task=task)에서는 WhisperTokenizer 클래스의 from_pretrained() 메서드를 사용하여 사전 학습된 tokenizer를 로드합니다.
#  model_name_or_path 변수에는 Whisper 모델의 이름 또는 경로를 지정하고, language 변수에는 사용할 언어를, task 변수에는 수행할 작업(예: 음성 인식, 텍스트 생성 등)을 지정합니다.


# 이 코드를 통해 Whisper 모델을 사용하기 위해 필요한 tokenizer를 로드할 수 있습니다.
#  이후 이 tokenizer를 사용하여 입력 데이터를 Whisper 모델의 형식으로 변환할 수 있습니다.

기능 추출기와 토크나이저 사용을 단순화하기 위해 두 가지를 하나의 `WhisperProcessor` 클래스로 _wrap_할 수 있습니다.

 이 프로세서 객체는 필요에 따라 오디오 입력 및 모델 예측에 사용될 수 있습니다.

그렇게 하면 훈련 중에 두 가지 객체만 추적하면 됩니다.

'프로세서' 및 '모델':

In [None]:
from transformers import WhisperProcessor
# WhisperProcessor 클래스를 사용하여 Whisper 모델에 필요한 processor를 로드합니다.
# from_pretrained() 메서드를 사용하여 사전 학습된 processor를 가져옵니다.
# 이때 model_name_or_path 변수에는 Whisper 모델의 이름 또는 경로를 지정합니다.
# language 변수에는 사용할 언어를 지정하고, task 변수에는 수행할 작업(예: 음성 인식, 텍스트 생성 등)을 지정합니다.
processor = WhisperProcessor.from_pretrained(model_name_or_path, language=language, task=task)


# from transformers import WhisperProcessor에서는 Whisper 모델에 필요한 processor를 가져옵니다.
# processor = WhisperProcessor.from_pretrained(model_name_or_path, language=language, task=task)에서는 WhisperProcessor 클래스의 from_pretrained() 메서드를 사용하여 사전 학습된 processor를 로드합니다.
# model_name_or_path 변수에는 Whisper 모델의 이름 또는 경로를 지정하고, language 변수에는 사용할 언어를, task 변수에는 수행할 작업(예: 음성 인식, 텍스트 생성 등)을 지정합니다.


# 이 코드를 통해 Whisper 모델을 사용하기 위해 필요한 processor를 로드할 수 있습니다.
# 이후 이 processor를 사용하여 입력 데이터를 Whisper 모델의 형식으로 변환할 수 있습니다.

##### 데이터 준비


Common Voice 데이터셋의 첫 번째 예제를 출력하여 데이터의 형식을 살펴봄

In [None]:
print(common_voice["train"][0])

* Whisper 모델 샘플링
    * 입력 오디오는 48 kHz 새플링
    * Whisper feature extractor에 전달하기 위해서 16 kHz로 다운샘플 진행
* 샘플링 속도 설정
    * Dataset의 [`cast_column`](https://huggingface.co/docs/datasets/package_reference/main_classes.html?highlight=cast_column#datasets.DatasetDict.cast_column) 방법 사용: 오디오 입력을 올바른 샘플링 속도로 설정
    * 오디오를 변경하는 것이 아니라 오디오 샘플을 실시간으로 받을 수 있도록 함


In [None]:
from datasets import Audio

# common_voice 데이터셋의 "audio" 열을 Audio 객체로 변환합니다.
# Audio 객체는 오디오 데이터를 처리하는 데 필요한 정보를 포함하고 있습니다.
# sampling_rate=16000 옵션을 통해 오디오 데이터의 샘플링 레이트를 16kHz로 설정합니다.
# 이는 일반적인 음성 처리 작업에 적합한 샘플링 레이트입니다.
common_voice = common_voice.cast_column("audio", Audio(sampling_rate=16000))


# from datasets import Audio에서는 오디오 데이터를 처리하는 데 필요한 Audio 클래스를 가져옵니다.
# common_voice = common_voice.cast_column("audio", Audio(sampling_rate=16000))에서는 common_voice 데이터셋의 "audio" 열을 Audio 객체로 변환합니다. sampling_rate=16000 옵션을 통해 오디오 데이터의 샘플링 레이트를 16kHz로 설정합니다.
# 이는 일반적인 음성 처리 작업에 적합한 샘플링 레이트입니다.


# 이 코드를 통해 common_voice 데이터셋의 오디오 데이터를 처리할 수 있습니다.
# 이후 이 데이터를 사용하여 음성 인식, 음성 합성 등의 작업을 수행할 수 있습니다.
# 또한 사용자 정의 오디오 데이터셋을 생성하고 공유할 수 있는 기능도 제공합니다.
# 이를 통해 음성 처리 모델 개발에 필요한 데이터를 쉽게 확보할 수 있습니다.

Common Voice 데이터세트의 첫 번째 오디오 샘플을 다시 로드하면 리샘플링됩니다.
원하는 샘플링 속도로 설정합니다.

In [None]:
print(common_voice["train"][0])

이제 모델에 사용할 데이터를 준비하는 함수를 작성할 수 있습니다.
1. `batch["audio"]`를 호출하여 오디오 데이터를 로드하고 리샘플링합니다. 위에서 설명한 대로 🤗 데이터세트는 필요한 모든 리샘플링 작업을 즉시 수행합니다.
2. 특징 추출기를 사용하여 1차원 오디오 배열에서 log-Mel 스펙트로그램 입력 특징을 계산합니다.
3. 토크나이저를 사용하여 텍스트를 라벨 ID로 인코딩합니다.

In [None]:
def prepare_dataset(batch):
    # 1. 오디오 데이터 로드 및 16kHz로 리샘플링
    audio = batch["audio"]

    # 2. 입력 오디오 배열에서 로그-멜 스펙트로그램 특징 추출
    # feature_extractor는 오디오 특징 추출기 모델
    # input_features는 추출된 특징 벡터
    batch["input_features"] = processor.feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

    # 3. 타겟 문장을 토크나이저를 사용하여 라벨 ID로 인코딩
    # tokenizer는 텍스트를 토큰화하고 ID로 변환하는 모델
    batch["labels"] = processor.tokenizer(batch["sentence"]).input_ids

    # 4. 준비된 데이터를 반환
    return batch


# audio = batch["audio"]에서는 입력 데이터셋의 "audio" 필드에서 오디오 데이터를 가져옵니다.
# batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]에서는 입력 오디오 배열에서 로그-멜 스펙트로그램 특징을 추출합니다. feature_extractor는 오디오 특징 추출기 모델이며, 추출된 특징 벡터는 input_features에 저장됩니다.
# batch["labels"] = tokenizer(batch["sentence"]).input_ids에서는 타겟 문장을 토크나이저를 사용하여 라벨 ID로 인코딩합니다.
# tokenizer는 텍스트를 토큰화하고 ID로 변환하는 모델입니다.
# 준비된 데이터는 return batch를 통해 반환됩니다.

# 이 코드는 오디오 데이터와 텍스트 데이터를 처리하여 모델 학습에 사용할 수 있는 형태로 변환합니다.
# 오디오 데이터는 로그-멜 스펙트로그램 특징으로 변환되며, 텍스트 데이터는 토크나이저를 통해 라벨 ID로 인코딩됩니다.

데이터 세트의 `.map` 메소드를 사용하여 모든 훈련 예제에 데이터 준비 기능을 적용할 수 있습니다.

'num_proc' 인수는 사용할 CPU 코어 수를 지정합니다.

`num_proc` > 1로 설정하면 다중 처리가 활성화됩니다.

`.map` 메서드가 다중 처리로 인해 중단되면 `num_proc=1`을 설정하고 데이터세트를 순차적으로 처리합니다.

직접 만들어 보세요 🍵, 데이터세트 크기에 따라 20~30분 정도 걸릴 수 있습니다 ⏰

In [None]:
# Common Voice 데이터셋을 처리하는 코드
common_voice = common_voice.map(
    # prepare_dataset 함수를 적용하여 데이터셋을 준비
    prepare_dataset,
    # 기존 "train" 데이터셋의 열을 제거
    remove_columns=common_voice.column_names["train"],
    # 2개의 프로세스를 사용하여 병렬 처리
    num_proc=2,
)


# common_voice = common_voice.map(...): Common Voice 데이터셋을 처리하는 코드입니다.
# prepare_dataset: 이전에 정의한 데이터셋 준비 함수를 적용합니다.
# remove_columns=common_voice.column_names["train"]: "train" 데이터셋의 열을 제거합니다. 이는 중복된 정보를 제거하기 위함입니다.
# num_proc=2: 2개의 프로세스를 사용하여 병렬 처리를 수행합니다. 이를 통해 처리 속도를 높일 수 있습니다.

# 이 코드는 Common Voice 데이터셋을 처리하여 모델 학습에 사용할 수 있는 형태로 변환합니다.
#  데이터셋의 오디오 데이터는 로그-멜 스펙트로그램 특징으로 변환되며, 텍스트 데이터는 토크나이저를 통해 라벨 ID로 인코딩됩니다.
#  또한 병렬 처리를 통해 처리 속도를 높이고 있습니다.

In [None]:
common_voice["train"]

#### 훈련 및 검증


훈련 파이프라인

* [🤗 Trainer](https://huggingface.co/transformers/master/main_classes/trainer.html?highlight=trainer)가 대부분의 작업을 처리:


1. 데이터 collator 정의: 데이터 콜레이터는 우리가 전처리한 데이터를 가져와 모델에 사용할 수 있는 PyTorch 텐서로 준비
2. 평가 지표: 평가 중에는 모델을 글자 오류율  [word error rate (CER)](https://huggingface.co/metrics/cer)지표를 사용하여 평가
3. 사전 훈련된 체크포인트 load: 사전 훈련된 체크포인트를 로드하고 훈련을 위해 올바르게 구성
4. 훈련 구성 정의: 🤗 Trainer가 훈련 스케줄을 정의에 사용

* 미세 조정한 후에는 테스트 데이터에서 모델을 평가하여 한국어 음성을 올바르게 전사

##### Data Collator 정의


* 시퀀스-투-시퀀스 음성 모델의 데이터 콜레이터
    * Input_features와 labels를 독립적으로 처리
    
    - Input_features: feature extractor에 의해 처리
    - labels: tokenizer에 의해 처리

* Input_features는 이미 30초로 패딩되어 있고 특성 추출기에 의해 고정된 차원의 로그 멜 스펙트로그램으로 변환. 따라서 우리가 해야 할 일은 input_features를 배치 처리된 PyTorch 텐서로 변환

* labels는 패딩되지 않음 먼저 배치 내에서 최대 길이에 맞게 시퀀스를 패딩하고, tokenizer의 .pad 메서드를 사용하여 시퀀스를 패딩 패딩 토큰은 손실을 계산할 때 고려되지 않도록 -100으로 대체. 그런 다음 레이블 시퀀스의 시작에서 BOS 토큰을 잘라서 훈련 중에 나중에 이를 추가

* 이전에 정의한 WhisperProcessor를 활용하여 특성 추출기 및 토크나이저 작업을 모두 수행 가능

In [None]:
import torch
from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # 1. 입력 특징과 라벨 데이터를 분리
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        # 2. 입력 특징 배치 생성
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # 3. 라벨 데이터 배치 생성 및 패딩
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # 4. 패딩된 라벨에서 -100으로 마스킹하여 손실 계산 시 패딩 부분 무시
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # 5. 이전 토크나이징 과정에서 BOS 토큰이 추가된 경우 제거
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        # 6. 처리된 배치 데이터 반환
        batch["labels"] = labels
        return batch


# input_features = [{"input_features": feature["input_features"]} for feature in features]: 입력 특징과 라벨 데이터를 분리합니다.
# batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt"): 입력 특징 배치를 생성합니다. self.processor.feature_extractor.pad()는 입력 특징을 패딩하여 배치 형태로 변환합니다.
# labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt"): 라벨 데이터 배치를 생성합니다. self.processor.tokenizer.pad()는 라벨 데이터를 패딩하여 배치 형태로 변환합니다.
# labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100): 패딩된 라벨 데이터에서 -100으로 마스킹하여 손실 계산 시 패딩 부분을 무시합니다.
# if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item(): labels = labels[:, 1:]: 이전 토크나이징 과정에서 BOS(Begin Of Sequence) 토큰이 추가된 경우 제거합니다.
# batch["labels"] = labels; return batch: 처리된 배치 데이터를 반환합니다.


# 이 코드는 음성 인식 모델 학습을 위한 데이터 전처리 과정을 담당합니다.
# 입력 특징과 라벨 데이터를 분리하고, 각각 패딩하여 배치 형태로 변환합니다.
# 또한 라벨 데이터에서 패딩 부분을 -100으로 마스킹하여 손실 계산 시 이를 무시하도록 합니다.
# 마지막으로 BOS 토큰이 추가된 경우 제거하여 최종 배치 데이터를 반환합니다.

Data collator 초기화 진행

In [None]:
# 1. DataCollatorSpeechSeq2SeqWithPadding 클래스 인스턴스 생성
# processor 매개변수에는 음성 데이터 전처리를 위한 Processor 객체가 전달됩니다.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)


# data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor): DataCollatorSpeechSeq2SeqWithPadding 클래스의 인스턴스를 생성합니다.
#  이 클래스는 음성 데이터 전처리를 담당하며, processor 매개변수에는 음성 데이터 전처리를 위한 Processor 객체가 전달됩니다.


# 이 코드는 음성 인식 모델 학습을 위한 데이터 전처리 과정을 수행하는 부분입니다.
#  DataCollatorSpeechSeq2SeqWithPadding 클래스는 입력 특징과 라벨 데이터를 분리하고, 각각 패딩하여 배치 형태로 변환합니다.
#   또한 라벨 데이터에서 패딩 부분을 -100으로 마스킹하여 손실 계산 시 이를 무시하도록 합니다.
#  마지막으로 BOS 토큰이 추가된 경우 제거하여 최종 배치 데이터를 반환합니다.

##### 평가 지표


* ASR 시스템을 평가하기 위한 '사실상의' 지표인 한 단어 오류율(CER) 메트릭을 사용
* 더 많은 정보는 [문서](https://huggingface.co/metrics/cer)를 참조. 우리는 🤗 Evaluate에서 CER 메트릭을 로드

In [None]:
# 1. evaluate 라이브러리 import
import evaluate

# 2. "cer" 메트릭 로드
metric = evaluate.load("cer")


# import evaluate: Hugging Face의 evaluate 라이브러리를 import합니다.
# 이 라이브러리는 다양한 자연어 처리 및 음성 처리 평가 메트릭을 제공합니다.
# metric = evaluate.load("cer"): evaluate.load("cer") 함수를 사용하여 Character Error Rate (CER) 메트릭을 로드합니다.
# CER은 자동 음성 인식 시스템을 평가하는 데 사용되는 일반적인 메트릭으로, Word Error Rate (WER)과 유사합니다.

# 추가 정보
# CER (Character Error Rate)
# CER은 자동 음성 인식 시스템의 성능을 평가하는 데 사용되는 메트릭입니다.
# CER은 입력 문장과 인식 결과 간의 문자 단위 편집 거리(insertion, deletion, substitution)를 계산하여 오류율을 측정합니다.
# CER은 WER과 유사하지만, 단어 단위가 아닌 문자 단위로 오류를 계산합니다.
# CER은 특히 한국어와 같이 띄어쓰기가 중요한 언어에서 유용한 평가 지표가 될 수 있습니다.

##### 사전 학습 모델 로드

* 사전 훈련된 Whisper 체크포인트를 로드
    * 이 작업은 🤗 Transformers를 사용하여 매우 간단



* 모델의 메모리 사용량을 줄이기 위해 모델을 8비트로         
    * 모델을 1/4 정밀도(32비트와 비교했을 때)로 양자화하여 성능 손실을 최소화 [here](https://huggingface.co/blog/hf-bitsandbytes-integration)

In [None]:
pip install --upgrade bitsandbytes

In [None]:
from transformers import WhisperForConditionalGeneration
# WhisperForConditionalGeneration 모델을 transformers 라이브러리에서 import합니다.
# 이 모델은 조건부 생성(conditional generation) 작업을 위한 모델입니다.

# Quantization(양자화)을 수행하여 weight들을 float32에서 int로 변환하여 성능을 최적화합니다.
# 이를 통해 모델의 성능을 최적화할 수 있습니다.

# WhisperForConditionalGeneration 모델을 불러오고, 8비트로 로드하며, 디바이스 매핑을 설정합니다.
model = WhisperForConditionalGeneration.from_pretrained(model_name_or_path, load_in_8bit=True, device_map={"":0})
# model_name_or_path: 불러올 모델의 이름 또는 경로를 지정합니다.
# load_in_8bit=True: 모델을 8비트로 로드하도록 설정합니다. 이는 양자화된 모델을 의미합니다.
# device_map={"":0}: 모델을 특정 디바이스에 매핑하는 설정을 지정합니다. 여기서는 빈 문자열("")을 0번 디바이스에 매핑하는 것으로 보입니다.


##### 모델 후처리


1. 훈련을 가능하게 하기 위해 8비트 모델에 몇 가지 후처리 단계를 적용
2. 모델 레이어를 동결, 훈련과 모델의 안정성을 위해 레이어 정규화와 출력 레이어를 float32로 캐스팅

(모델 안정성과 layer normalization 분석, float32로 캐스팅 하는 이유)

In [None]:
from peft import prepare_model_for_kbit_training
# from peft import prepare_model_for_kbit_training 명령어로 peft 라이브러리에서 prepare_model_for_kbit_training 함수를 불러옵니다.
# 이 함수는 모델을 8비트 양자화(quantization)하여 성능을 최적화하는 데 사용됩니다.

# 일반적으로 모델의 weight들은 float32 형식으로 저장되어 있습니다.
# 이를 int 형식으로 변환하는 양자화 작업을 통해 모델의 성능을 최적화할 수 있습니다.
# 양자화된 모델은 메모리 사용량이 줄어들고 추론 속도가 향상됩니다.


model = prepare_model_for_kbit_training(model)
# prepare_model_for_kbit_training 함수는 peft 라이브러리에서 제공하는 함수입니다.
# 이 함수는 입력으로 받은 모델 객체를 양자화된 상태로 변환하여 반환합니다.
# 양자화된 모델 객체는 model 변수에 할당됩니다.

* Whisper 모델은 인코더에 컨볼루션 레이어를 사용하기 때문에 체크포인팅은 grad 연산을 비활성. 이를 피하기 위해 입력을 특별히 trainable하게 만들어야 합니다.


In [None]:
# make_inputs_require_grad 함수 정의
def make_inputs_require_grad(module, input, output):
    # output 텐서의 requires_grad 속성을 True로 설정
    # 이를 통해 출력 텐서에 대한 변화도(gradient)를 계산할 수 있게 됩니다.
    output.requires_grad_(True)


# register_forward_hook 메서드를 사용하여 make_inputs_require_grad 함수를 conv1 레이어에 등록
# 이렇게 하면 conv1 레이어의 forward 연산이 실행될 때마다 make_inputs_require_grad 함수가 호출됩니다.
model.model.encoder.conv1.register_forward_hook(make_inputs_require_grad)


# make_inputs_require_grad 함수는 모듈의 입력과 출력을 받아서 출력 텐서의 requires_grad 속성을 True로 설정합니다.
# 이를 통해 출력 텐서에 대한 변화도(gradient)를 계산할 수 있게 됩니다.
# register_forward_hook 메서드를 사용하여 make_inputs_require_grad 함수를 conv1 레이어에 등록합니다.
# 이렇게 하면 conv1 레이어의 forward 연산이 실행될 때마다 make_inputs_require_grad 함수가 호출됩니다.

##### Low-rank adapters (LoRA)를 모델에 적용



* `PeftModel`을 로드하고 `peft`의 `get_peft_model` 유틸리티 함수를 사용하여 낮은 순위 어댑터(LoRA)를 사용할 것이라고 지정해 보겠습니다.


In [None]:
from peft import LoraConfig, PeftModel, LoraModel, LoraConfig, get_peft_model

# LoraConfig 객체를 생성하여 Lora 모델의 구성을 설정합니다.
# r=32: Lora 모델의 랭크 값을 32로 설정합니다. 이는 모델의 복잡도를 결정합니다.
# lora_alpha=64: Lora 모델의 스케일링 팩터를 64로 설정합니다.
# target_modules=["q_proj", "v_proj"]: Lora 모듈이 적용될 모델의 특정 레이어를 지정합니다.
# lora_dropout=0.05: Lora 모델의 드롭아웃 비율을 0.05로 설정합니다.
# bias="none": Lora 모델의 편향을 "none"으로 설정합니다.
config = LoraConfig(r=32, lora_alpha=64, target_modules=["q_proj", "v_proj"], lora_dropout=0.05, bias="none")

# get_peft_model 함수를 사용하여 PEFT 모델을 가져옵니다.
# 이 함수는 기존 모델에 Lora 모듈을 적용하여 PEFT 모델을 생성합니다.
model = get_peft_model(model, config)

# model.print_trainable_parameters()를 호출하여 훈련 가능한 매개변수를 출력합니다.
# 이를 통해 모델의 구조와 매개변수 분포를 확인할 수 있습니다.
model.print_trainable_parameters()


# 이 코드는 PEFT(Probabilistic Early-stopping Framework)를 사용하여 Lora 모델을 구성하고 훈련 가능한 매개변수를 출력하는 것입니다.
# PEFT는 모델 훈련 중에 조기 종료를 결정하기 위한 확률적인 프레임워크입니다. Lora 모델은 모델의 성능을 향상시키기 위해 사용되는 기술 중 하나로, 모델의 일부 레이어에만 추가 매개변수를 도입하여 모델의 복잡도를 낮추고 훈련 속도를 높일 수 있습니다.
# 이 코드는 PEFT를 사용하여 Lora 모델을 설정하고 훈련 가능한 매개변수를 출력하는 것입니다. 이를 통해 모델의 성능을 향상시키고 효율적인 훈련을 수행할 수 있습니다.
# 추가로, 이 코드는 모델 구성과 매개변수 출력 외에도 모델 저장, 로드, 평가 등의 기능을 포함할 수 있습니다. 이를 통해 모델 개발 및 배포 과정을 더욱 효율적으로 수행할 수 있습니다.

**1%**의 학습 parameter를 사용하였고 **Parameter-Efficient Fine-Tuning**를 적용


##### 훈련 구성 정의


마지막 단계에서는 훈련과 관련된 모든 매개변수를 정의 훈련 인자에 대한 자세한 내용은 해당 문서를 참조 Seq2SeqTrainingArguments [docs](https://huggingface.co/docs/transformers/main_classes/trainer#transformers.Seq2SeqTrainingArguments)


In [None]:
from transformers import Seq2SeqTrainingArguments

# Seq2SeqTrainingArguments 클래스는 sequence-to-sequence 모델 학습을 위한 다양한 하이퍼파라미터를 정의할 수 있게 해줍니다.
# 이 클래스는 transformers 라이브러리에서 제공됩니다.

# training_args 변수에 Seq2SeqTrainingArguments 객체를 생성합니다.
# 각 인자는 다음과 같은 의미를 가집니다:
training_args = Seq2SeqTrainingArguments(
    # output_dir: 모델 체크포인트와 기타 출력 파일이 저장될 디렉토리 경로
    output_dir="fastwhisper",  # change to a repo name of your choice

    # per_device_train_batch_size: 각 GPU/CPU 장치당 훈련 배치 크기
    per_device_train_batch_size=8,

    # gradient_accumulation_steps: 경사도 누적 단계 수
    # 이 값을 2배씩 늘리면 배치 크기를 1/2로 줄일 수 있습니다.
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size

    # learning_rate: 학습률
    learning_rate=1e-3,

    # warmup_steps: 학습률 warmup 단계 수
    warmup_steps=50,

    # num_train_epochs: 훈련 에폭 수
    num_train_epochs=1,

    # evaluation_strategy: 평가 전략 (예: "steps", "epoch")
    evaluation_strategy="steps",

    # fp16: 혼합 정밀도 사용 여부
    fp16=True,

    # per_device_eval_batch_size: 각 GPU/CPU 장치당 평가 배치 크기
    per_device_eval_batch_size=8,

    # generation_max_length: 생성 출력의 최대 길이
    generation_max_length=128,

    # logging_steps: 로깅 단계 수
    logging_steps=100,

    # max_steps: 최대 훈련 단계 수 (테스트 목적으로만 사용)
    max_steps=100, # only for testing purposes, remove this from your final run :)

    # remove_unused_columns: 사용되지 않는 열 제거 여부
    remove_unused_columns=False,  # required as the PeftModel forward doesn't have the signature of the wrapped model's forward

    # label_names: 레이블 이름 목록
    label_names=["labels"],  # same reason as above
)


# 이 코드는 Seq2SeqTrainingArguments 클래스를 사용하여 sequence-to-sequence 모델 학습을 위한 다양한 하이퍼파라미터를 정의하고 있습니다.
# 이 클래스는 transformers 라이브러리에서 제공되며, 모델 학습 과정에서 필요한 여러 가지 설정을 할 수 있게 해줍니다.

# 각 인자의 의미와 역할에 대해 자세히 설명했습니다.
# 이를 통해 모델 학습 과정을 효과적으로 구성할 수 있습니다.

PEFT를 사용하여 모델을 미세 조정하려면 몇 가지 주의 사항이 있습니다.

1. PeftModel의 전달은 기본 모델 전달의 서명을 상속하지 않으므로 `remove_unused_columns=False` 및 `label_names=["labels"]`를 명시적으로 설정해야 합니다.

2. INT8 훈련에는 자동 캐스팅이 필요하므로 Trainer에서 기본 `predict_with_generate` 호출은 자동으로 캐스팅되지 않으므로 사용할 수 없습니다.

3. 마찬가지로 자동 캐스팅을 할 수 없기 때문에 `compute_metrics`를 `Seq2SeqTrainer`에 전달할 수 없으므로 Trainer를 인스턴스화하는 동안 주석 처리하겠습니다.

In [None]:
from transformers import Seq2SeqTrainer, TrainerCallback, TrainingArguments, TrainerState, TrainerControl
from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR

# 이 콜백은 어댑터 가중치만 저장하고 기본 모델 가중치는 제거합니다.
class SavePeftModelCallback(TrainerCallback):
    def on_save(
        self,
        args: TrainingArguments,
        state: TrainerState,
        control: TrainerControl,
        **kwargs,
    ):
        # 체크포인트 폴더 경로를 정의합니다.
        # 체크포인트 폴더는 'checkpoint-{global_step}'로 이름이 지정됩니다.
        checkpoint_folder = os.path.join(args.output_dir, f"{PREFIX_CHECKPOINT_DIR}-{state.global_step}")

        # 어댑터 모델 가중치만 저장합니다.
        # 이 작업은 PEFT 모델의 어댑터 가중치를 'adapter_model' 하위폴더에 저장합니다.
        peft_model_path = os.path.join(checkpoint_folder, "adapter_model")
        kwargs["model"].save_pretrained(peft_model_path)

        # 기본 모델 가중치 (pytorch_model.bin)를 제거하여 공간을 절약합니다.
        # 어댑터 가중치만 저장하려고 하므로 이 파일은 필요하지 않습니다.
        pytorch_model_path = os.path.join(checkpoint_folder, "pytorch_model.bin")
        if os.path.exists(pytorch_model_path):
            os.remove(pytorch_model_path)
        return control


# 정의된 설정으로 Seq2SeqTrainer 인스턴스를 생성합니다.
# 이 Trainer는 Seq2Seq 모델을 학습하는 데 사용됩니다.
trainer = Seq2SeqTrainer(
    args=training_args,  # 하이퍼파라미터를 포함한 학습 인자
    model=model,  # 학습할 Seq2Seq 모델
    train_dataset=common_voice["train"],  # 학습 데이터셋
    eval_dataset=common_voice["test"],  # 평가 데이터셋
    data_collator=data_collator,  # 학습 데이터 처리를 위한 데이터 콜레이터
    tokenizer=processor.feature_extractor,  # 모델용 토크나이저
    callbacks=[SavePeftModelCallback],  # PEFT 모델 가중치를 저장하기 위한 사용자 정의 콜백
)

# 모델 캐싱을 비활성화하여 경고를 억제합니다 (추론을 위해 활성화됨)
# 이 설정은 학습에 권장되는 것이며 추론을 위해 활성화해야 합니다.
model.config.use_cache = False



# SavePeftModelCallback 클래스에 대한 설명: 이 콜백은 PEFT 모델의 어댑터 가중치만 저장하고 기본 모델 가중치는 제거합니다.
# on_save 메서드에 대한 주석: 체크포인트 폴더 경로 정의, PEFT 모델의 어댑터 가중치 저장, 기본 모델 가중치 제거 등의 작업을 수행합니다.
# Seq2SeqTrainer 객체 생성에 대한 주석: 모델, 데이터셋, 데이터 전처리기, 토크나이저 등의 인자를 전달하여 Seq2Seq 모델 학습을 위한 Trainer 객체를 생성합니다.
# model.config.use_cache = False 설정에 대한 주석: 추론 시 사용하기 위해 캐싱 기능을 비활성화합니다.

In [None]:
trainer.train()

Fine-tuning한 모델을 Hugging Face Hub에 저장합니다. 나중에 모델을 불러올 때 편함

In [None]:
peft_model_id = "whisper-large-v2-korea-common_13"
model.push_to_hub(peft_model_id)

### 평가 및 검증

Finetuning을 성공적으로 했으면 이제 테스트 데이터셋에서 저희 모델을 테스트하고 CER(Character Error Rate) 계산해보겠습니다.

테스트 유의할 점들:

1. predict_with_generate 함수를 사용할 수 없으므로 자체적으로 torch.cuda.amp.autocast()를 사용하여 eval 루프를 직접 구현합니다.

2. 기본 모델이 고정되어 있기 때문에 PEFT 모델은 때로 디코딩 중에 언어를 인식하지 못할 수 있습니다. 이를 해결하기 위해 디코딩 시작 토큰에 번역 중인 언어를 명시적으로 지정합니다. 이 작업은 forced_decoder_ids = processor.get_decoder_prompt_ids(language="Marathi", task="transcribe")를 사용하여 수행하고 model.generate 호출에 이를 전달

In [None]:
from peft import PeftModel, PeftConfig

# PEFT(Parameter-Efficient Fine-Tuning) 모델 및 구성 정보를 가져옵니다.
peft_model_id = "youngisk/whisper-large-v2-korea-common_13"  # fine-tuned Whisper 모델의 ID
peft_config = PeftConfig.from_pretrained(peft_model_id)  # fine-tuned Whisper 모델의 구성 정보를 가져옵니다.

from transformers import WhisperForConditionalGeneration, Seq2SeqTrainer

# Whisper 모델을 로드하고 PEFT 기술을 적용합니다.
model = WhisperForConditionalGeneration.from_pretrained(
    peft_config.base_model_name_or_path, load_in_8bit=True, device_map="auto"
)
# `WhisperForConditionalGeneration` 클래스를 사용하여 Whisper 모델을 로드합니다.
# `load_in_8bit=True`는 모델을 8비트 정밀도로 로드하여 메모리 사용량을 줄입니다.
# `device_map="auto"`는 모델을 자동으로 여러 GPU에 분산하여 로드합니다.

# PEFT 기술을 적용한 모델을 가져옵니다.
model = PeftModel.from_pretrained(model, peft_model_id)
# `PeftModel` 클래스를 사용하여 fine-tuning된 Whisper 모델을 로드합니다.
# 이를 통해 PEFT 기술을 적용할 수 있습니다.

# 모델의 캐싱 기능을 활성화하여 추론 속도를 높입니다.
model.config.use_cache = True



# 이 코드는 PEFT 기술을 사용하여 fine-tuning된 Whisper 모델을 로드하고 구성하는 과정을 보여줍니다.
# 모델을 8비트 정밀도로 로드하고 여러 GPU에 분산하여 메모리 사용량을 줄이며, PEFT 기술을 적용하여 모델의 성능을 향상시킵니다.
# 마지막으로 모델의 캐싱 기능을 활성화하여 추론 속도를 높입니다.

In [None]:
import gc  # 가비지 컬렉션 모듈을 가져와 메모리 관리에 사용합니다.
import numpy as np  # 배열 및 행렬 연산을 위한 NumPy 모듈을 가져옵니다.
from tqdm import tqdm  # 진행 표시를 위한 tqdm 모듈을 가져옵니다.
from torch.utils.data import DataLoader  # 데이터 로딩을 위한 PyTorch DataLoader 모듈을 가져옵니다.
from transformers.models.whisper.english_normalizer import BasicTextNormalizer  # 텍스트 정규화를 위한 모듈을 가져옵니다.

# 평가 데이터 로더를 생성합니다.
eval_dataloader = DataLoader(common_voice["test"], batch_size=8, collate_fn=data_collator)
# 강제 디코더 ID를 가져옵니다.
forced_decoder_ids = processor.get_decoder_prompt_ids(language=language, task=task)
# 텍스트 정규화 객체를 생성합니다.
normalizer = BasicTextNormalizer()

# 예측값, 참조값, 정규화된 예측값, 정규화된 참조값을 저장할 리스트를 초기화합니다.
predictions = []
references = []
normalized_predictions = []
normalized_references = []

# 모델을 평가 모드로 설정합니다.
model.eval()
# 평가 데이터로더를 반복하면서 추론을 수행합니다.
for step, batch in enumerate(tqdm(eval_dataloader)):
    with torch.cuda.amp.autocast():  # 자동 혼합 정밀도(AMP)를 사용하여 GPU 메모리를 효율적으로 활용합니다.
        with torch.no_grad():  # 그라디언트 계산을 비활성화하여 메모리를 절약합니다.
            # 모델을 사용하여 토큰을 생성합니다.
            generated_tokens = (
                model.generate(
                    input_features=batch["input_features"].to("cuda"),  # 입력 데이터를 GPU로 이동합니다.
                    forced_decoder_ids=forced_decoder_ids,  # 강제 디코더 ID를 적용합니다.
                    max_new_tokens=255,  # 최대 생성 토큰 수를 지정합니다.
                )
                .cpu()  # 생성된 토큰을 CPU로 이동합니다.
                .numpy()  # 넘파이 배열로 변환합니다.
            )
            labels = batch["labels"].cpu().numpy()  # 레이블을 CPU로 이동하고 넘파이 배열로 변환합니다.
            labels = np.where(labels != -100, labels, processor.tokenizer.pad_token_id)  # 레이블을 처리합니다.
            decoded_preds = processor.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)  # 예측값을 디코딩합니다.
            decoded_labels = processor.tokenizer.batch_decode(labels, skip_special_tokens=True)  # 레이블을 디코딩합니다.
            predictions.extend(decoded_preds)  # 예측값을 리스트에 추가합니다.
            references.extend(decoded_labels)  # 레이블을 리스트에 추가합니다.
            normalized_predictions.extend([normalizer(pred).strip() for pred in decoded_preds])  # 정규화된 예측값을 리스트에 추가합니다.
            normalized_references.extend([normalizer(label).strip() for label in decoded_labels])  # 정규화된 레이블을 리스트에 추가합니다.
        del generated_tokens, labels, batch  # 불필요한 변수를 삭제하여 메모리를 해제합니다.
    gc.collect()  # 메모리 해제를 위해 가비지 컬렉션을 수행합니다.

# Word Error Rate(cer)를 계산합니다.
cer = 100 * metric.compute(predictions=predictions, references=references)
# 정규화된 CER를 계산합니다.
normalized_cer = 100 * metric.compute(predictions=normalized_predictions, references=normalized_references)
# 평가 메트릭을 저장합니다.
eval_metrics = {"eval/cer": cer, "eval/normalized_cer": normalized_cer}

# CER 및 정규화된 CER를 출력합니다.
print(f"{cer=} and {normalized_cer=}")
# 평가 메트릭을 출력합니다.
print(eval_metrics)




# 코드 설명
# 평가 데이터 로더를 생성하고, 강제 디코더 ID와 텍스트 정규화 객체를 준비합니다.

# 예측값, 참조값, 정규화된 예측값, 정규화된 참조값을 저장할 리스트를 초기화합니다.

# 모델을 평가 모드로 설정하고, 평가 데이터로더를 반복하면서 추론을 수행합니다.
# 자동 혼합 정밀도(AMP)를 사용하여 GPU 메모리를 효율적으로 활용합니다.
# 그라디언트 계산을 비활성화하여 메모리를 절약합니다.
# 모델을 사용하여 토큰을 생성하고, 예측값과 참조값을 디코딩하여 리스트에 저장합니다.
# 정규화된 예측값과 참조값도 리스트에 저장합니다.
# 불필요한 변수를 삭제하고 가비지 컬렉션을 수행하여 메모리를 해제합니다.

# Word Error Rate(WER)와 정규화된 WER를 계산하고, 평가 메트릭을 저장합니다.

# WER, 정규화된 WER, 평가 메트릭을 출력합니다.

# 이 코드는 Whisper 모델의 성능을 평가하고 메트릭을 계산하는 과정을 보여줍니다.
#  메모리 관리, 자동 혼합 정밀도 사용, 그라디언트 계산 비활성화 등의 기법을 통해 효율적인 추론 수행을 보여줍니다.

#### 마무리!



Whisper 체크포인트를 더 빠르고 저렴하게 훈련하고 CER에서 거의 손실이 없도록 학습하는 방법을 배움

PEFT (Pretraining Efficiently with Fine-Tuning)를 사용하면 음성 인식 이외에도 다른 사전 훈련된 모델에 동일한 기술 세트를 적용 가능. 아래 링크에서 자세히 설명: https://github.com/huggingface/peft 🤗

# Whisper 모델 별 테스트

In [None]:
!pip install --upgrade pip
!pip install --upgrade git+https://github.com/huggingface/transformers.git accelerate datasets[audio]
!pip install datasets
!pip install evaluate
!pip install jiwer

In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

## 모델 선택

In [None]:
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from datasets import load_dataset, DatasetDict
import evaluate
from transformers import WhisperForConditionalGeneration
from transformers import WhisperProcessor
from transformers import Seq2SeqTrainer

# 일반 모델 로딩
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

model_id = "openai/whisper-large-v3"
# model_id = "openai/whisper-base"

model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
)
model.to(device)
processor = AutoProcessor.from_pretrained(model_id)



# 노인발화데이터 3GB로 파인튜닝한 모델
# "openai/whisper-base" fine_tuned moel
# model = WhisperForConditionalGeneration.from_pretrained("/content/model")
# model.to("cuda")
# processor = WhisperProcessor.from_pretrained("/content/model")

## 허깅 페이스 로그인

- common_voice 14부터는 로그인 및 권한 확인 필요

In [None]:
# 토큰 : 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
from huggingface_hub import notebook_login

notebook_login()

## common_voice 로드 및 전처리

In [None]:
common_voice = DatasetDict()
test_version = "mozilla-foundation/common_voice_15_0"
common_voice["test"] = load_dataset(test_version, "ko", split="test", use_auth_token=True)

language_abbr = "ko"
task = "transcribe"

import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        label_features = [{"input_ids": feature["labels"]} for feature in features]
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

In [None]:
from datasets import Audio

common_voice = common_voice.cast_column("audio", Audio(sampling_rate=16000))

feature_extractor=processor.feature_extractor
tokenizer=processor.tokenizer

def prepare_dataset(batch):
    # load and resample audio data from 48 to 16kHz
    audio = batch["audio"]

    # compute log-Mel input features from input audio array
    batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

    # encode target text to label ids
    batch["labels"] = tokenizer(batch["sentence"]).input_ids
    return batch

common_voice = common_voice.map(prepare_dataset, remove_columns=common_voice.column_names["test"], num_proc=1)

## 평가진행

- test : common_voice15

In [None]:
import gc
import numpy as np
from tqdm import tqdm
from torch.utils.data import DataLoader
from transformers.models.whisper.english_normalizer import BasicTextNormalizer

metric = evaluate.load("cer")

eval_dataloader = DataLoader(common_voice["test"], batch_size=8, collate_fn=data_collator)
forced_decoder_ids = processor.get_decoder_prompt_ids(language=language_abbr, task=task)
normalizer = BasicTextNormalizer()

predictions = []
references = []
normalized_predictions = []
normalized_references = []

model.eval()
for step, batch in enumerate(tqdm(eval_dataloader)):
    with torch.cuda.amp.autocast():
        with torch.no_grad():
            generated_tokens = (
                model.generate(
                    input_features=batch["input_features"].to("cuda"),
                    forced_decoder_ids=forced_decoder_ids,
                    max_new_tokens=255,
                )
                .cpu()
                .numpy()
            )
            labels = batch["labels"].cpu().numpy()
            labels = np.where(labels != -100, labels, processor.tokenizer.pad_token_id)
            decoded_preds = processor.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
            decoded_labels = processor.tokenizer.batch_decode(labels, skip_special_tokens=True)
            predictions.extend(decoded_preds)
            references.extend(decoded_labels)
            normalized_predictions.extend([normalizer(pred).strip() for pred in decoded_preds])
            normalized_references.extend([normalizer(label).strip() for label in decoded_labels])
        del generated_tokens, labels, batch
    gc.collect()
cer = 100 * metric.compute(predictions=predictions, references=references)
normalized_cer = 100 * metric.compute(predictions=normalized_predictions, references=normalized_references)
eval_metrics = {"eval/cer": cer, "eval/normalized_cer": normalized_cer}

print(f"{cer=}  and {normalized_cer=}  ")
print(eval_metrics)

## 모델 이용해서 오디오파일 전사

In [None]:
# 파이프라인 설정

pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    max_new_tokens=128,
    chunk_length_s=30,
    batch_size=16,
    return_timestamps=True,
    torch_dtype=torch_dtype,
    device=device,
)

In [None]:
# 전처리 전 common_voice 15 오디오 파일 로딩
test = DatasetDict()
test_version = "mozilla-foundation/common_voice_15_0"
test["test"] = load_dataset(test_version, "ko", split="test", use_auth_token=True)

In [None]:
# 노인발화데이터 테스트(10개)
from glob import glob
import warnings
warnings.filterwarnings('ignore')

answer = [
"저 사람이 나한테 저렇게 행동을 하는구나",
"그러고 내가 더 그 사람한테 저 사람이 모자란 게 무엇이고",
"저 사람이 원하는 게 무엇인가를 내가 생각해봐야지",
"그렇게 사람한테 베풀어주면 싸울일이 없다고 생각해",
"아무리 극에 달해서 화가 났어도 내가 좀 참고",
"옛날에 장사를 했으니까 뭐",
"그런데 앞으로는 상가도 조금 조심해서 해야 되겠더라고 보니까",
"그러니까 요즘에는 온라인이 너무 그렇지",
"뭐 온라인 쇼핑이 너무 성대 하니까",
"오프라인이 처지지 어쩔 수 없는거야"]

for file, an in zip(sorted(glob("/content/노인발화TEST/*")), answer):
  print(pipe(file)["text"])
  print(an)


In [None]:
# 파이프라인에 있는 모델로 common_voice15 test 오디오파일 전사(10개)
for i in range(10):
  result = pipe(test["test"]["audio"][i]["path"])
  print(result["text"])
  print(test["test"]["sentence"][i])

In [None]:
!pip install faster_whisper

## Fast Whisper로 테스트

In [None]:
# Fast Whisper 기본 버전 테스트
from faster_whisper import WhisperModel
from faster_whisper.feature_extractor import FeatureExtractor
from faster_whisper.tokenizer import Tokenizer
import tokenizers

model = WhisperModel("large-v3")
feature_extractor = FeatureExtractor()
tokenizer = Tokenizer(tokenizers.Tokenizer, False)

In [None]:
# 노인발화데이터 10개 테스트
from glob import glob
import warnings
warnings.filterwarnings('ignore')

answer = [
"저 사람이 나한테 저렇게 행동을 하는구나",
"그러고 내가 더 그 사람한테 저 사람이 모자란 게 무엇이고",
"저 사람이 원하는 게 무엇인가를 내가 생각해봐야지",
"그렇게 사람한테 베풀어주면 싸울일이 없다고 생각해",
"아무리 극에 달해서 화가 났어도 내가 좀 참고",
"옛날에 장사를 했으니까 뭐",
"그런데 앞으로는 상가도 조금 조심해서 해야 되겠더라고 보니까",
"그러니까 요즘에는 온라인이 너무 그렇지",
"뭐 온라인 쇼핑이 너무 성대 하니까",
"오프라인이 처지지 어쩔 수 없는거야"]

for file, an in zip(sorted(glob("/content/노인발화TEST/*")), answer):
  segments, info = model.transcribe(file)
  for segment in segments:
    print(segment.text)
  print(an)

In [None]:
# 파이프라인에 있는 모델로 common_voice15 test 오디오파일 전사(10개)
for i in range(10):
  segments, info = model.transcribe(test["test"]["audio"][i]["path"])
  for segment in segments:
    print(segment.text)
  print(test["test"]["sentence"][i])

## 다른 테스트셋으로 추가 평가


- test : 노인발화["test"]로 추가 테스트

In [None]:
!pip install datasets

In [None]:
# 노인발화 전처리된 testset 불러오기
#로드
from datasets import load_from_disk
from datasets import Dataset, DatasetDict
from transformers import WhisperProcessor

noin_test = load_from_disk("/content/test")
noin_test

In [None]:
# 노인발화["test"]
import gc
import numpy as np
from tqdm import tqdm
from torch.utils.data import DataLoader
from transformers.models.whisper.english_normalizer import BasicTextNormalizer

metric = evaluate.load("cer")

eval_dataloader = DataLoader(noin_test, batch_size=8, collate_fn=data_collator)
forced_decoder_ids = processor.get_decoder_prompt_ids(language=language_abbr, task=task)
normalizer = BasicTextNormalizer()

predictions = []
references = []
normalized_predictions = []
normalized_references = []

model.eval()
for step, batch in enumerate(tqdm(eval_dataloader)):
    with torch.cuda.amp.autocast():
        with torch.no_grad():
            generated_tokens = (
                model.generate(
                    input_features=batch["input_features"].to("cuda"),
                    forced_decoder_ids=forced_decoder_ids,
                    max_new_tokens=255,
                )
                .cpu()
                .numpy()
            )
            labels = batch["labels"].cpu().numpy()
            labels = np.where(labels != -100, labels, processor.tokenizer.pad_token_id)
            decoded_preds = processor.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
            decoded_labels = processor.tokenizer.batch_decode(labels, skip_special_tokens=True)
            predictions.extend(decoded_preds)
            references.extend(decoded_labels)
            normalized_predictions.extend([normalizer(pred).strip() for pred in decoded_preds])
            normalized_references.extend([normalizer(label).strip() for label in decoded_labels])
        del generated_tokens, labels, batch
    gc.collect()
cer = 100 * metric.compute(predictions=predictions, references=references)
normalized_cer = 100 * metric.compute(predictions=normalized_predictions, references=normalized_references)
eval_metrics = {"eval/cer": cer, "eval/normalized_cer": normalized_cer}

print(f"{cer=}  and {normalized_cer=}  ")
print(eval_metrics)

# 최종 서비스 구현 (시각화 전 Ver)

## Fast-Wshisper 모델 로딩

In [None]:
import speech_recognition as sr
from gtts import gTTS
import os
import time
import pygame
import os
from datetime import datetime
from glob import glob
from faster_whisper import WhisperModel
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import pandas as pd

In [None]:
# 인텔 드라이버 충돌 에러
# 원래 있는데 다시 깔려고 하니까 중복 에러 뜸
# 중복돼도 OK에 True값 적용
os.environ['KMP_DUPLICATE_LIB_OK']='True'

# Fast Whisper 모델 불러오기
model = WhisperModel("large-v3")

## 질문지 작성

In [None]:
start_ment = """안녕하세요. 호석님. 서울시 중구 취업 센터 aI 상담사입니다.
시니어 취업 관련해서 간단한 사전 설문조사를 진행하고 있사오니, 잠깐 시간을 내어주시면 감사하겠습니다."""

question1 = "먼저 성함을 알려주시겠어요?"

question2 = "성별과 나이는 어떻게 되시나요?"

question3 = """건강상태를 간단히 여쭤보겠습니다.
걷는데는 지장이 없으신가요? 있으시다면 얼마만큼 불편하신가요?"""

question4 = "보고 듣는데는 지장이 없으신가요? 있으시다면 얼마만큼 불편하신가요?"

question5 = """학교는 어디까지 나오셨나요?
초등학교, 중학교, 고등학교, 대학교에 졸업한 곳으로 답변 부탁드립니다."""

question6 = "살면서 주로 무슨 일들을 하셨나요?"

question7 = "혹시 특별히 하고 싶은 일이 있으신가요?"

question8 = "배우자나 자녀분들은 어디에 살고 있나요?"

end_ment = """답변 감사드립니다. 서울시 중구 고용센터였습니다.
궁금하신 사항이 있으면 발송드린 문자에 있는 번호로 연락 부탁드립니다.
담당자 검토 후 수일 내로 다시 연락드리겠습니다. 감사합니다. """

# 질문 갯수
questions = [start_ment, question1, question2, question3, question4, question5, question6, question7, question8, end_ment]

# 답변 담을 리스트
answer = []

## 필요 메소드 작성

In [None]:

# 텍스트를 말로 바꿔줌
# param : 읽어줄 텍스트
def speak(text):
    tts = gTTS(text=text, lang='ko')
    filename='voice.mp3'
    tts.save(filename)
    pygame.init()
    pygame.mixer.music.load(filename)
    pygame.mixer.music.play()
    while pygame.mixer.music.get_busy():
        continue
    pygame.mixer.music.unload()
    os.remove(filename)


# 마이크 음성 -> 오디오 파일로 저장 (구글이 만들어준 텍스트파일도 같이 저장)
# pram : 저장할 폴더 경로(str), 사용자정보(list), 질문번호(int)
def get_audio(folder_path, user_info, i):
    r = sr.Recognizer()
    with sr.Microphone() as source:

        print("지금 말씀하세요: ")

        # 오디오 파일로 저장
        audio = r.listen(source)
        wav_file_name = folder_path + '\\' + user_info[0] + "_" + user_info[1] + "_" + str(i)

        # 중복된거 있으면 그냥 지우고 다시 생성
        if os.path.isfile(wav_file_name + ".wav"):
            os.remove(wav_file_name + ".wav")

        try:
            with open(wav_file_name + ".wav", 'bx') as f:
                f.write(audio.get_wav_data())
        except Exception as e:
            print("Exception: " + str(e))

        # 구글에서 전사해준거 텍스트로 저장 (위스퍼랑 비교해보자)
        text = ""
        try:
            txt_file_name = folder_path + '\\' + user_info[0] + "_" + user_info[1] + "_google.txt"
            text = r.recognize_google(audio, language="ko-KR")
            print("말씀하신 내용입니다 : ", text)

            # 답변 전사된 텍스트 파일로 만들어 저장하기
            with open(txt_file_name, 'a') as f:
                f.write(str(text)+"\n")
        except Exception as e:
            print("Exception: " + str(e))

    return text


# #### 폴더 만들기 ####
# param : 만들 폴더 주소(str)
def make_folder(path: str):

    try:
        os.mkdir(path)

    # 혹시 이름 중복이면 일단 다른 폴더로 만들기
    # 해당 내용 로그 파일에 기록
    except:
        os.mkdir(path + "_Error")
        print(f"중복 폴더 존재{path}")


#### 녹음별 유저 폴더 만들기 ####
# param : 유저 정보 리스트(list)
# return : 만들어진 폴더 경로(str)
def make_user_voice_folder(user_info: list):

    try :
        date = datetime.today().strftime("%Y%m%d")
        num = user_info[1]
        name = user_info[0]

        # 상위 폴더는 일단 날짜명. 없으면 만들어줌
        if os.path.exists(f"03.음성녹음\\{date}") == False:
            os.mkdir("03.음성녹음\\" + date)

        # 폴더가 없다면
        folder_path = "03.음성녹음\\" + date + "\\" + num + "_" + name
        if os.path.exists(folder_path) == False:

            # 날짜 폴더 밑에 [식별번호_이름] 폴더를 만들어줌
            make_folder(folder_path)

    # 혹시 중복일 경우 일단 다른 폴더를 만들어주고 로그 남김
    except Exception as e:
        print("Exception: " + str(e))

    return folder_path

# 위스퍼로 오디오파일 전사하기
# param : 폴더경로(str)
# return : 답변 텍스트 리스트(list)
def whisper_transcribe(folder_path):

    answer_lst = []
    for audio_file in sorted(glob(folder_path + "/*.wav")):
        segments, info = model.transcribe(audio_file)
        for segment in segments:
            answer_lst.append(segment.text)

    return answer_lst


# GPT야 요약해줘
# param : 답변 내용 리스트(list)
# return : 요약내용 리스트(list)
def get_gpt_help(answer_lst):
    # 재우 API
    GPT_API_KEY = "sk-proj-KgZChuKTiML1O6WHDO7oT3BlbkFJvJYPMkdQyzZ3v4cc9XML"

    # API 키로 LLM 객체 생성 (GPT와 연결해줌)
    # temperature : 생성된 텍스트의 다양성 조정
    # 0~2 사이
    # 높을 수록 출력을 무작위하게, 낮을 수록 출력을 더 집중되고 결정론적으로 만듦
    # model_name : 사용할 GPT 모델(버전) 정보
    # openai_api_key : API 키값
    chat = ChatOpenAI(temperature = 0.2, max_tokens = 2024, openai_api_key = GPT_API_KEY)

    system_msg = "이 글에서 '이c름, 나이, 건강상태, 최종학력, 경력사항, 희망사항, 가족사항' 으로 요약해줘"
    human_msg = ".".join(answer_lst)


    # 시스템 메세지로 원하는 답변의 형태를 지정할 수 있음
    messages = [
        SystemMessage(content=system_msg),
        HumanMessage(content=human_msg),
    ]

    # 질문하고 답받아서 잘라서 리스트로 만들기
    gpt_re_lst = str(chat.invoke(messages).content).split('\n')

    return gpt_re_lst



## 메인 서비스 코드 작성

In [None]:
# 유저 폴더 만들기 및 경로 추출
user_info = ["001234001213", "김옥자", "", ""]
folder_path = make_user_voice_folder(user_info)



for i, question in enumerate(questions):

    # 질문하기
    speak(question)

    # 첫 번째랑 마지막은 안내멘트로 답변 안받음
    if 0 < i < len(questions) - 1:

        # 답변받고 파일 저장(오디오파일, 텍스트파일)
        get_audio(folder_path, user_info, i)

# 혹시 테스트하다가 중간에 그냥 끊으면 플레이어 언로드해주기
pygame.mixer.music.unload

# 위스퍼로 해당 폴더 오디오 내용 전사하기
answer_lst = whisper_transcribe(folder_path)

# 랭체인 GPT로 요약해서 받기
gpt_re_lst = get_gpt_help(answer_lst)


# 데이터프레임으로 만들기

# 답변 내용만 가져와서 리스트로 만들기
# 검증 코드가 필요하겠지만.. 여기까지만...
# 실제로 사용한다면 DB에다가 저장
content = []
for sentence in gpt_re_lst:
    content.append(sentence.split(": ")[1])

result_df = pd.DataFrame(columns =["이름","나이","건강상태","최종학력","경력사항","희망사항","가족사항"])
result_df.loc[len(result_df)] = content


In [None]:
# 결과 저장 데이터프레임
result_df

In [None]:
# 결과 엑셀로 추출
result_df.to_excel("조사결과.xlsx", index = False)

# 최종 서비스 구현 (웹 시각화 Ver)

코랩 실행 안됩니다.
로컬에서 실행해 주세요.

In [None]:
import streamlit as st
from audio_recorder_streamlit import audio_recorder
import speech_recognition as sr
from gtts import gTTS
import os
from datetime import datetime
from glob import glob
from faster_whisper import WhisperModel
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import pandas as pd

# 마이크 음성 -> 오디오 파일로 저장 (구글이 만들어준 텍스트파일도 같이 저장)
# pram : 저장할 폴더 경로(str), 사용자정보(list), 질문번호(int)
def get_audio(folder_path, user_info, i):
    r = sr.Recognizer()
    with sr.Microphone() as source:

        print("지금 말씀하세요: ")

        # 오디오 파일로 저장
        audio = r.listen(source)
        wav_file_name = folder_path + '\\' + user_info[0] + "_" + user_info[1] + "_" + str(i)

        # 중복된거 있으면 그냥 지우고 다시 생성
        if os.path.isfile(wav_file_name + ".wav"):
            os.remove(wav_file_name + ".wav")

        try:
            with open(wav_file_name + ".wav", 'bx') as f:
                f.write(audio.get_wav_data())
        except Exception as e:
            print("Exception: " + str(e))

        # 구글에서 전사해준거 텍스트로 저장 (위스퍼랑 비교해보자)
        text = ""
        try:
            txt_file_name = folder_path + '\\' + user_info[0] + "_" + user_info[1] + "_google.txt"
            text = r.recognize_google(audio, language="ko-KR")
            print("말씀하신 내용입니다 : ", text)

            # 답변 전사된 텍스트 파일로 만들어 저장하기
            with open(txt_file_name, 'a') as f:
                f.write(str(text)+"\n")
        except Exception as e:
            print("Exception: " + str(e))

    return text


# #### 폴더 만들기 ####
# param : 만들 폴더 주소(str)
def make_folder(path: str):

    try:
        os.mkdir(path)

    # 혹시 이름 중복이면 일단 다른 폴더로 만들기
    # 해당 내용 로그 파일에 기록
    except:
        os.mkdir(path + "_Error")
        print(f"중복 폴더 존재{path}")


#### 녹음별 유저 폴더 만들기 ####
# param : 유저 정보 리스트(list)
# return : 만들어진 폴더 경로(str)
def make_user_voice_folder(user_info: list):

    try :
        date = datetime.today().strftime("%Y%m%d")
        num = user_info[1]
        name = user_info[0]

        # 상위 폴더는 일단 날짜명. 없으면 만들어줌
        if os.path.exists(f"03.음성녹음\\{date}") == False:
            os.mkdir("03.음성녹음\\" + date)

        # 폴더가 없다면
        folder_path = "03.음성녹음\\" + date + "\\" + num + "_" + name
        if os.path.exists(folder_path) == False:

            # 날짜 폴더 밑에 [식별번호_이름] 폴더를 만들어줌
            make_folder(folder_path)

    # 혹시 중복일 경우 일단 다른 폴더를 만들어주고 로그 남김
    except Exception as e:
        print("Exception: " + str(e))

    return folder_path

# 위스퍼로 오디오파일 전사하기
# param : 폴더경로(str)
# return : 답변 텍스트 리스트(list)
def whisper_transcribe(folder_path, model):

    answer_lst = []
    for audio_file in sorted(glob(folder_path + "\\*.wav")):
        segments, info = model.transcribe(audio_file)
        for segment in segments:
            answer_lst.append(segment.text)

    return answer_lst


# GPT야 요약해줘
# param : 답변 내용 리스트(list)
# return : 요약내용 리스트(list)
def get_gpt_help(answer_lst):
    # 재우 API
    GPT_API_KEY = "sk-proj-KgZChuKTiML1O6WHDO7oT3BlbkFJvJYPMkdQyzZ3v4cc9XML"

    # API 키로 LLM 객체 생성 (GPT와 연결해줌)
    # temperature : 생성된 텍스트의 다양성 조정
    # 0~2 사이
    # 높을 수록 출력을 무작위하게, 낮을 수록 출력을 더 집중되고 결정론적으로 만듦
    # model_name : 사용할 GPT 모델(버전) 정보
    # openai_api_key : API 키값
    chat = ChatOpenAI(temperature = 0.2, max_tokens = 2024, openai_api_key = GPT_API_KEY)

    system_msg = "이 글에서 '이c름, 나이, 건강상태, 최종학력, 경력사항, 희망사항, 가족사항' 으로 요약해줘"
    human_msg = ".".join(answer_lst)


    # 시스템 메세지로 원하는 답변의 형태를 지정할 수 있음
    messages = [
        SystemMessage(content=system_msg),
        HumanMessage(content=human_msg),
    ]

    # 질문하고 답받아서 잘라서 리스트로 만들기
    gpt_re_lst = str(chat.invoke(messages).content).split('\n')

    return gpt_re_lst




####################### 시각화 - 질문&답변 #########################

# streamlit run() 스톱 잘되게 하는 코드
# if st.button('Stop the app'): # 버튼 누르면
#     st.session_state['stop'] = True

if 'stop' not in st.session_state: # Cont + C 누르면
    st.session_state['stop'] = False

if st.session_state['stop']: # 브라우저 꺼지면
    st.stop()



# 메인화면
st.title('4조(FORS)')
st.header('Whisper Fine Thank you & U?')
st.title('')
st.title('')

# 질문 파일이름모음 및 iter 객체 생성
try:

    # 질문 파일이름 iter 객체 생성
    if "questions" not in st.session_state:
        st.session_state.questions = ["03.음성녹음\\Streamlit\\Question\\question" + str(i) + ".wav" for i in range(10)]
        st.session_state.iter_question = iter(st.session_state.questions)

        # 첫 질문 초기화
        st.session_state.filename = next(st.session_state.iter_question)

        # 첫 질문넘버 초기화
        st.session_state.question = "시작 안내 멘트"
        st.session_state.question_num = 0

        # 폴더 생성 여부 체크
        st.session_state.isfolder = False




    # 버튼 누르면 다음 질문으로 넘어가도록 함
    if st.button("다음질문"):

        # 다음질문으로 파일이름 변경
        st.session_state.filename = next(st.session_state.iter_question)

        # 질문 번호 변경
        st.session_state.question_num += 1

        # 마지만 안내멘트 설정
        if st.session_state.question_num == len(st.session_state.questions) - 1:
            st.session_state.question = "마지막 안내 멘트"

        elif st.session_state.question_num == 0:
            st.session_state.question = "시작 안내 멘트"

        # 질문번호 설정
        else:
            st.session_state.question = "질문 " + str(st.session_state.question_num)

    # 질문 번호 출력
    st.subheader(st.session_state.question)

    # 오디오 출력
    st.audio(st.session_state.filename, format="audio/wav")

    # 녹음기삽입
    bytes = audio_recorder(sample_rate = 16000, energy_threshold = 1.0, pause_threshold = 2, key = st.session_state.question_num)


    if bytes:
        user_info = ["001234001213", "김두한", "", ""]

        # 폴더 없으면 만들어주기
        if st.session_state.isfolder == False:
            st.session_state.folder_path = make_user_voice_folder(user_info)
            st.session_state.isfolder = True

        # 저장할 파일 이름
        wav_file_name = st.session_state.folder_path + '\\' + user_info[0] + "_" + user_info[1] + "_" + str(st.session_state.question_num)

        # 오디오 파일로 저장
        try:
            with open(wav_file_name + ".wav", 'bx') as f:
                f.write(bytes)

        except Exception as e:
            print("Exception: " + str(e))


# next 객체 마지막에 도달했을 경우
except Exception as e:
    st.text("설문조사가 모두 끝났습니다.")
    print(e)


st.title('')
st.title('')


####################### 시각화 - 위스퍼 전사 ########################

if st.button("위스퍼 텍스트 출력"):

    # 위스퍼 모델 초기화
    if "model" not in st.session_state:

        # 인텔 드라이버 충돌 에러
        # 원래 있는데 다시 깔려고 하니까 중복 에러 뜸
        # 중복돼도 OK에 True값 적용

        os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

        # Fast Whisper 모델 불러오기
        st.session_state.model = WhisperModel("large-v3")

    st.session_state.answer_lst = whisper_transcribe(st.session_state.folder_path, st.session_state.model)
    for text in st.session_state.answer_lst:
        st.text(text)


####################### 시각화 - GPT 요약 ########################

# st.session_state.answer_lst = [
#     "내 이름이 음 김옥순이여어@#!#",
#     "내는 남자제. 나이는 뭐 육십일곱인기라"
#     "나 뭐 걸을 수는 있는데 뭐냐 그 가끔은 무릎이 시려서 좀 어려울 때가 있어~",
#     "눈이야 좀 침침허지이 귀도 잘 안들려 그래도 사는데 지장은 없어~",
#     "어.. 그.. 내는 중학교뿌이 안나왔다아이가. 고등학교는 못갔데이",
#     "뭐.. 예전에는 과일도 좀 팔고.. 생선도 팔고.. 팔 수 있는거는 다 팔았다 아이가. 직접 운전해서 배달도 하고.. ",
#     "나야 뭐 아무일이나 하면 좋지.. 사람들이랑 좀 얘기도 하고 몸도 좀 움직이고 하는 일이었으면 좋겠는데..",
#     "남편은 집에서 같이 살고 있쟤. 딸래미는 시집가서 서울에서 살고 있고 아들래미는 학교 댕긴다고 저기 부산에 가 있다 아이가",
# ]


if st.button("GPT 요약"):
    st.session_state.gpt_re_lst = get_gpt_help(st.session_state.answer_lst)

    for text in st.session_state.gpt_re_lst:
        st.text(text)


####################### 시각화 - 데이터프레임 ########################

if st.button("데이터프레임"):
    content = [sentence.split(": ")[1] for sentence in st.session_state.gpt_re_lst]
    st.session_state.result_df = pd.DataFrame(columns =["이름","나이","건강상태","최종학력","경력사항","희망사항","가족사항"])
    st.session_state.result_df.loc[len(st.session_state.result_df)] = content
    st.dataframe(st.session_state.result_df)

####################### 시각화 - 엑셀 추출 ########################

if st.button("엑셀 파일 추출"):
    st.session_state.result_df.to_excel("조사결과.xlsx", index = False)