In [4]:
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification, TrainingArguments, Trainer
import torchaudio
import torch
import librosa
from datasets import Dataset
import os
import numpy as np
import pandas as pd

In [5]:
MODEL_ID = "openai/whisper-large-v3"
EMOTION_LABELS = ["happiness", "angry", "disgust", "fear", "neutral", "sadness", "surprise"]
LABEL2ID = {emo: i for i, emo in enumerate(EMOTION_LABELS)}
ID2LABEL = {i: emo for emo, i in LABEL2ID.items()}
TARGET_SR = 16000
AUDIO_DIR = 'samples'

In [6]:
labels_df_01 = pd.read_csv("4th.csv", encoding='cp949').set_index("wav_id")
labels_df_02 = pd.read_csv("5th_1st.csv", encoding='cp949').set_index("wav_id")
labels_df_03 = pd.read_csv("5th_2nd.csv", encoding='cp949').set_index("wav_id")

# 4차 + 5차_1차 + 5차_2차
df = pd.concat([labels_df_01, labels_df_02, labels_df_03])
df.head()

Unnamed: 0_level_0,발화문,상황,1번 감정,1번 감정세기,2번 감정,2번 감정세기,3번 감정,3번 감정세기,4번 감정,4번 감정세기,5번 감정,5번 감정세기,나이,성별
wav_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
5e258fd1305bcf3ad153a6a4,"어, 청소 니가 대신 해 줘!",anger,Neutral,0,Angry,1,Neutral,0,Neutral,0,Angry,1,27,male
5e258fe2305bcf3ad153a6a5,둘 다 청소 하기 싫어. 귀찮아.,anger,Neutral,0,Angry,1,Neutral,0,Neutral,0,Angry,1,27,male
5e258ff5305bcf3ad153a6a6,둘 다 하기 싫어서 화내.,anger,Angry,1,Angry,1,Neutral,0,Angry,1,Angry,1,27,male
5e25902f305bcf3ad153a6a9,그럼 방세는 어떡해.,anger,Sadness,1,Sadness,1,Sadness,1,Sadness,1,Sadness,1,27,male
5e27f90b5807b852d9e0157b,권태긴줄 알았는데 다른 사람이 생겼나보더라고.,sad,Sadness,1,Sadness,1,Sadness,1,Sadness,2,Sadness,1,32,male


In [7]:
def tag_final_emotion(row):
    emo_vals = { emo:0 for emo in EMOTION_LABELS}
    
    for i in range(1, 6):
        data_emo = row.get(f'{i}번 감정').lower()
        data_emo_val = row.get(f'{i}번 감정세기')
        if data_emo == 'neutral': emo_vals['neutral'] += 1
        else: emo_vals[data_emo] += data_emo_val
    
    # 중립이 4개 이상인 경우 중립 리턴
    if emo_vals['neutral'] > 3:
        return 'neutral'
    
    # 중립이 3개 이하면 가중 최빈값 리턴
    max_val = max(emo_vals.values())
    for emo, val in emo_vals.items():
        if max_val == val:
            return emo
    
    return 'neutral'

df['final_emotion'] = df.apply(tag_final_emotion, axis=1)


In [8]:
META_COLUMS = [
    "발화문", "상황", "1번 감정", "1번 감정세기", "2번 감정", "2번 감정세기",
    "3번 감정", "3번 감정세기", "4번 감정", "4번 감정세기", "5번 감정", "5번 감정세기",
    "나이", "성별"
]

df = df.drop(columns=META_COLUMS)
df.head()

Unnamed: 0_level_0,final_emotion
wav_id,Unnamed: 1_level_1
5e258fd1305bcf3ad153a6a4,neutral
5e258fe2305bcf3ad153a6a5,neutral
5e258ff5305bcf3ad153a6a6,angry
5e25902f305bcf3ad153a6a9,sadness
5e27f90b5807b852d9e0157b,sadness


In [9]:
df.value_counts()

