# In [17]:
import math
import numpy as np
import torch
import librosa
import torch.nn as nn
from transformers import Wav2Vec2Processor
from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2Model,
    Wav2Vec2PreTrainedModel,
)

class EmotionModel(Wav2Vec2PreTrainedModel):
    """Wav2Vec2 wrapper that returns the backbone's frame-level hidden states.

    Only a bare ``Wav2Vec2Model`` is instantiated here; no task-specific head
    is created by this class.
    """

    def __init__(self, config):
        super().__init__(config)
        self.config = config
        self.wav2vec2 = Wav2Vec2Model(config)
        self.init_weights()

    def forward(self, input_values):
        """Run the backbone on ``input_values`` and return its first output.

        The first output holds the hidden states, laid out as
        (batch_size, sequence_length, hidden_size).
        """
        backbone_outputs = self.wav2vec2(input_values)
        return backbone_outputs[0]
    
    
# Prefer CUDA when PyTorch reports it as available; otherwise use the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Identifier handed to ``from_pretrained`` for both the processor and the embedder.
model_name = 'audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim'
# NOTE: both objects are built at module level, i.e. as a side effect of running this code.
processor = Wav2Vec2Processor.from_pretrained(model_name)
audio_embedder = EmotionModel.from_pretrained(model_name).to(device)

def process_audio(signal: np.ndarray, sampling_rate: int) -> np.ndarray:
    """Turn a raw waveform into frame-level embeddings from ``audio_embedder``.

    Args:
        signal: Waveform samples handed to the module-level ``processor``.
        sampling_rate: Sampling rate of ``signal``, forwarded to ``processor``.

    Returns:
        The embedder output copied to host memory as a NumPy array
        (presumably ``(1, frames, hidden_size)`` for one waveform --
        TODO confirm against the checkpoint).
    """
    inputs = processor(signal, sampling_rate=sampling_rate, return_tensors="pt", padding=True)
    # Follow the embedder's own placement instead of the module-global
    # ``device``: that name can be rebound by later top-level code, which would
    # put the input on a different device than the model weights.
    embedder_device = next(audio_embedder.parameters()).device
    input_values = inputs["input_values"].to(embedder_device)

    with torch.no_grad():
        embeddings = audio_embedder(input_values)

    return embeddings.detach().cpu().numpy()

