In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import whisperx
import gc
from transformers import AutoModelForCausalLM, AutoTokenizer, TextStreamer, AutoConfig, Wav2Vec2Processor

import librosa
import IPython.display as ipd
import numpy as np
import pandas as pd

from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2PreTrainedModel,
    Wav2Vec2Model
)


from dataclasses import dataclass
from typing import Optional, Tuple
import torch
from transformers.file_utils import ModelOutput


  # Select the "soundfile" I/O backend for torchaudio before any audio is loaded.
  # NOTE(review): the leading indentation on this statement looks accidental, and
  # newer torchaudio releases reportedly deprecate this call — confirm it is still
  # required for the installed version.
  torchaudio.set_audio_backend("soundfile")


In [2]:

class Wav2Vec2ClassificationHead(nn.Module):
    """Classification head placed on top of pooled wav2vec2 features.

    Pipeline: dropout -> linear (hidden_size -> hidden_size) -> tanh ->
    dropout -> linear (hidden_size -> num_labels).
    """

    def __init__(self, config):
        super().__init__()
        hidden_dim = config.hidden_size
        # Attribute names determine the parameter names in the state dict, so
        # they are kept exactly as they were.
        self.dense = nn.Linear(hidden_dim, hidden_dim)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(hidden_dim, config.num_labels)

    def forward(self, features, **kwargs):
        """Turn ``features`` into logits; extra keyword arguments are ignored."""
        hidden = torch.tanh(self.dense(self.dropout(features)))
        return self.out_proj(self.dropout(hidden))

@dataclass
class SpeechClassifierOutput(ModelOutput):
    """Output container returned by ``Wav2Vec2ForSpeechClassification.forward``."""
    # Loss value; stays None when no loss is computed.
    loss: Optional[torch.FloatTensor] = None
    # Logits produced by the classification head from the pooled hidden states.
    logits: torch.FloatTensor = None
    # Passed through unchanged from the wav2vec2 backbone output.
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    # Passed through unchanged from the wav2vec2 backbone output.
    attentions: Optional[Tuple[torch.FloatTensor]] = None