final_emotion
sadness          16882
angry             8650
neutral           7030
happiness         4506
fear              3150
disgust           2708
surprise          1065
Name: count, dtype: int64

In [10]:
def under_sampling(max_count_per_class):
    df_balanced = pd.concat([
        df[df['final_emotion'] == emo].sample(n=max_count_per_class, random_state=42, replace=False)
        if len(df[df['final_emotion'] == emo]) > max_count_per_class else df[df['final_emotion'] == emo]
        for emo in EMOTION_LABELS
    ])
    return df_balanced

df_balanced = under_sampling(1000)
df_balanced.head()

Unnamed: 0_level_0,final_emotion
wav_id,Unnamed: 1_level_1
5f5cc76e2e23c7161accd04b,happiness
5f600eb454b2361621284a68,happiness
5f67f3c99e04b149046cb7bb,happiness
5f6538aef8fac448cc0a57fb,happiness
5f7866dbf8fac448cc0a63d5,happiness


In [11]:
df_balanced.value_counts()

final_emotion
angry            1000
disgust          1000
fear             1000
happiness        1000
neutral          1000
sadness          1000
surprise         1000
Name: count, dtype: int64

In [12]:
def join_path(row):
    wav_id = row.name
    path_col = os.path.join(AUDIO_DIR, wav_id+'.wav')
    if (os.path.exists(path_col)):
        return path_col
    return None

df_balanced['path'] = df_balanced.apply(join_path, axis=1)
df_balanced = df_balanced.dropna(subset=['path'])
df_balanced.head()

Unnamed: 0_level_0,final_emotion,path
wav_id,Unnamed: 1_level_1,Unnamed: 2_level_1
5f5cc76e2e23c7161accd04b,happiness,samples\5f5cc76e2e23c7161accd04b.wav
5f600eb454b2361621284a68,happiness,samples\5f600eb454b2361621284a68.wav
5f67f3c99e04b149046cb7bb,happiness,samples\5f67f3c99e04b149046cb7bb.wav
5f6538aef8fac448cc0a57fb,happiness,samples\5f6538aef8fac448cc0a57fb.wav
5f7866dbf8fac448cc0a63d5,happiness,samples\5f7866dbf8fac448cc0a63d5.wav


In [13]:
train_data = df_balanced.sample(frac=0.8, random_state=42)
train_data.head()

Unnamed: 0_level_0,final_emotion,path
wav_id,Unnamed: 1_level_1,Unnamed: 2_level_1
5f87c727111dfd48d40fe30d,surprise,samples\5f87c727111dfd48d40fe30d.wav
5e4255b29306c7039ddccbf0,disgust,samples\5e4255b29306c7039ddccbf0.wav
5fbdb2a34c55eb78bd7ceb29,disgust,samples\5fbdb2a34c55eb78bd7ceb29.wav
5fbb63f7576e9378b67acb29,happiness,samples\5fbb63f7576e9378b67acb29.wav
5f697fa89e04b149046cb954,neutral,samples\5f697fa89e04b149046cb954.wav


In [14]:
test_data = df_balanced.drop(train_data.index)

test_data.head()

Unnamed: 0_level_0,final_emotion,path
wav_id,Unnamed: 1_level_1,Unnamed: 2_level_1
5f6538aef8fac448cc0a57fb,happiness,samples\5f6538aef8fac448cc0a57fb.wav
5f7866dbf8fac448cc0a63d5,happiness,samples\5f7866dbf8fac448cc0a63d5.wav
5f68d6da9e04b149046cb8de,happiness,samples\5f68d6da9e04b149046cb8de.wav
5fbca3bb44697678c497bafe,happiness,samples\5fbca3bb44697678c497bafe.wav
5f5e1d4754b2361621284912,happiness,samples\5f5e1d4754b2361621284912.wav


In [15]:
train_data['final_emotion'].value_counts()

final_emotion
happiness    815
angry        813
surprise     800
neutral      796
disgust      794
fear         792
sadness      789
Name: count, dtype: int64

In [16]:
test_data['final_emotion'].value_counts()

