In [None]:
# Mount Google Drive at /content/drive so the following cells can read files from it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os

# Sanity check: report whether the Drive folder with the videos is reachable
# and, if it is, show what it contains.
avi_path = "/content/drive/MyDrive/avi"
if not os.path.exists(avi_path):
    print(f"Path does not exist: {avi_path}")
else:
    entries = os.listdir(avi_path)
    print("Entries in avi:", entries)

In [None]:
# List the contents of the Drive mount point.
!ls /content/drive

In [None]:
%%bash
# Extract a 16 kHz mono WAV track from every .avi file in the Drive folder
# into /content/audio.

# Without nullglob an empty folder would hand the literal "*.avi" pattern to ffmpeg.
shopt -s nullglob

apt-get update && apt-get install -y ffmpeg

mkdir -p /content/audio

for vid in "/content/drive/My Drive/avi"/*.avi; do
  base=$(basename "$vid" .avi)
  out="/content/audio/${base}.wav"
  # -nostdin: the cell is non-interactive, ffmpeg must never wait for keyboard input.
  # -y: overwrite a WAV left by an earlier (possibly interrupted) run instead of
  #     stopping at ffmpeg's "File exists. Overwrite?" prompt.
  if ! ffmpeg -nostdin -y -i "$vid" \
    -vn \
    -ar 16000 \
    -ac 1 \
    -loglevel error \
    "$out"; then
    # Report the failure and drop the partial output so it is not mistaken
    # for a finished conversion later.
    echo "ffmpeg failed for: $vid" >&2
    rm -f "$out"
  fi
done

ls -lh /content/audio | head -n 10


In [None]:
import glob
import os

avi_dir = "/content/drive/My Drive/avi"
wav_dir = "/content/audio"

# Count source videos and produced WAV files; the difference is reported as
# the number of files still to convert.
avi_pattern = os.path.join(avi_dir, "*.avi")
wav_pattern = os.path.join(wav_dir, "*.wav")
avi_files = glob.glob(avi_pattern)
wav_files = glob.glob(wav_pattern)

total, converted = len(avi_files), len(wav_files)
remaining = total - converted

print(f"Всего видео-файлов:       {total}")
print(f"Уже конвертировано в WAV: {converted}")
print(f"Осталось конвертировать:  {remaining}")


In [None]:
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q tqdm

import os, glob, subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm

AVI_DIR = "/content/drive/My Drive/avi"
WAV_DIR = "/content/audio"
os.makedirs(WAV_DIR, exist_ok=True)


def _stem(path):
    """Return the file name of `path` without its directory and extension."""
    return os.path.splitext(os.path.basename(path))[0]


avi_files = glob.glob(os.path.join(AVI_DIR, "*.avi"))

# Stems of the WAV files that already exist; a video whose stem is in this
# set is skipped.
wav_bases = set(map(_stem, glob.glob(os.path.join(WAV_DIR, "*.wav"))))
to_do = [vid for vid in avi_files if _stem(vid) not in wav_bases]

total = len(to_do)
print(f"Осталось файлов: {total} из {len(avi_files)}")

def convert_one(vid_path):
    """Extract the audio of one video into a 16 kHz mono WAV inside WAV_DIR.

    The output file is named after the video (same stem, ``.wav`` extension).

    Args:
        vid_path: Path of the source video file.

    Returns:
        True when ffmpeg exited with status 0, False otherwise.
    """
    base = os.path.splitext(os.path.basename(vid_path))[0]
    out_wav = os.path.join(WAV_DIR, f"{base}.wav")
    cmd = [
        # -nostdin: ffmpeg must not try to read the keyboard from a worker process.
        "ffmpeg", "-nostdin", "-y", "-i", vid_path,
        "-vn", "-ar", "16000", "-ac", "1",
        "-threads", "1", "-loglevel", "error",
        out_wav
    ]
    proc = subprocess.run(cmd, check=False)
    if proc.returncode != 0:
        # A failed run can leave a truncated file behind; remove it so the
        # "already converted" check does not skip this video next time.
        if os.path.exists(out_wav):
            os.remove(out_wav)
        return False
    return True

# Convert the pending videos in parallel and report how many conversions failed.
workers = 4
failed = 0
with ProcessPoolExecutor(max_workers=workers) as exe:
    # Remember which video each future belongs to so failures can be named.
    futures = {exe.submit(convert_one, p): p for p in to_do}
    with tqdm(total=total, ncols=80, desc="Конвертация") as pbar:
        for fut in as_completed(futures):
            # Reading the result re-raises anything that went wrong in the
            # worker; without it such errors would pass unnoticed.
            try:
                ok = fut.result()
            except Exception as exc:
                ok = False
                tqdm.write(f"Ошибка при конвертации {futures[fut]}: {exc}")
            if not ok:
                failed += 1
            pbar.update(1)

if failed:
    print(f" Не удалось сконвертировать файлов: {failed} из {total}")
else:
    print(" Все файлы сконвертированы!")


In [None]:
import pandas as pd
import os
from pathlib import Path
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
from torchaudio.transforms import Resample, MelSpectrogram, AmplitudeToDB

# Folder with the extracted WAV files and the CSV that is loaded into `df`.
audio_dir = "/content/audio"
csv_path = "/content/drive/My Drive/df_IEMOCAP_with_text.csv"

df = pd.read_csv(csv_path)

In [None]:
# Give every emotion value that occurs in the CSV an integer class id
# (ids follow the sorted order of the emotion strings).
label_set  = sorted(df['emotion'].unique())
label_map  = {emo: i for i, emo in enumerate(label_set)}

# Keep only the rows whose WAV file is actually present in audio_dir.
# Iterating over the two needed columns avoids building a Series per row
# and the unused `session` lookup.
file_paths = []
labels     = []
for name, emo in zip(df['name_files'], df['emotion']):
    wav_fp = os.path.join(audio_dir, f"{name}.wav")
    if os.path.isfile(wav_fp):
        file_paths.append(wav_fp)
        labels.append(label_map[emo])

print(f"Используем {len(file_paths)} файлов, метки: {label_map}")

In [None]:
import pandas as pd, os
from pathlib import Path

df = pd.read_csv("/content/drive/My Drive/df_IEMOCAP_with_text.csv")

# Only rows labelled with one of these emotion codes are kept.
target_emotions = {"ang","dis","fea","hap","sad","sur", "neu"}
is_target = df['emotion'].isin(target_emotions)
df = df[is_target].reset_index(drop=True)

# Class ids follow the alphabetical order of the emotion codes.
label_map = dict(zip(sorted(target_emotions), range(len(target_emotions))))
print("Новые метки:", label_map)

# Collect (path, class id) for every row whose WAV file exists on disk.
audio_dir = "/content/audio"
file_paths, labels = [], []
for name, emo in zip(df['name_files'], df['emotion']):
    wav_fp = os.path.join(audio_dir, f"{name}.wav")
    if not os.path.isfile(wav_fp):
        continue
    file_paths.append(wav_fp)
    labels.append(label_map[emo])

print(f"Осталось для обучения {len(file_paths)} файлов в {len(label_map)} классах")

In [None]:
from collections import Counter
from sklearn.model_selection import train_test_split
cnt = Counter(labels)
print("До фильтрации:", cnt)

# Classes with fewer than `min_samples` examples are dropped before the
# stratified split.
min_samples = 2
good_classes = {lab for lab in cnt if cnt[lab] >= min_samples}
print("Оставляем классы:", good_classes)

kept_pairs = [
    (path, lab) for path, lab in zip(file_paths, labels) if lab in good_classes
]
file_paths_f = [path for path, _ in kept_pairs]
labels_f     = [lab for _, lab in kept_pairs]

cnt2 = Counter(labels_f)
print("После фильтрации:", cnt2)

# 80/20 split that keeps the class proportions in both parts.
split_options = dict(test_size=0.2, stratify=labels_f, random_state=42)
train_paths, val_paths, train_labels, val_labels = train_test_split(
    file_paths_f, labels_f, **split_options
)

print("Train:", len(train_paths), "Val:", len(val_paths))


In [None]:
import torch
from torch.utils.data import DataLoader
from torchaudio.transforms import Resample, MelSpectrogram, AmplitudeToDB
import torchaudio
import numpy as np
import librosa
from pathlib import Path


def extract_prosody(wav_fp, sr=16000):
    """Compute two utterance-level prosody features for one audio file.

    Args:
        wav_fp: Path of the audio file; it is loaded with ``librosa.load``.
        sr: Sampling rate passed to ``librosa.load`` and ``librosa.piptrack``.

    Returns:
        A float32 array ``[pitch, energy]`` where ``pitch`` is the mean of the
        positive values returned by ``librosa.piptrack`` (0.0 when there are
        none) and ``energy`` is the mean of ``librosa.feature.rms``.
    """
    y, _ = librosa.load(wav_fp, sr=sr)
    pitches, _ = librosa.piptrack(y=y, sr=sr)
    voiced = pitches[pitches > 0]
    # The mean of an empty selection is NaN, and NaN is truthy, so an
    # ``or 0.0`` fallback never triggers; test for emptiness explicitly.
    pitch = float(voiced.mean()) if voiced.size else 0.0
    energy = float(np.mean(librosa.feature.rms(y=y)))
    return np.array([pitch, energy], dtype=np.float32)

class EmotionDatasetPlus(torch.utils.data.Dataset):
    """Dataset yielding a randomly masked log-mel spectrogram, prosody features and a label.

    Args:
        paths: Sequence of audio file paths.
        labels: Sequence of class ids, aligned with ``paths``.
        sr: Target sampling rate; files with another rate are resampled to it.
        n_mels: Number of mel bands passed to ``MelSpectrogram``.
        mask_prob: Approximate fraction of the spectrogram area to zero out.
            It is read on every ``__getitem__`` call, so it may be changed
            between epochs.
        mask_patch_size: ``(time_width, mel_height)`` of one masked patch.
    """

    def __init__(self, paths, labels, sr=16000, n_mels=64,
                 mask_prob=0.3, mask_patch_size=(10,10)):
        self.paths = paths
        self.labels = labels
        self.sr = sr
        self.mask_prob = mask_prob
        self.patch = mask_patch_size
        self.melspec   = MelSpectrogram(sample_rate=sr, n_mels=n_mels)
        self.db        = AmplitudeToDB()

    def __len__(self):
        return len(self.paths)

    def _random_patch_mask(self, mel_db):
        """Return a 0/1 mask shaped like ``mel_db`` with random rectangular holes."""
        M, T = mel_db.shape
        mask = torch.ones_like(mel_db)
        # Clamp the patch to the spectrogram so clips shorter than the patch
        # do not make np.random.randint fail with an empty range.
        width = min(self.patch[0], T)
        height = min(self.patch[1], M)
        if width <= 0 or height <= 0:
            return mask
        n_patches = int(self.mask_prob * (M*T)/(self.patch[0]*self.patch[1]))
        for _ in range(n_patches):
            # randint's upper bound is exclusive; +1 lets a patch touch the
            # last frame / last mel band as well.
            t0 = np.random.randint(0, T - width + 1)
            m0 = np.random.randint(0, M - height + 1)
            mask[m0:m0+height, t0:t0+width] = 0
        return mask

    def __getitem__(self, i):
        """Load item ``i`` and return ``(masked_mel_db, prosody, label)``."""
        wav_fp = self.paths[i]
        wav, file_sr = torchaudio.load(wav_fp)
        # Resample from the rate the file really has, not from an assumed one.
        if file_sr != self.sr:
            wav = torchaudio.functional.resample(wav, file_sr, self.sr)
        # Down-mix multi-channel audio to a single channel.
        if wav.size(0) > 1:
            wav = wav.mean(0, keepdim=True)
        mel_db = self.db(self.melspec(wav)).squeeze(0)

        mel_db = mel_db * self._random_patch_mask(mel_db)

        pros = torch.from_numpy(extract_prosody(wav_fp, sr=self.sr)).float()
        return mel_db, pros, self.labels[i]

def collate_fn_plus(batch):
    """Collate ``(spectrogram, prosody, label)`` samples into batch tensors.

    Spectrograms are right-padded with zeros along their last axis to the
    longest one in the batch, then stacked.

    Args:
        batch: Iterable of ``(spec, prosody, label)`` tuples.

    Returns:
        Tuple ``(specs, prosody, labels)`` of stacked tensors.
    """
    specs, prosody, labs = zip(*batch)
    max_t = max(s.shape[-1] for s in specs)
    specs_padded = [
        torch.nn.functional.pad(s, (0, max_t - s.shape[-1])) for s in specs
    ]
    # as_tensor reuses tensors as they are; torch.tensor(tensor) would copy
    # them and emit a "copy construct" UserWarning for every sample.
    prosody_batch = torch.stack([torch.as_tensor(p) for p in prosody])
    return torch.stack(specs_padded), prosody_batch, torch.tensor(labs)


# Both datasets start with the same masking ratio.
train_ds, val_ds = (
    EmotionDatasetPlus(split_paths, split_labels, mask_prob=0.3)
    for split_paths, split_labels in (
        (train_paths, train_labels),
        (val_paths, val_labels),
    )
)

# Settings shared by both loaders; only the training loader shuffles.
loader_kwargs = dict(batch_size=8, num_workers=0, collate_fn=collate_fn_plus)
train_loader = DataLoader(train_ds, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_ds, shuffle=False, **loader_kwargs)

# Pull one batch to check the tensor shapes.
batch_mel, batch_pros, batch_lbl = next(iter(train_loader))
print("mel:", batch_mel.shape, "prosody:", batch_pros.shape, "labels:", batch_lbl.shape)


In [None]:
from peft import LoraConfig, get_peft_model
from transformers import AutoModel

# Pretrained encoder that is wrapped with LoRA adapters below.
backbone = AutoModel.from_pretrained("microsoft/wavlm-base-plus")

# task_type is left unset on purpose: get_peft_model then returns the generic
# PeftModel wrapper, whose forward hands its arguments straight to the wrapped
# model. The task-specific SEQ_CLS wrapper instead forwards text-style keyword
# arguments (input_ids, inputs_embeds, labels, ...) that an audio encoder's
# forward does not accept.
lora_cfg = LoraConfig(
    r=8,
    lora_alpha=16,
    target_modules=["q_proj", "v_proj"],
    lora_dropout=0.1,
    bias="none",
)

backbone = get_peft_model(backbone, lora_cfg)

class EmotionModel(torch.nn.Module):
    """Backbone encoder followed by an MLP head that also sees prosody features.

    ``torch.nn`` is referenced through ``torch`` because the notebook never
    imports a bare ``nn`` name.

    Args:
        backbone: Module exposing ``config.hidden_size`` and returning an
            object with ``last_hidden_state`` when called.
        n_classes: Number of output logits.
        prosody_dim: Size of the prosody vector concatenated to the pooled
            backbone features.
    """

    def __init__(self, backbone, n_classes, prosody_dim=2):
        super().__init__()
        self.backbone = backbone
        H = backbone.config.hidden_size
        self.head = torch.nn.Sequential(
            torch.nn.Linear(H + prosody_dim, H),
            torch.nn.GELU(),
            torch.nn.Linear(H, n_classes)
        )

    def forward(self, mel, pros):
        """Return class logits for a batch.

        Args:
            mel: Batch passed unchanged to the backbone.
            pros: Prosody features, concatenated to the pooled features on dim 1.
        """
        # NOTE(review): the loaders in this notebook produce mel spectrograms,
        # while WavLM-style encoders are normally fed raw waveforms; confirm
        # that the backbone really accepts this input before training.
        out  = self.backbone(mel).last_hidden_state
        # Average over dim 1 to get one vector per item.
        feat = out.mean(dim=1)
        x    = torch.cat([feat, pros], dim=1)
        return self.head(x)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# The class ids in the labels come from label_map, so the head needs one
# output per entry. A hard-coded 6 is too small for the seven target emotions
# and makes cross_entropy fail on the highest class id.
model = EmotionModel(backbone, n_classes=len(label_map), prosody_dim=2).to(device)

In [None]:
import torch
import torch.nn.functional as F
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score


def evaluate(loader):
    """Score the global ``model`` on every batch of ``loader``.

    The model is switched to eval mode and gradients are disabled while
    predictions are collected.

    Returns:
        Tuple ``(accuracy, macro_f1)`` computed over all batches.
    """
    model.eval()
    predicted, expected = [], []
    with torch.no_grad():
        for mel, pros, y in loader:
            mel = mel.to(device)
            pros = pros.to(device)
            y = y.to(device)
            batch_pred = model(mel, pros).argmax(dim=1)
            predicted.extend(batch_pred.cpu().tolist())
            expected.extend(y.cpu().tolist())
    acc = accuracy_score(expected, predicted)
    macro_f1 = f1_score(expected, predicted, average="macro")
    return acc, macro_f1

epochs = 5

# No earlier cell creates `optimizer`, so it is built here over the
# parameters that require gradients.
# TODO(review): the learning rate is a starting value, tune as needed.
optimizer = torch.optim.AdamW(
    [prm for prm in model.parameters() if prm.requires_grad],
    lr=1e-4,
)

for ep in range(1, epochs+1):
    # Masking ratio grows linearly from 0.3 (first epoch) to 0.7 (last epoch);
    # with a single epoch it stays at 0.3 instead of dividing by zero.
    p = 0.3 + 0.4*(ep-1)/(epochs-1) if epochs > 1 else 0.3
    train_ds.mask_prob = p
    # NOTE(review): validation inputs are randomly masked too, so the metrics
    # below are measured on corrupted data - confirm this is intended.
    val_ds.mask_prob   = p

    model.train()
    loop = tqdm(train_loader, desc=f"Train Ep{ep} (mask={p:.2f})")
    for mel, pros, y in loop:
        mel, pros, y = mel.to(device), pros.to(device), y.to(device)
        optimizer.zero_grad()
        logits = model(mel, pros)
        loss = F.cross_entropy(logits, y)
        loss.backward()
        optimizer.step()
        loop.set_postfix(loss=f"{loss.item():.3f}")

    acc, f1 = evaluate(val_loader)
    print(f"→ Ep{ep}: Val Acc={acc:.3f}, Macro-F1={f1:.3f}\n")


In [None]:
# Pack /content/audio recursively into audio.zip in the current working directory.
!zip -r audio.zip /content/audio



In [None]:
# Hand audio.zip to the Colab files helper for download.
from google.colab import files
files.download("audio.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>