class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    """wav2vec2 encoder followed by pooling and a classification head.

    The encoder output is reduced over dim 1 according to
    ``config.pooling_mode`` (``"mean"``, ``"sum"`` or ``"max"``) and the pooled
    vector is fed to ``Wav2Vec2ClassificationHead`` to obtain
    ``config.num_labels`` logits.
    """

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        self.init_weights()

    def freeze_feature_extractor(self):
        """Freeze the parameters of the backbone's feature extractor."""
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(
            self,
            hidden_states,
            mode="mean"
    ):
        """Pool ``hidden_states`` over dim 1.

        Args:
            hidden_states: Encoder output to pool.
            mode: One of ``"mean"``, ``"sum"`` or ``"max"``.

        Returns:
            The pooled tensor (dim 1 removed).

        Raises:
            ValueError: If ``mode`` is not a supported pooling mode.
        """
        if mode == "mean":
            return torch.mean(hidden_states, dim=1)
        if mode == "sum":
            return torch.sum(hidden_states, dim=1)
        if mode == "max":
            # torch.max with ``dim`` returns (values, indices); keep the values.
            return torch.max(hidden_states, dim=1)[0]
        raise ValueError(
            "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")

    def forward(
            self,
            input_values,
            attention_mask=None,
            output_attentions=None,
            output_hidden_states=None,
            return_dict=None,
            labels=None,
    ):
        """Run the encoder, pool its output and classify.

        Args:
            input_values: Input passed to the wav2vec2 backbone.
            attention_mask: Optional mask forwarded to the backbone.
            output_attentions: Forwarded to the backbone.
            output_hidden_states: Forwarded to the backbone.
            return_dict: Return a ``SpeechClassifierOutput`` when true, a tuple
                otherwise; defaults to ``config.use_return_dict``.
            labels: Optional targets. When given, a loss is computed according
                to ``config.problem_type`` (inferred on first use if unset).

        Returns:
            ``SpeechClassifierOutput`` or a tuple ``(loss?, logits, *extras)``.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # NOTE(review): pooling runs over every position of the encoder output
        # and does not consult attention_mask — confirm this matches how the
        # checkpoint was trained before changing it.
        hidden_states = self.merged_strategy(outputs[0], mode=self.pooling_mode)
        logits = self.classifier(hidden_states)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            # Previously the problem type was inferred but no loss was ever
            # computed, so ``loss`` stayed None even when labels were supplied.
            if self.config.problem_type == "regression":
                loss_fct = nn.MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = nn.CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = nn.BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            # Skip the first two backbone outputs and pass the rest through.
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
        


In [3]:


# Run on the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
audio = 'Recording.m4a'
batch_size = 16
# float16 inference is meant for the GPU; use int8 for WhisperX on CPU.
compute_type = 'float16' if device == 'cuda' else 'int8'
model_size = "large-v3"

MODEL_DIR = "nlpai-lab/KULLM3"
# Half precision only on the GPU; keep full precision on CPU.
model_dtype = torch.float16 if device == 'cuda' else torch.float32
# Place the LLM on the selected device instead of a hard-coded "cuda".
model = AutoModelForCausalLM.from_pretrained(MODEL_DIR, torch_dtype=model_dtype).to(device)
model_audio = whisperx.load_model(model_size, device, compute_type=compute_type)


tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR)
# Streams generated text to stdout, omitting the prompt and special tokens.
streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)

Loading checkpoint shards:   0%|          | 0/5 [00:00<?, ?it/s]

Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.2.2. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../../../home/server/.cache/torch/whisperx-vad-segmentation.bin`


No language specified, language will be first be detected for each audio file (increases inference time).
Model was trained with pyannote.audio 0.0.1, yours is 3.1.1. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.2.1. Bad things might happen unless you revert torch to 1.x.


In [4]:

model_speech_emotion_loc = 'jungjongho/wav2vec2-xlsr-korean-speech-emotion-recognition2_data_rebalance'

# Processor first: its feature extractor defines the sampling rate used later.
processor = Wav2Vec2Processor.from_pretrained(model_speech_emotion_loc)
sampling_rate = processor.feature_extractor.sampling_rate

# The config provides the id -> label mapping read by predict_emotion.
config = AutoConfig.from_pretrained(model_speech_emotion_loc)

# NOTE(review): the load log reports some positional-conv weights as unused /
# newly initialised — confirm the checkpoint loads correctly with the installed
# transformers version.
model_speech = Wav2Vec2ForSpeechClassification.from_pretrained(model_speech_emotion_loc)
model_speech = model_speech.to(device)

Some weights of the model checkpoint at jungjongho/wav2vec2-xlsr-korean-speech-emotion-recognition2_data_rebalance were not used when initializing Wav2Vec2ForSpeechClassification: ['wav2vec2.encoder.pos_conv_embed.conv.weight_g', 'wav2vec2.encoder.pos_conv_embed.conv.weight_v']
- This IS expected if you are initializing Wav2Vec2ForSpeechClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing Wav2Vec2ForSpeechClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of Wav2Vec2ForSpeechClassification were not initialized from the model checkpoint at jungjongho/wav2vec2-xlsr-korean-speech-emotion-recognition2_data_rebalance and are newly initialized: ['wav2vec2.encoder.pos_c

In [22]:
def speech_file_to_array_fn(path, sampling_rate):
    """Load an audio file as a mono 1-D array resampled to ``sampling_rate``.

    Args:
        path: Audio file path handed to ``torchaudio.load``.
        sampling_rate: Target sample rate in Hz.

    Returns:
        1-D ``numpy.ndarray`` holding the down-mixed, resampled waveform.
    """
    waveform, source_rate = torchaudio.load(path)
    # Average the channels so a stereo file yields a single signal rather than
    # a 2-row array (which ``squeeze()`` would leave untouched).
    if waveform.dim() > 1:
        waveform = waveform.mean(dim=0)
    # Resample to the requested rate instead of a hard-coded 16000.
    resampler = torchaudio.transforms.Resample(source_rate, sampling_rate)
    return resampler(waveform).numpy()


def predict_emotion(path, sampling_rate):
    """Predict the most likely emotion for the speech in an audio file.

    Uses the notebook-level ``processor``, ``model_speech``, ``config`` and
    ``device`` objects.

    Args:
        path: Audio file path.
        sampling_rate: Sample rate passed to the loader and the processor.

    Returns:
        Tuple ``(score, label)``: the top class's softmax probability expressed
        as a percentage, and its name looked up in ``config.id2label``.
    """
    speech = speech_file_to_array_fn(path, sampling_rate)
    features = processor(speech, sampling_rate=sampling_rate, return_tensors="pt", padding=True)

    input_values = features.input_values.to(device)
    attention_mask = features.attention_mask.to(device)

    with torch.no_grad():
        logits = model_speech(input_values, attention_mask=attention_mask).logits

    # Probabilities for the first (only) item of the batch.
    scores = F.softmax(logits, dim=1).detach().cpu().numpy()[0]
    best_idx = int(np.argmax(scores))
    return scores[best_idx] * 100, config.id2label[best_idx]

In [28]:
# Decode the recording once and pass the waveform to WhisperX, rather than
# passing the path and having the file decoded a second time.
audio_nd = whisperx.load_audio(audio)
result = model_audio.transcribe(audio_nd, batch_size=batch_size, language='ko')

# Join every recognised segment so longer recordings are not truncated to the
# first one; yields an empty string when nothing was recognised.
textresult = ' '.join(segment['text'].strip() for segment in result['segments'])

In [29]:
# Reuse the notebook-level recording path and the processor's sampling rate
# instead of repeating them as literals.
score, feelings = predict_emotion(audio, sampling_rate)

In [30]:
# Display the transcript and the predicted emotion label as the cell output.
textresult, feelings

('인간의 감정 테스트', '슬픔')

In [31]:

# User turn: the transcript plus the detected emotion, asking for a short reply.
s=f"'{textresult}'를 말하는 화자는 {feelings}의 감정을 가지고 있어. 해당 화자에게 적절하게 답변을 해줘 (200자 이내)"

conversation = [
    {
        "role": "system",
        "content": "나는 사용자가 준 문장과 감정을 보고 이에 맞게 공감을 하며 도움이 되는 조언해주는 친한친구야.",
    },
    {
        'role': 'user', 
        'content': s
    }
]
# Move the prompt to whichever device the model lives on instead of assuming "cuda".
inputs = tokenizer.apply_chat_template(
    conversation,
    tokenize=True,
    # add_generation_prompt=True,
    return_tensors='pt').to(model.device)
# The reply is streamed to stdout by ``streamer``; the generated ids stay in
# ``_`` because a later cell decodes them from that name.
_ = model.generate(inputs, streamer=streamer, max_new_tokens=1000)

"그런 날도 있죠. 혼자 있기 어렵다면 내게 이야기해주세요. 때로는 마음을 펼치는 것만으로도 조금은 나아질 수 있어요."


In [17]:
# Decode the token ids that the generation cell stored in `_` (the decoded text
# contains the prompt, the reply and the special tokens).
# NOTE(review): this cell's execution count is older than the generation cell's,
# so the shown output may come from an earlier run — re-run after generating.
tokenizer.batch_decode(_)

['<s> [INST] <<SYS>>\n나는 사용자가 준 문장과 감정을 보고 이에 맞게 공감을 하며 도움이 되는 조언해주는 심리전문가야.\n<</SYS>>\n\n\'인간의 감정 테스트\'를 말하는 화자는 슬픔의 감정을 가지고 있어. 해당 화자에게 적절하게 답변을 해줘 (200자 이내) [/INST]"그런 날도 있죠. 혼자 있기 어렵다면 내게 이야기해주세요. 때로는 마음을 펼치는 것만으로도 조금은 나아질 수 있어요."</s>']