final_emotion
sadness      211
fear         208
disgust      206
neutral      204
surprise     200
angry        186
happiness    185
Name: count, dtype: int64

In [17]:
X_train = Dataset.from_pandas(train_data)
X_test = Dataset.from_pandas(test_data)

## audio files

In [18]:
def read_audio(path):
    array, sampling_rate = librosa.load(path, sr=None)
    return array, sampling_rate

def convert_example(example):
    audio_path = example['path']
    array, sampling_rate = read_audio(audio_path)
    return {
        'audio': {
            'path': audio_path,
            'array': array,
            'sampling_rate': sampling_rate
        },
        'labels': LABEL2ID[example['final_emotion']]
    }

def convert_dataset(dataset):
    converted_examples = []
    for example in dataset:
        converted_example = convert_example(example)
        converted_examples.append(converted_example)
    return Dataset.from_dict(converted_examples)

converted_train = X_train.map(convert_example)
converted_test = X_test.map(convert_example)

Map: 100%|██████████| 5599/5599 [00:29<00:00, 190.09 examples/s]
Map: 100%|██████████| 1400/1400 [00:05<00:00, 261.46 examples/s]


In [19]:
converted_train, converted_test

(Dataset({
     features: ['final_emotion', 'path', 'wav_id', 'audio', 'labels'],
     num_rows: 5599
 }),
 Dataset({
     features: ['final_emotion', 'path', 'wav_id', 'audio', 'labels'],
     num_rows: 1400
 }))

In [20]:
converted_train = converted_train.remove_columns(["path", "final_emotion", "wav_id"])
converted_test = converted_test.remove_columns(["path", "final_emotion", "wav_id"])

In [21]:
converted_train, converted_test

(Dataset({
     features: ['audio', 'labels'],
     num_rows: 5599
 }),
 Dataset({
     features: ['audio', 'labels'],
     num_rows: 1400
 }))

In [22]:
converted_train.save_to_disk('/content/converted_train')
converted_test.save_to_disk('/content/converted_test')

Saving the dataset (13/13 shards): 100%|██████████| 5599/5599 [00:08<00:00, 656.14 examples/s]
Saving the dataset (4/4 shards): 100%|██████████| 1400/1400 [00:01<00:00, 722.65 examples/s]


## load model

In [23]:
model_id = "openai/whisper-tiny"

feature_extractor = AutoFeatureExtractor.from_pretrained(
    model_id, do_normalize=True,
)

In [24]:
sampling_rate = feature_extractor.sampling_rate
sampling_rate

16000

In [25]:
sample = converted_train[0]["audio"]

print(f"Mean: {np.mean(sample['array']):.3}, Variance: {np.var(sample['array']):.3}")

Mean: -2.81e-05, Variance: 7.97e-06


## preprocess

In [26]:
max_duration = 30.0

def preprocess_function(examples):
    audio_arrays = [x["array"] for x in examples["audio"]]
    inputs = feature_extractor(
        audio_arrays,
        sampling_rate=feature_extractor.sampling_rate,
        max_length=int(feature_extractor.sampling_rate * max_duration),
        truncation=True,
    )
    return inputs

In [27]:
data_encoded_train = converted_train.map(
    preprocess_function,
    remove_columns=["audio"],
    batched=True,
    batch_size=100,
    num_proc=1,
)

data_encoded_train

Map: 100%|██████████| 5599/5599 [10:24<00:00,  8.96 examples/s]


Dataset({
    features: ['labels', 'input_features'],
    num_rows: 5599
})

In [28]:
data_encoded_test = converted_test.map(
    preprocess_function,
    remove_columns=["audio"],
    batched=True,
    batch_size=100,
    num_proc=1,
)

data_encoded_test

Map: 100%|██████████| 1400/1400 [02:34<00:00,  9.04 examples/s]


Dataset({
    features: ['labels', 'input_features'],
    num_rows: 1400
})

In [29]:
num_labels = len(ID2LABEL)

## Init model