class PositionalEncoding(nn.Module):
    """Additive sinusoidal positional encoding for batch-first inputs.

    A table of shape ``(1, max_len, d_model)`` is precomputed and registered
    as the buffer ``pe`` (so it is part of ``state_dict``). Even feature
    indices hold sines, odd feature indices hold cosines.

    Args:
        d_model: Size of the last (feature) dimension of the inputs.
        max_len: Number of positions stored in the precomputed table.
    """

    def __init__(self, d_model, max_len=10000):
        super().__init__()
        self.register_buffer('pe', self._build_table(max_len, d_model).unsqueeze(0))

    @staticmethod
    def _build_table(length, d_model):
        """Return the ``(length, d_model)`` sinusoidal table."""
        pe = torch.zeros(length, d_model)
        pos = torch.arange(0, length, dtype=torch.float32).unsqueeze(1)
        div = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(pos * div)
        # With an odd d_model there is one cosine column fewer than sine columns.
        pe[:, 1::2] = torch.cos(pos * div[: d_model // 2])
        return pe

    def forward(self, x):
        """Add position encodings to ``x`` of shape ``(batch, seq_len, d_model)``.

        Sequences longer than the stored table get their encodings computed on
        the fly with the same formula instead of failing on a shape mismatch.
        """
        seq_len = x.size(1)
        if seq_len <= self.pe.size(1):
            return x + self.pe[:, :seq_len, :]
        table = self._build_table(seq_len, self.pe.size(2)).unsqueeze(0)
        return x + table.to(device=self.pe.device, dtype=self.pe.dtype)
    
class PositionalEncoding_per(nn.Module):
    """Additive sinusoidal positional encoding (default table of 5000 positions).

    The table has shape ``(1, max_len, d_model)`` and is registered as the
    buffer ``pe`` (so it is part of ``state_dict``). Even feature indices
    hold sines, odd feature indices hold cosines.

    Args:
        d_model: Size of the last (feature) dimension of the inputs.
        max_len: Number of positions stored in the precomputed table.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        self.register_buffer('pe', self._build_table(max_len, d_model).unsqueeze(0))

    @staticmethod
    def _build_table(length, d_model):
        """Return the ``(length, d_model)`` sinusoidal table."""
        pe = torch.zeros(length, d_model)
        pos = torch.arange(0, length, dtype=torch.float32).unsqueeze(1)
        div = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(pos * div)
        # With an odd d_model there is one cosine column fewer than sine columns.
        pe[:, 1::2] = torch.cos(pos * div[: d_model // 2])
        return pe

    def forward(self, x):
        """Add position encodings to ``x`` of shape ``(batch, seq_len, d_model)``.

        Sequences longer than the stored table get their encodings computed on
        the fly with the same formula instead of failing on a shape mismatch.
        """
        seq_len = x.size(1)
        if seq_len <= self.pe.size(1):
            return x + self.pe[:, :seq_len, :]
        table = self._build_table(seq_len, self.pe.size(2)).unsqueeze(0)
        return x + table.to(device=self.pe.device, dtype=self.pe.dtype)

class TransformerRegressor(nn.Module):
    """Transformer encoder with length-aware mean pooling and a linear head.

    Inputs are projected to ``d_model``, given positional encodings, passed
    through a sequence-first ``nn.TransformerEncoder`` and averaged over each
    sample's valid time steps before the final linear layer.
    """

    def __init__(self, input_size, d_model, num_layers, num_heads, dim_feedforward, dropout, num_targets):
        super().__init__()
        self.input_proj = nn.Linear(input_size, d_model)
        self.pos_enc = PositionalEncoding_per(d_model)
        layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=num_heads,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            activation='relu',
        )
        self.encoder = nn.TransformerEncoder(layer, num_layers=num_layers)
        self.head = nn.Linear(d_model, num_targets)

    def forward(self, x, lengths):
        """Predict ``num_targets`` values per sample.

        Args:
            x: Batch-first input, ``(batch, seq_len, input_size)``.
            lengths: Tensor with the number of valid time steps per sample.

        Returns:
            Tensor of shape ``(batch, num_targets)``.
        """
        host_lengths = lengths.cpu()
        # Project and add positions while batch-first, then switch to the
        # sequence-first layout expected by the encoder.
        seq_first = self.pos_enc(self.input_proj(x)).transpose(0, 1)
        pad_mask = self._generate_padding_mask(host_lengths, seq_first.size(0)).to(seq_first.device)
        encoded = self.encoder(seq_first, src_key_padding_mask=pad_mask).transpose(0, 1)

        # Average only the valid frames; an empty sample pools to zeros.
        pooled = torch.stack(
            [
                encoded[row, :n].mean(dim=0) if n > 0
                else torch.zeros(encoded.size(-1), device=encoded.device)
                for row, n in enumerate(host_lengths)
            ],
            dim=0,
        )
        return self.head(pooled)

    @staticmethod
    def _generate_padding_mask(lengths, max_len):
        """Return a ``(batch, max_len)`` bool mask that is True at padded positions."""
        positions = torch.arange(max_len).expand(len(lengths), max_len)
        return positions >= lengths.unsqueeze(1)


class TransformerClassifier(nn.Module):
    """Transformer encoder with length-aware mean pooling and a class head.

    Inputs are projected to ``d_model``, given positional encodings, encoded
    by a sequence-first ``nn.TransformerEncoder``, mean-pooled over each
    sample's valid time steps, passed through dropout and classified.
    """

    def __init__(self,
                 input_dim: int,
                 d_model: int,
                 num_heads: int,
                 num_layers: int,
                 dim_feedforward: int,
                 num_classes: int,
                 dropout: float = 0.1):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, d_model)
        self.pos_enc = PositionalEncoding(d_model)
        layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=num_heads,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            activation='relu',
        )
        self.transformer = nn.TransformerEncoder(layer, num_layers=num_layers)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(d_model, num_classes)

    def forward(self, x, lengths):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            x: Batch-first input, ``(batch, seq_len, input_dim)``.
            lengths: Tensor with the number of valid time steps per sample.
        """
        # The encoder is not batch_first, so it works on (seq, batch, dim).
        hidden = self.pos_enc(self.input_proj(x)).transpose(0, 1)
        pad_mask = self._generate_padding_mask(lengths, hidden.size(0))
        hidden = self.transformer(hidden, src_key_padding_mask=pad_mask).transpose(0, 1)

        # Average only the valid frames; an empty sample pools to zeros.
        pooled_rows = []
        for row in range(hidden.size(0)):
            n = lengths[row]
            if n > 0:
                pooled_rows.append(hidden[row, :n, :].mean(dim=0))
            else:
                pooled_rows.append(torch.zeros(hidden.size(2), device=hidden.device))
        pooled = torch.stack(pooled_rows, dim=0)
        return self.classifier(self.dropout(pooled))

    @staticmethod
    def _generate_padding_mask(lengths, max_len):
        """Return a ``(batch, max_len)`` bool mask that is True at padded positions.

        The mask is created on the same device as ``lengths``.
        """
        positions = torch.arange(max_len, device=lengths.device)
        return positions.expand(lengths.size(0), max_len) >= lengths.unsqueeze(1)

class FusionTransformer(nn.Module):
    """Fuse a frozen emotion encoder and a frozen personality encoder.

    Each encoder's sequence features are projected to a shared width, mixed
    by two cross-attention layers, mean-pooled and concatenated. Two heads
    predict from that vector, and each prediction is averaged with the
    corresponding encoder's own output.

    Args:
        emo_enc: Emotion encoder. Must expose ``output_dim``,
            ``extract_features(x, lengths)`` and be callable as
            ``emo_enc(x, lengths)``.
        per_enc: Personality encoder with the same interface.
        config: Mapping with the keys ``hidden_dim``, ``dropout``,
            ``tr_heads``, ``num_emotions`` and ``num_traits``.
    """

    def __init__(self, emo_enc, per_enc, config):
        super().__init__()
        # Both encoders are frozen; only the fusion layers keep gradients.
        for p in emo_enc.parameters():
            p.requires_grad = False
        for p in per_enc.parameters():
            p.requires_grad = False

        self.emo_enc = emo_enc
        self.per_enc = per_enc

        h = config['hidden_dim']
        d = config['dropout']
        heads = config['tr_heads']

        self.emo_proj = nn.Sequential(
            nn.Linear(emo_enc.output_dim, h),
            nn.LayerNorm(h),
            nn.Dropout(d)
        )
        self.per_proj = nn.Sequential(
            nn.Linear(per_enc.output_dim, h),
            nn.LayerNorm(h),
            nn.Dropout(d)
        )

        self.mha_e2p = nn.MultiheadAttention(embed_dim=h, num_heads=heads, dropout=d, batch_first=True)
        self.mha_p2e = nn.MultiheadAttention(embed_dim=h, num_heads=heads, dropout=d, batch_first=True)

        self.emo_head = nn.Sequential(
            nn.Linear(h * 2, h),
            nn.LayerNorm(h),
            nn.SiLU(),
            nn.Dropout(d),
            nn.Linear(h, config['num_emotions'])
        )
        self.per_head = nn.Sequential(
            nn.Linear(h * 2, h),
            nn.LayerNorm(h),
            nn.SiLU(),
            nn.Dropout(d),
            nn.Linear(h, config['num_traits'])
        )

    def forward(self, emo_input=None, per_input=None):
        """Predict emotion and/or personality outputs.

        Args:
            emo_input: Optional ``(x, lengths)`` pair for the emotion encoder.
            per_input: Optional ``(x, lengths)`` pair for the personality encoder.

        Returns:
            ``{'emotion': ..., 'personality': ...}`` with fused predictions
            when both inputs are given; otherwise a one-key dict holding the
            unfused output of the encoder whose input was supplied.

        Raises:
            ValueError: If neither input is provided.
        """
        if emo_input is None and per_input is None:
            raise ValueError("FusionTransformer.forward requires emo_input, per_input, or both")

        base_emo_logits = base_per_scores = None
        if emo_input is not None:
            x_e, len_e = emo_input
            feat_e = self.emo_enc.extract_features(x_e, len_e)
            base_emo_logits = self.emo_enc(x_e, len_e)
            emo_emd = self.emo_proj(feat_e)
        if per_input is not None:
            x_p, len_p = per_input
            feat_p = self.per_enc.extract_features(x_p, len_p)
            base_per_scores = self.per_enc(x_p, len_p)
            per_emd = self.per_proj(feat_p)

        if emo_input is not None and per_input is not None:
            # NOTE: no padding mask is passed to either attention layer, and
            # the pooling below averages over every time step.
            attn_e2p, _ = self.mha_e2p(query=emo_emd, key=per_emd, value=per_emd)
            emo_emd = emo_emd + attn_e2p

            # The second attention sees the already-updated emotion stream.
            attn_p2e, _ = self.mha_p2e(query=per_emd, key=emo_emd, value=emo_emd)
            per_emd = per_emd + attn_p2e

            fe = emo_emd.mean(dim=1)
            fp = per_emd.mean(dim=1)
            cat = torch.cat([fe, fp], dim=-1)

            emo_new = self.emo_head(cat)
            per_new = self.per_head(cat)

            # Average the fused predictions with the encoders' own outputs.
            final_emo = (emo_new + base_emo_logits) / 2
            final_per = (per_new + base_per_scores) / 2

            return {'emotion': final_emo, 'personality': final_per}

        if emo_input is not None:
            return {'emotion': base_emo_logits}
        return {'personality': base_per_scores}


def load_pretrained_emotion_encoder(path, device):
    """Build the emotion ``TransformerClassifier`` and load its weights.

    Args:
        path: Checkpoint passed to ``torch.load``; its content is used
            directly as the model's state dict.
        device: Device the model and the checkpoint tensors are mapped to.

    Returns:
        The model in eval mode, extended with an ``output_dim`` attribute and
        an ``extract_features(x, lengths)`` function.
    """
    model = TransformerClassifier(
        input_dim=1024,
        d_model=128,
        num_heads=4,
        num_layers=3,
        dim_feedforward=512,
        num_classes=7,
        dropout=0.2,
    ).to(device)
    model.load_state_dict(torch.load(path, map_location=device))
    model.eval()

    # Width of the per-frame features returned by extract_features.
    model.output_dim = model.input_proj.out_features

    def extract_features(x, lengths):
        """Return encoder outputs per time step, batch-first, without pooling."""
        seq_first = model.pos_enc(model.input_proj(x)).transpose(0, 1)
        pad_mask = TransformerClassifier._generate_padding_mask(lengths, seq_first.size(0))
        encoded = model.transformer(seq_first, src_key_padding_mask=pad_mask)
        return encoded.transpose(0, 1)

    model.extract_features = extract_features
    return model


def load_pretrained_personality_encoder(path, device):
    """Build the personality ``TransformerRegressor`` and load its weights.

    Args:
        path: Checkpoint passed to ``torch.load``; its content is used
            directly as the model's state dict.
        device: Device the model and the checkpoint tensors are mapped to.

    Returns:
        The model in eval mode, extended with an ``output_dim`` attribute and
        an ``extract_features(x, lengths)`` function.
    """
    model = TransformerRegressor(
        input_size=1024,
        d_model=256,
        num_heads=4,
        num_layers=3,
        dim_feedforward=512,
        num_targets=5,
        dropout=0.3,
    ).to(device)
    model.load_state_dict(torch.load(path, map_location=device))
    model.eval()

    # Width of the per-frame features returned by extract_features.
    model.output_dim = model.input_proj.out_features

    def extract_features(x, lengths):
        """Return encoder outputs per time step, batch-first, without pooling."""
        seq_first = model.pos_enc(model.input_proj(x)).transpose(0, 1)
        # The regressor builds its mask on the CPU, then moves it to the features' device.
        pad_mask = TransformerRegressor._generate_padding_mask(lengths.cpu(), seq_first.size(0))
        pad_mask = pad_mask.to(seq_first.device)
        encoded = model.encoder(seq_first, src_key_padding_mask=pad_mask)
        return encoded.transpose(0, 1)

    model.extract_features = extract_features
    return model

def load_fusion_model(
    fusion_ckpt_path: str,
    emo_encoder_ckpt: str,
    per_encoder_ckpt: str,
    device: str = 'cpu'
):
    """Assemble the fusion model from its three checkpoints.

    Args:
        fusion_ckpt_path: Checkpoint holding a ``'config'`` entry and a
            ``'state_dict'`` entry for ``FusionTransformer``.
        emo_encoder_ckpt: Checkpoint for the emotion encoder.
        per_encoder_ckpt: Checkpoint for the personality encoder.
        device: Device specifier passed to ``torch.device``.

    Returns:
        ``(model, device)``: the fusion model in eval mode and the resolved
        ``torch.device``.
    """
    target = torch.device(device)
    emotion_encoder = load_pretrained_emotion_encoder(emo_encoder_ckpt, target)
    personality_encoder = load_pretrained_personality_encoder(per_encoder_ckpt, target)

    checkpoint = torch.load(fusion_ckpt_path, map_location=target)
    fusion_config, fusion_weights = checkpoint['config'], checkpoint['state_dict']

    fusion = FusionTransformer(emotion_encoder, personality_encoder, fusion_config).to(target)
    fusion.load_state_dict(fusion_weights)
    fusion.eval()
    return fusion, target

def run_inference(
    model: "FusionTransformer",
    device: torch.device,
    embedding: np.ndarray
):
    """Run the fusion model on the embedding of a single utterance.

    Args:
        model: Fusion model; called with the same ``(x, lengths)`` pair as
            both ``emo_input`` and ``per_input``.
        device: Device the input tensors are moved to.
        embedding: Array of shape ``(frames, dim)`` or ``(1, frames, dim)``.

    Returns:
        ``(emo_probs, per_scores)``: softmax probabilities over the emotion
        logits and the raw personality outputs, both as 1-D NumPy arrays.

    Raises:
        ValueError: If ``embedding`` has any other shape. Previously this
            surfaced as an ``UnboundLocalError`` further down.
    """
    if embedding.ndim == 3 and embedding.shape[0] == 1:
        emb = embedding[0]
    elif embedding.ndim == 2:
        emb = embedding
    else:
        raise ValueError(
            "embedding must have shape (frames, dim) or (1, frames, dim); "
            f"got {tuple(embedding.shape)}"
        )

    x = torch.tensor(emb, dtype=torch.float32).unsqueeze(0).to(device)
    lengths = torch.tensor([emb.shape[0]], dtype=torch.long).to(device)

    with torch.no_grad():
        out = model(emo_input=(x, lengths), per_input=(x, lengths))
        # Softmax is applied on the CPU tensor directly; no NumPy round trip.
        emo_logits = out['emotion'].squeeze(0).cpu()
        emo_probs = torch.softmax(emo_logits, dim=-1).numpy()
        per_scores = out['personality'].squeeze(0).cpu().numpy()
    return emo_probs, per_scores

def extract_embeddings(audio_path: str) -> np.ndarray:
    """Load ``audio_path`` with librosa (``sr=16000``) and embed it via ``process_audio``."""
    waveform, rate = librosa.load(audio_path, sr=16000)
    return process_audio(waveform, rate)

if __name__ == "__main__":
    # Checkpoint file names, given as relative paths.
    FUSION_CKPT = "best_fusion_overall_trans.pt"
    EMO_ENC_CKPT = "final_best_model_uni_trans.pt"
    PER_ENC_CKPT = "best_trans_fiv2.pt"

    # Keep the fusion device under its own name. This block runs at module
    # scope, so unpacking into ``device`` would clobber the module-level
    # ``device`` that was selected for the audio embedder (possibly CUDA).
    model, fusion_device = load_fusion_model(
        FUSION_CKPT, EMO_ENC_CKPT, PER_ENC_CKPT, device='cpu'
    )

    # Label order is assumed to match the model's output order -- TODO confirm.
    emo_names = ['Neutral', 'Anger', 'Disgust', 'Fear', 'Happiness', 'Sadness', 'Surprise']
    pers_names = ['Openness', 'Conscientiousness', 'Extraversion', 'Agreeableness', 'Neuroticism']

    emb = extract_embeddings("your_audio.wav")
    emo_probs, per_scores = run_inference(model, fusion_device, emb)
    print("Emotion")
    for name, v in zip(emo_names, emo_probs):
        print(f"  {name}: {v:.4f}")

    print("Personality")
    for name, v in zip(pers_names, per_scores):
        print(f"  {name}: {v:.4f}")
        


# In [18]:
import math
import numpy as np
import torch
import librosa
import torch.nn as nn
from transformers import Wav2Vec2Processor
from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2Model,
    Wav2Vec2PreTrainedModel,
)

class EmotionModel(Wav2Vec2PreTrainedModel):
    """Wav2Vec2 wrapper that returns the backbone's frame-level hidden states.

    Only a bare ``Wav2Vec2Model`` is instantiated here; no task-specific head
    is created by this class.
    """

    def __init__(self, config):
        super().__init__(config)
        self.config = config
        self.wav2vec2 = Wav2Vec2Model(config)
        self.init_weights()

    def forward(self, input_values):
        """Run the backbone on ``input_values`` and return its first output.

        The first output holds the hidden states, laid out as
        (batch_size, sequence_length, hidden_size).
        """
        backbone_outputs = self.wav2vec2(input_values)
        return backbone_outputs[0]
    
    
# Prefer CUDA when PyTorch reports it as available; otherwise use the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Identifier handed to ``from_pretrained`` for both the processor and the embedder.
model_name = 'audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim'
# NOTE: both objects are built at module level, i.e. as a side effect of running this code.
processor = Wav2Vec2Processor.from_pretrained(model_name)
audio_embedder = EmotionModel.from_pretrained(model_name).to(device)

def process_audio(signal: np.ndarray, sampling_rate: int) -> np.ndarray:
    """Turn a raw waveform into frame-level embeddings from ``audio_embedder``.

    Args:
        signal: Waveform samples handed to the module-level ``processor``.
        sampling_rate: Sampling rate of ``signal``, forwarded to ``processor``.

    Returns:
        The embedder output copied to host memory as a NumPy array
        (presumably ``(1, frames, hidden_size)`` for one waveform --
        TODO confirm against the checkpoint).
    """
    inputs = processor(signal, sampling_rate=sampling_rate, return_tensors="pt", padding=True)
    # Follow the embedder's own placement instead of the module-global
    # ``device``: that name can be rebound by later top-level code, which would
    # put the input on a different device than the model weights.
    embedder_device = next(audio_embedder.parameters()).device
    input_values = inputs["input_values"].to(embedder_device)

    with torch.no_grad():
        embeddings = audio_embedder(input_values)

    return embeddings.detach().cpu().numpy()


class CustomMambaBlock(nn.Module):
    """Residual block made only of linear layers, ReLU, dropout and LayerNorm.

    Every layer acts on the last dimension, so each position along the other
    axes is transformed independently of the rest.
    """

    def __init__(self, d_model, dropout=0.1):
        super().__init__()
        self.in_proj = nn.Linear(d_model, d_model)
        self.s_B = nn.Linear(d_model, d_model)
        self.s_C = nn.Linear(d_model, d_model)
        self.out_proj = nn.Linear(d_model, d_model)
        self.norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.act = nn.ReLU()

    def forward(self, x):
        """Return ``LayerNorm(x + block(x))`` with the same shape as ``x``."""
        projected = self.in_proj(x)
        # Two extra linear branches are summed onto the projection.
        mixed = projected + self.s_B(projected) + self.s_C(projected)
        update = self.dropout(self.out_proj(self.act(mixed)))
        # Post-norm residual connection around the whole block.
        return self.norm(update + x)

class CustomMambaClassifier(nn.Module):
    """Stack of ``CustomMambaBlock`` layers with mean pooling and a class head."""

    def __init__(self, input_size=1024, d_model=256, num_layers=2, num_classes=7, dropout=0.1):
        super().__init__()
        self.input_proj = nn.Linear(input_size, d_model)
        self.blocks = nn.ModuleList([CustomMambaBlock(d_model, dropout) for _ in range(num_layers)])
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, x, lengths):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            x: Batch-first input, ``(batch, seq_len, input_size)``.
            lengths: Number of valid time steps for each sample.
        """
        hidden = self.input_proj(x)
        for block in self.blocks:
            hidden = block(hidden)

        # Average only the valid frames; an empty sample pools to zeros.
        pooled = []
        for row, n in enumerate(lengths):
            if n > 0:
                pooled.append(hidden[row, :n].mean(dim=0))
            else:
                pooled.append(torch.zeros(hidden.size(-1), device=hidden.device))
        return self.fc(torch.stack(pooled, dim=0))
    

class CustomMambaRegressor(nn.Module):
    """Stack of ``CustomMambaBlock`` layers with mean pooling and a regression head."""

    def __init__(self, input_size, d_model, num_layers, num_targets, dropout=0.1):
        super().__init__()
        self.input_proj = nn.Linear(input_size, d_model)
        self.blocks = nn.ModuleList([CustomMambaBlock(d_model, dropout) for _ in range(num_layers)])
        self.head = nn.Linear(d_model, num_targets)

    def forward(self, x, lengths):
        """Return predictions of shape ``(batch, num_targets)``.

        Args:
            x: Batch-first input, ``(batch, seq_len, input_size)``.
            lengths: Number of valid time steps for each sample.
        """
        hidden = self.input_proj(x)
        for block in self.blocks:
            hidden = block(hidden)

        # Average only the valid frames; an empty sample pools to zeros.
        pooled = []
        for row, n in enumerate(lengths):
            if n > 0:
                pooled.append(hidden[row, :n].mean(dim=0))
            else:
                pooled.append(torch.zeros(hidden.size(-1), device=hidden.device))
        return self.head(torch.stack(pooled, dim=0))

class FusionTransformer(nn.Module):
    """Fuse a frozen emotion encoder and a frozen personality encoder.

    Each encoder's sequence features are projected to a shared width, mixed
    by two cross-attention layers, mean-pooled and concatenated. Two heads
    predict from that vector, and each prediction is averaged with the
    corresponding encoder's own output.

    Args:
        emo_enc: Emotion encoder. Must expose ``output_dim``,
            ``extract_features(x, lengths)`` and be callable as
            ``emo_enc(x, lengths)``.
        per_enc: Personality encoder with the same interface.
        config: Mapping with the keys ``hidden_dim``, ``dropout``,
            ``tr_heads``, ``num_emotions`` and ``num_traits``.
    """

    def __init__(self, emo_enc, per_enc, config):
        super().__init__()
        # Both encoders are frozen; only the fusion layers keep gradients.
        for p in emo_enc.parameters():
            p.requires_grad = False
        for p in per_enc.parameters():
            p.requires_grad = False

        self.emo_enc = emo_enc
        self.per_enc = per_enc

        h = config['hidden_dim']
        d = config['dropout']
        heads = config['tr_heads']

        self.emo_proj = nn.Sequential(
            nn.Linear(emo_enc.output_dim, h),
            nn.LayerNorm(h),
            nn.Dropout(d)
        )
        self.per_proj = nn.Sequential(
            nn.Linear(per_enc.output_dim, h),
            nn.LayerNorm(h),
            nn.Dropout(d)
        )

        self.mha_e2p = nn.MultiheadAttention(embed_dim=h, num_heads=heads, dropout=d, batch_first=True)
        self.mha_p2e = nn.MultiheadAttention(embed_dim=h, num_heads=heads, dropout=d, batch_first=True)

        self.emo_head = nn.Sequential(
            nn.Linear(h * 2, h),
            nn.LayerNorm(h),
            nn.SiLU(),
            nn.Dropout(d),
            nn.Linear(h, config['num_emotions'])
        )
        self.per_head = nn.Sequential(
            nn.Linear(h * 2, h),
            nn.LayerNorm(h),
            nn.SiLU(),
            nn.Dropout(d),
            nn.Linear(h, config['num_traits'])
        )

    def forward(self, emo_input=None, per_input=None):
        """Predict emotion and/or personality outputs.

        Args:
            emo_input: Optional ``(x, lengths)`` pair for the emotion encoder.
            per_input: Optional ``(x, lengths)`` pair for the personality encoder.

        Returns:
            ``{'emotion': ..., 'personality': ...}`` with fused predictions
            when both inputs are given; otherwise a one-key dict holding the
            unfused output of the encoder whose input was supplied.

        Raises:
            ValueError: If neither input is provided.
        """
        if emo_input is None and per_input is None:
            raise ValueError("FusionTransformer.forward requires emo_input, per_input, or both")

        base_emo_logits = base_per_scores = None
        if emo_input is not None:
            x_e, len_e = emo_input
            feat_e = self.emo_enc.extract_features(x_e, len_e)
            base_emo_logits = self.emo_enc(x_e, len_e)
            emo_emd = self.emo_proj(feat_e)
        if per_input is not None:
            x_p, len_p = per_input
            feat_p = self.per_enc.extract_features(x_p, len_p)
            base_per_scores = self.per_enc(x_p, len_p)
            per_emd = self.per_proj(feat_p)

        if emo_input is not None and per_input is not None:
            # NOTE: no padding mask is passed to either attention layer, and
            # the pooling below averages over every time step.
            attn_e2p, _ = self.mha_e2p(query=emo_emd, key=per_emd, value=per_emd)
            emo_emd = emo_emd + attn_e2p

            # The second attention sees the already-updated emotion stream.
            attn_p2e, _ = self.mha_p2e(query=per_emd, key=emo_emd, value=emo_emd)
            per_emd = per_emd + attn_p2e

            fe = emo_emd.mean(dim=1)
            fp = per_emd.mean(dim=1)
            cat = torch.cat([fe, fp], dim=-1)

            emo_new = self.emo_head(cat)
            per_new = self.per_head(cat)

            # Average the fused predictions with the encoders' own outputs.
            final_emo = (emo_new + base_emo_logits) / 2
            final_per = (per_new + base_per_scores) / 2

            return {'emotion': final_emo, 'personality': final_per}

        if emo_input is not None:
            return {'emotion': base_emo_logits}
        return {'personality': base_per_scores}

def load_pretrained_emotion_encoder(path, device):
    """Build the emotion ``CustomMambaClassifier`` and load its weights.

    Args:
        path: Checkpoint passed to ``torch.load``; the weights are read from
            its ``'model_state_dict'`` entry.
        device: Device the model and the checkpoint tensors are mapped to.

    Returns:
        The model in eval mode, extended with ``output_dim`` (256) and an
        ``extract_features(x, lengths)`` function.
    """
    model = CustomMambaClassifier(
        input_size=1024, d_model=256, num_layers=3, num_classes=7, dropout=0.2
    ).to(device)
    checkpoint = torch.load(path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.output_dim = 256

    def extract_features(x, lengths):
        """Return per-time-step features before pooling; ``lengths`` is unused."""
        hidden = model.input_proj(x)
        for block in model.blocks:
            hidden = block(hidden)
        return hidden

    model.extract_features = extract_features
    model.eval()
    return model

def load_pretrained_personality_encoder(path, device):
    """Build the personality ``CustomMambaRegressor`` and load its weights.

    Args:
        path: Checkpoint passed to ``torch.load``; its content is used
            directly as the model's state dict.
        device: Device the model and the checkpoint tensors are mapped to.

    Returns:
        The model in eval mode, extended with ``output_dim`` (256) and an
        ``extract_features(x, lengths)`` function.
    """
    model = CustomMambaRegressor(
        input_size=1024, d_model=256, num_layers=3, num_targets=5, dropout=0.2
    ).to(device)
    model.load_state_dict(torch.load(path, map_location=device))
    model.output_dim = 256

    def extract_features(x, lengths):
        """Return per-time-step features before pooling; ``lengths`` is unused."""
        hidden = model.input_proj(x)
        for block in model.blocks:
            hidden = block(hidden)
        return hidden

    model.extract_features = extract_features
    model.eval()
    return model


def load_fusion_model(
    fusion_ckpt_path: str,
    emo_encoder_ckpt: str,
    per_encoder_ckpt: str,
    device: str = 'cpu'
):
    """Assemble the fusion model from its three checkpoints.

    Args:
        fusion_ckpt_path: Checkpoint holding a ``'config'`` entry and a
            ``'state_dict'`` entry for ``FusionTransformer``.
        emo_encoder_ckpt: Checkpoint for the emotion encoder.
        per_encoder_ckpt: Checkpoint for the personality encoder.
        device: Device specifier passed to ``torch.device``.

    Returns:
        ``(model, device)``: the fusion model in eval mode and the resolved
        ``torch.device``.
    """
    target = torch.device(device)
    emotion_encoder = load_pretrained_emotion_encoder(emo_encoder_ckpt, target)
    personality_encoder = load_pretrained_personality_encoder(per_encoder_ckpt, target)

    checkpoint = torch.load(fusion_ckpt_path, map_location=target)
    fusion_config, fusion_weights = checkpoint['config'], checkpoint['state_dict']

    fusion = FusionTransformer(emotion_encoder, personality_encoder, fusion_config).to(target)
    fusion.load_state_dict(fusion_weights)
    fusion.eval()
    return fusion, target

def run_inference(
    model: "FusionTransformer",
    device: torch.device,
    embedding: np.ndarray
):
    """Run the fusion model on the embedding of a single utterance.

    Args:
        model: Fusion model; called with the same ``(x, lengths)`` pair as
            both ``emo_input`` and ``per_input``.
        device: Device the input tensors are moved to.
        embedding: Array of shape ``(frames, dim)`` or ``(1, frames, dim)``.

    Returns:
        ``(emo_probs, per_scores)``: softmax probabilities over the emotion
        logits and the raw personality outputs, both as 1-D NumPy arrays.

    Raises:
        ValueError: If ``embedding`` has any other shape. Previously this
            surfaced as an ``UnboundLocalError`` further down.
    """
    if embedding.ndim == 3 and embedding.shape[0] == 1:
        emb = embedding[0]
    elif embedding.ndim == 2:
        emb = embedding
    else:
        raise ValueError(
            "embedding must have shape (frames, dim) or (1, frames, dim); "
            f"got {tuple(embedding.shape)}"
        )

    x = torch.tensor(emb, dtype=torch.float32).unsqueeze(0).to(device)
    lengths = torch.tensor([emb.shape[0]], dtype=torch.long).to(device)

    with torch.no_grad():
        out = model(emo_input=(x, lengths), per_input=(x, lengths))
        # Softmax is applied on the CPU tensor directly; no NumPy round trip.
        emo_logits = out['emotion'].squeeze(0).cpu()
        emo_probs = torch.softmax(emo_logits, dim=-1).numpy()
        per_scores = out['personality'].squeeze(0).cpu().numpy()
    return emo_probs, per_scores

def extract_embeddings(audio_path: str) -> np.ndarray:
    """Load ``audio_path`` with librosa (``sr=16000``) and embed it via ``process_audio``."""
    waveform, rate = librosa.load(audio_path, sr=16000)
    return process_audio(waveform, rate)

if __name__ == "__main__":
    # Checkpoint file names, given as relative paths.
    FUSION_CKPT = "best_fusion_overall_mamba.pt"
    EMO_ENC_CKPT = "final_best_model_uni_mamba.pt"
    PER_ENC_CKPT = "best_mamba_regressor.pth"

    # Keep the fusion device under its own name. This block runs at module
    # scope, so unpacking into ``device`` would clobber the module-level
    # ``device`` that was selected for the audio embedder (possibly CUDA).
    model, fusion_device = load_fusion_model(
        FUSION_CKPT, EMO_ENC_CKPT, PER_ENC_CKPT, device='cpu'
    )

    # Label order is assumed to match the model's output order -- TODO confirm.
    emo_names = ['Neutral', 'Anger', 'Disgust', 'Fear', 'Happiness', 'Sadness', 'Surprise']
    pers_names = ['Openness', 'Conscientiousness', 'Extraversion', 'Agreeableness', 'Neuroticism']

    emb = extract_embeddings("your_audio.wav")
    emo_probs, per_scores = run_inference(model, fusion_device, emb)
    print("Emotion")
    for name, v in zip(emo_names, emo_probs):
        print(f"  {name}: {v:.4f}")

    print("Personality")
    for name, v in zip(pers_names, per_scores):
        print(f"  {name}: {v:.4f}")
        