In [30]:
model = AutoModelForAudioClassification.from_pretrained(
    model_id,
    num_labels=num_labels,
    label2id=LABEL2ID,
    id2label=ID2LABEL,
)

Some weights of WhisperForAudioClassification were not initialized from the model checkpoint at openai/whisper-tiny and are newly initialized: ['classifier.bias', 'classifier.weight', 'projector.bias', 'projector.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


## training args

In [31]:
import transformers
transformers.__version__
print(transformers.__file__)

c:\Users\SSAFY\Desktop\_fpjt\.venv\Lib\site-packages\transformers\__init__.py


In [32]:
batch_size = 1  # Updated batch size
gradient_accumulation_steps = 16  # Updated gradient accumulation steps
num_train_epochs = 5  # Updated number of epochs

training_args = TrainingArguments(
    f"speech-emotion-recognition-with-openai-whisper-large-v3",
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=batch_size,
    gradient_accumulation_steps=gradient_accumulation_steps,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=num_train_epochs,
    warmup_ratio=0.1,
    logging_steps=5,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    fp16=True,
    lr_scheduler_type="linear",
    disable_tqdm=False,

)

In [33]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.999), eps=1e-08)
lr_scheduler = torch.optim.lr_scheduler.LinearLR(optimizer, start_factor=0.1, end_factor=1.0, total_iters=len(train_data) * training_args.num_train_epochs)

In [34]:
import evaluate
from IPython.display import Audio
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report, confusion_matrix

metric = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    predictions = np.argmax(eval_pred.predictions, axis=1)
    labels = eval_pred.label_ids

    accuracy = accuracy_score(y_true=labels, y_pred=predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, predictions, average='weighted')

    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1
    }

In [35]:
torch.cuda.empty_cache()

In [36]:
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification, TrainingArguments, Trainer, BitsAndBytesConfig, EarlyStoppingCallback

trainer = Trainer(
    model,
    training_args,
    train_dataset=data_encoded_train,
    eval_dataset=data_encoded_test,
    tokenizer=feature_extractor,
    compute_metrics=compute_metrics,
    optimizers=(optimizer, lr_scheduler),
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)

  trainer = Trainer(


In [37]:
trainer.train()

Epoch,Training Loss,Validation Loss,Accuracy,Precision,Recall,F1
1,1.793,1.841297,0.265714,0.258819,0.265714,0.210689
2,1.7539,1.743399,0.315,0.317899,0.315,0.302896
3,1.6058,1.705263,0.342857,0.367289,0.342857,0.320888
4,1.1098,1.636867,0.380714,0.37744,0.380714,0.367533


Non-default generation parameters: {'max_length': 448, 'suppress_tokens': [1, 2, 7, 8, 9, 10, 14, 25, 26, 27, 28, 29, 31, 58, 59, 60, 61, 62, 63, 90, 91, 92, 93, 359, 503, 522, 542, 873, 893, 902, 918, 922, 931, 1350, 1853, 1982, 2460, 2627, 3246, 3253, 3268, 3536, 3846, 3961, 4183, 4667, 6585, 6647, 7273, 9061, 9383, 10428, 10929, 11938, 12033, 12331, 12562, 13793, 14157, 14635, 15265, 15618, 16553, 16604, 18362, 18956, 20075, 21675, 22520, 26130, 26161, 26435, 28279, 29464, 31650, 32302, 32470, 36865, 42863, 47425, 49870, 50254, 50258, 50358, 50359, 50360, 50361, 50362], 'begin_suppress_tokens': [220, 50257]}
Non-default generation parameters: {'max_length': 448, 'suppress_tokens': [1, 2, 7, 8, 9, 10, 14, 25, 26, 27, 28, 29, 31, 58, 59, 60, 61, 62, 63, 90, 91, 92, 93, 359, 503, 522, 542, 873, 893, 902, 918, 922, 931, 1350, 1853, 1982, 2460, 2627, 3246, 3253, 3268, 3536, 3846, 3961, 4183, 4667, 6585, 6647, 7273, 9061, 9383, 10428, 10929, 11938, 12033, 12331, 12562, 13793, 14157, 14635

TrainOutput(global_step=1745, training_loss=1.6234865314297826, metrics={'train_runtime': 8239.2573, 'train_samples_per_second': 3.398, 'train_steps_per_second': 0.212, 'total_flos': 3.1084884516672e+17, 'train_loss': 1.6234865314297826, 'epoch': 4.985890337560279})

## Save

In [38]:
trainer.save_model("./whisper-tiny-korean-emotion")

Non-default generation parameters: {'max_length': 448, 'suppress_tokens': [1, 2, 7, 8, 9, 10, 14, 25, 26, 27, 28, 29, 31, 58, 59, 60, 61, 62, 63, 90, 91, 92, 93, 359, 503, 522, 542, 873, 893, 902, 918, 922, 931, 1350, 1853, 1982, 2460, 2627, 3246, 3253, 3268, 3536, 3846, 3961, 4183, 4667, 6585, 6647, 7273, 9061, 9383, 10428, 10929, 11938, 12033, 12331, 12562, 13793, 14157, 14635, 15265, 15618, 16553, 16604, 18362, 18956, 20075, 21675, 22520, 26130, 26161, 26435, 28279, 29464, 31650, 32302, 32470, 36865, 42863, 47425, 49870, 50254, 50258, 50358, 50359, 50360, 50361, 50362], 'begin_suppress_tokens': [220, 50257]}


In [44]:
model_path = "./whisper-tiny-korean-emotion"
model = AutoModelForAudioClassification.from_pretrained(model_path)
extractor = AutoFeatureExtractor.from_pretrained(model_path)

In [39]:
ID2LABEL

{0: 'happiness',
 1: 'angry',
 2: 'disgust',
 3: 'fear',
 4: 'neutral',
 5: 'sadness',
 6: 'surprise'}

In [40]:
def preprocess_audio(audio_path, feature_extractor, max_duration=30.0):
    audio_array, sampling_rate = librosa.load(audio_path, sr=feature_extractor.sampling_rate)

    max_length = int(feature_extractor.sampling_rate * max_duration)
    if len(audio_array) > max_length:
        audio_array = audio_array[:max_length]
    else:
        audio_array = np.pad(audio_array, (0, max_length - len(audio_array)))

    inputs = feature_extractor(
        audio_array,
        sampling_rate=feature_extractor.sampling_rate,
        max_length=max_length,
        truncation=True,
        return_tensors="pt",
    )
    return inputs

In [41]:
def predict_emotion(audio_path, model, feature_extractor, id2label, max_duration=30.0):
    inputs = preprocess_audio(audio_path, feature_extractor, max_duration)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    inputs = {key: value.to(device) for key, value in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)

    logits = outputs.logits
    predicted_id = torch.argmax(logits, dim=-1).item()
    predicted_label = id2label[predicted_id]

    return predicted_label

In [51]:

for path in ["angry.mp3", "angry2.mp3", "hap.mp3", "sad2.mp3", "sad3.mp3", "real_happy.wav", "sad4.mp3", "sad5.mp3", "annie_happy.mp3"]:
    predicted_emotion = predict_emotion(path, model, extractor, ID2LABEL)
    print(f"file: {path} | Predicted Emotion: {predicted_emotion}")

file: angry.mp3 | Predicted Emotion: neutral
file: angry2.mp3 | Predicted Emotion: angry
file: hap.mp3 | Predicted Emotion: neutral
file: sad2.mp3 | Predicted Emotion: surprise
file: sad3.mp3 | Predicted Emotion: surprise
file: real_happy.wav | Predicted Emotion: neutral
file: sad4.mp3 | Predicted Emotion: neutral
file: sad5.mp3 | Predicted Emotion: neutral
file: annie_happy.mp3 | Predicted Emotion: angry


In [50]:
print(predict_emotion("hap2.wav",model,extractor, ID2LABEL))

neutral
