In [1]:
!pip install yt_dlp

Collecting yt_dlp
  Downloading yt_dlp-2025.10.14-py3-none-any.whl.metadata (175 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m175.9/175.9 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading yt_dlp-2025.10.14-py3-none-any.whl (3.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m49.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: yt_dlp
Successfully installed yt_dlp-2025.10.14


In [2]:
# --------------------------
# 1 - installs, imports e diretórios (ATUALIZADO)
# --------------------------
# se rodando em notebook: descomente a linha abaixo para garantir libs
# !pip install numpy pandas matplotlib seaborn tqdm pillow librosa audioread yt-dlp ffmpeg-python scikit-learn xgboost tensorflow

import os, re, shutil, subprocess, math, random, json
import numpy as np
import pandas as pd
from tqdm import tqdm
from collections import defaultdict
from PIL import Image
import matplotlib.pyplot as plt

import librosa, librosa.display

import tensorflow as tf
from tensorflow.keras import layers, models, backend as K, callbacks, optimizers
from tensorflow.keras.applications import VGG16
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, confusion_matrix, precision_score, recall_score
import xgboost as xgb
import seaborn as sns

# Fixar seeds
SEED = 42
np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)

# --------------------------
# Diretórios principais (ajuste conforme seu Drive)
# --------------------------

ROOT_DIR = "/content/drive/MyDrive/M.L_2UNIDADE"   # caminho principal do projeto da 2ª unidade

# Estrutura de pastas
CSV_DIR     = os.path.join(ROOT_DIR, "csv_files")              # arquivos CSV do dataset
COOKIES_DIR = os.path.join(ROOT_DIR, "cookies")                # cookies yt-dlp (opcional)
WAV_DIR     = os.path.join(ROOT_DIR, "wav_files")              # cortes de áudio WAV (10s)
IMG_DIR     = os.path.join(ROOT_DIR, "data-files")             # espectrogramas originais
AUG_DIR     = os.path.join(ROOT_DIR, "data-files-augmented")   # espectrogramas com augmentations
MODEL_DIR   = os.path.join(ROOT_DIR, "models_unit2")           # modelos treinados (VAE, CNNs)
RESULTS_DIR = os.path.join(ROOT_DIR, "results_unit2")          # métricas, curvas ROC, matrizes de confusão

# Criar diretórios caso não existam
for d in [CSV_DIR, COOKIES_DIR, WAV_DIR, IMG_DIR, AUG_DIR, MODEL_DIR, RESULTS_DIR]:
    os.makedirs(d, exist_ok=True)

# Caminhos específicos
CSV_PATH     = os.path.join(CSV_DIR, "unbalanced_train_segments.csv")  # caminho do CSV principal
COOKIES_PATH = os.path.join(COOKIES_DIR, "www.youtube.com_cookies.txt")                 # cookies yt-dlp (se necessário)

# --------------------------
# Parâmetros principais
# --------------------------
TARGET_PER_CLASS = 250    # número de músicas por gênero
SR = 22050                # sample rate
CLIP_DURATION = 10        # duração dos clipes em segundos
IMG_WIDTH, IMG_HEIGHT = 224, 224
BATCH_SIZE = 32
VAE_EPOCHS = 60
CLASS_EPOCHS = 15
LATENT_DIM = 64
NOISE_FACTOR = 0.08
SPEC_AUG_TIME_MASKS = 2
SPEC_AUG_FREQ_MASKS = 2

RANDOM_STATE = SEED

# --------------------------
# Gêneros alvo (iguais ao código original; pode ajustar se quiser)
# --------------------------
genre_dict = {
    '/m/064t9': 'Pop_music',
    '/m/0glt670': 'Hip_hop_music',
    '/m/06by7': 'Rock_music',
    '/m/06j6l': 'Rhythm_blues',
    '/m/06cqb': 'Reggae',
    '/m/0y4f8': 'Vocal',
    '/m/07gxw': 'Techno',
}


In [3]:
# --------------------------
# Helpers utilitários
# --------------------------
def ensure_empty_dir(d):
    if os.path.exists(d):
        shutil.rmtree(d)
    os.makedirs(d, exist_ok=True)

def save_json(obj, path):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2, ensure_ascii=False)

def plot_and_save_fig(fig, filename):
    fig.tight_layout()
    fig.savefig(filename, dpi=200)
    plt.close(fig)

In [4]:
# --------------------------
# 2 - carregamento do dataset (leitura do CSV e filtragem por gêneros)
# --------------------------
print("Etapa 2: lendo CSV e preparando candidatos...")
if not os.path.exists(CSV_PATH):
    raise FileNotFoundError(f"CSV não encontrado em {CSV_PATH} - verifique caminho")

data = []
with open(CSV_PATH, "r", encoding="utf-8", errors="ignore") as f:
    for line in f:
        elements = re.sub(r'["\n]', "", line).split(",")
        if len(elements) >= 4:
            url = elements[0]; start = elements[1]; end = elements[2]; labels = elements[3:]
            for label in labels:
                if label in genre_dict:
                    data.append([url, start, end, genre_dict[label]])
df = pd.DataFrame(data, columns=["url", "start_time", "end_time", "class_label"])
df.to_csv(os.path.join(ROOT_DIR, "df_candidates_unit2.csv"), index=False)
print("Total candidatos lidos:", len(df))
print(df["class_label"].value_counts())

Etapa 2: lendo CSV e preparando candidatos...
Total candidatos lidos: 51710
class_label
Techno           16811
Pop_music         8407
Rock_music        8198
Hip_hop_music     7370
Rhythm_blues      4755
Vocal             3241
Reggae            2928
Name: count, dtype: int64


In [None]:
# --------------------------
# 3 - corte dos vídeos (download e cut) — com retomada ordenada
# --------------------------
COOKIES_PATH = os.path.join(COOKIES_DIR, "www.youtube.com_cookies.txt")
print("\nEtapa 3: Download e corte dos vídeos (com retomada ordenada e verificação incremental).")

import yt_dlp
import pandas as pd
import os, subprocess

# 🔸 NÃO limpar o diretório
os.makedirs(WAV_DIR, exist_ok=True)

# 🔸 Verificar progresso anterior
progress_file = os.path.join(ROOT_DIR, "df_success_unit2.csv")
if os.path.exists(progress_file):
    df_success = pd.read_csv(progress_file)
    downloaded_ids = set(df_success["url"].astype(str))
    print(f"✅ Retomando: {len(downloaded_ids)} vídeos já baixados com sucesso.")
else:
    df_success = pd.DataFrame(columns=["url", "start_time", "end_time", "class_label"])
    downloaded_ids = set()

# 🔸 Contar quantos arquivos WAV já existem por gênero
existing_files = [f for f in os.listdir(WAV_DIR) if f.endswith(".wav")]
existing_counts = {}
for f in existing_files:
    try:
        genre_name = f.split("_", 1)[1][:-4]
        existing_counts[genre_name] = existing_counts.get(genre_name, 0) + 1
    except:
        continue

counts = {label: existing_counts.get(label, 0) for label in genre_dict.values()}
print("📊 Arquivos já existentes por gênero:", counts)

# 🔸 Determinar o último vídeo baixado com sucesso (para cada gênero)
last_positions = {}
if not df_success.empty:
    for genre in df_success["class_label"].unique():
        last_url = df_success[df_success["class_label"] == genre]["url"].iloc[-1]
        try:
            pos = df[df["url"] == last_url].index[-1]
            last_positions[genre] = pos
        except IndexError:
            pass

# 🔸 Função auxiliar: verificar se já existe WAV
def wav_exists(video_id, label):
    pattern = f"{video_id}_{label}.wav"
    return any(pattern in f for f in existing_files)

# 🔸 Baixar somente os gêneros incompletos, continuando do último baixado
downloaded_rows = []
for label, group in df.groupby("class_label"):
    current_count = counts.get(label, 0)
    if current_count >= TARGET_PER_CLASS:
        print(f"✅ {label} já completo ({current_count}/{TARGET_PER_CLASS}) - pulando.")
        continue

    # se houver posição salva, começa dali
    start_index = last_positions.get(label, group.index[0])
    group = group.loc[group.index >= start_index]

    needed = TARGET_PER_CLASS - current_count
    print(f"\n🔹 {label}: precisa baixar {needed} arquivos (continuando após índice {start_index}).")

    for idx, row in group.iterrows():
        vid = str(row["url"])
        start = float(row["start_time"])
        url = f"https://www.youtube.com/watch?v={vid}"
        final_file = os.path.join(WAV_DIR, f"{vid}_{label}.wav")

        # já baixado? pula
        if vid in downloaded_ids or wav_exists(vid, label):
            continue
        if counts[label] >= TARGET_PER_CLASS:
            print(f"🎯 {label} atingiu {TARGET_PER_CLASS}, parando.")
            break

        temp_out = os.path.join(WAV_DIR, f"temp_{vid}.%(ext)s")
        try:
            ydl_opts = {
                "format": "bestaudio/best",
                "outtmpl": temp_out,
                "quiet": True,
                "noplaylist": True
            }
            if os.path.exists(COOKIES_PATH):
                ydl_opts["cookiefile"] = COOKIES_PATH

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            temp_files = [f for f in os.listdir(WAV_DIR) if f.startswith(f"temp_{vid}")]
            if not temp_files:
                print(f"⚠️ Nenhum arquivo temporário gerado para {vid}")
                continue

            temp_file = os.path.join(WAV_DIR, temp_files[0])
            subprocess.run([
                "ffmpeg", "-y", "-ss", str(start), "-t", str(CLIP_DURATION),
                "-i", temp_file, "-acodec", "pcm_s16le", "-ar", str(SR), "-ac", "1", final_file
            ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            os.remove(temp_file)
            if os.path.exists(final_file):
                counts[label] += 1
                downloaded_rows.append(row)
                downloaded_ids.add(vid)

                # salvar progresso incremental
                pd.concat([df_success, pd.DataFrame([row])]).to_csv(progress_file, index=False)

                if counts[label] % 10 == 0:
                    print(f"🎵 {counts[label]} baixados para {label}")

            else:
                print(f"⚠️ WAV não gerado para {vid}")

        except Exception as e:
            print(f"❌ Erro ao baixar {vid}: {e}")
            continue

# 🔸 Salvar resumo
summary = pd.DataFrame(list(counts.items()), columns=["genre", "count"])
summary.to_csv(os.path.join(ROOT_DIR, "dataset_summary_unit2.csv"), index=False)

print("\n✅ Download finalizado.")
for g, c in counts.items():
    print(f" - {g}: {c}/{TARGET_PER_CLASS}")
print("Progresso salvo em dataset_summary_unit2.csv e df_success_unit2.csv")



Etapa 3: Download e corte dos vídeos (com retomada ordenada e verificação incremental).
✅ Retomando: 10 vídeos já baixados com sucesso.
📊 Arquivos já existentes por gênero: {'Pop_music': 250, 'Hip_hop_music': 250, 'Rock_music': 249, 'Rhythm_blues': 250, 'Reggae': 250, 'Vocal': 145, 'Techno': 250}
✅ Hip_hop_music já completo (250/250) - pulando.
✅ Pop_music já completo (250/250) - pulando.
✅ Reggae já completo (250/250) - pulando.
✅ Rhythm_blues já completo (250/250) - pulando.

🔹 Rock_music: precisa baixar 1 arquivos (continuando após índice 3).


ERROR: [youtube] -08IuAXloCI: Video unavailable


❌ Erro ao baixar -08IuAXloCI: ERROR: [youtube] -08IuAXloCI: Video unavailable


ERROR: [youtube] -5HSR5eWEDU: Video unavailable


❌ Erro ao baixar -5HSR5eWEDU: ERROR: [youtube] -5HSR5eWEDU: Video unavailable


ERROR: [youtube] -82OBRkQskQ: Video unavailable. This video is private


❌ Erro ao baixar -82OBRkQskQ: ERROR: [youtube] -82OBRkQskQ: Video unavailable. This video is private


ERROR: [youtube] -LTES9d6dZY: Video unavailable


❌ Erro ao baixar -LTES9d6dZY: ERROR: [youtube] -LTES9d6dZY: Video unavailable


ERROR: [youtube] -TdKFIt-tlY: Video unavailable


❌ Erro ao baixar -TdKFIt-tlY: ERROR: [youtube] -TdKFIt-tlY: Video unavailable


ERROR: [youtube] -XCNxDxsAL0: Video unavailable. This video is no longer available due to a copyright claim by Rock & Roll Hall of Fame


❌ Erro ao baixar -XCNxDxsAL0: ERROR: [youtube] -XCNxDxsAL0: Video unavailable. This video is no longer available due to a copyright claim by Rock & Roll Hall of Fame


ERROR: [youtube] -aA0j_NIJcg: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


❌ Erro ao baixar -aA0j_NIJcg: ERROR: [youtube] -aA0j_NIJcg: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


ERROR: [youtube] -fTW98nVe1g: Video unavailable. This video contains content from SME, who has blocked it in your country on copyright grounds


❌ Erro ao baixar -fTW98nVe1g: ERROR: [youtube] -fTW98nVe1g: Video unavailable. This video contains content from SME, who has blocked it in your country on copyright grounds


ERROR: [youtube] -hrpU1nxsbc: Video unavailable


❌ Erro ao baixar -hrpU1nxsbc: ERROR: [youtube] -hrpU1nxsbc: Video unavailable


ERROR: [youtube] -lAVvperqpQ: Video unavailable


❌ Erro ao baixar -lAVvperqpQ: ERROR: [youtube] -lAVvperqpQ: Video unavailable


ERROR: [youtube] -pRIE4KkY1M: Video unavailable. This video is no longer available due to a copyright claim by Coda Publishing Ltd


❌ Erro ao baixar -pRIE4KkY1M: ERROR: [youtube] -pRIE4KkY1M: Video unavailable. This video is no longer available due to a copyright claim by Coda Publishing Ltd


ERROR: [youtube] 061qiXuzrQs: Video unavailable. This video is no longer available due to a copyright claim by Studio Hamburg


❌ Erro ao baixar 061qiXuzrQs: ERROR: [youtube] 061qiXuzrQs: Video unavailable. This video is no longer available due to a copyright claim by Studio Hamburg


ERROR: [youtube] 0AQvRSZ8ZNo: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


❌ Erro ao baixar 0AQvRSZ8ZNo: ERROR: [youtube] 0AQvRSZ8ZNo: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


ERROR: [youtube] 0H3VllGZQEU: Video unavailable


❌ Erro ao baixar 0H3VllGZQEU: ERROR: [youtube] 0H3VllGZQEU: Video unavailable


ERROR: [youtube] 0Ma-YlsySuU: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


❌ Erro ao baixar 0Ma-YlsySuU: ERROR: [youtube] 0Ma-YlsySuU: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


ERROR: [youtube] 0YRM3tkIXfA: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


❌ Erro ao baixar 0YRM3tkIXfA: ERROR: [youtube] 0YRM3tkIXfA: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


ERROR: [youtube] 0q_EX7TiuV8: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


❌ Erro ao baixar 0q_EX7TiuV8: ERROR: [youtube] 0q_EX7TiuV8: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


ERROR: [youtube] 0sMqbNNwzvU: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


❌ Erro ao baixar 0sMqbNNwzvU: ERROR: [youtube] 0sMqbNNwzvU: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


ERROR: [youtube] 0sqcvMHyQ1k: Video unavailable


❌ Erro ao baixar 0sqcvMHyQ1k: ERROR: [youtube] 0sqcvMHyQ1k: Video unavailable


ERROR: [youtube] 0z7EpFwegJM: Video unavailable. This video is private


❌ Erro ao baixar 0z7EpFwegJM: ERROR: [youtube] 0z7EpFwegJM: Video unavailable. This video is private


ERROR: [youtube] 1-Oahk8L_Og: Video unavailable


❌ Erro ao baixar 1-Oahk8L_Og: ERROR: [youtube] 1-Oahk8L_Og: Video unavailable


ERROR: [youtube] 1A8piNIeg6w: Video unavailable


❌ Erro ao baixar 1A8piNIeg6w: ERROR: [youtube] 1A8piNIeg6w: Video unavailable
🎵 250 baixados para Rock_music
🎯 Rock_music atingiu 250, parando.
✅ Techno já completo (250/250) - pulando.

🔹 Vocal: precisa baixar 105 arquivos (continuando após índice 44967).


ERROR: [youtube] nI59vkjjZY0: Video unavailable. This video is private


❌ Erro ao baixar nI59vkjjZY0: ERROR: [youtube] nI59vkjjZY0: Video unavailable. This video is private


ERROR: [youtube] nIA8YdQGM1Y: Video unavailable. This video is private


❌ Erro ao baixar nIA8YdQGM1Y: ERROR: [youtube] nIA8YdQGM1Y: Video unavailable. This video is private
🎵 150 baixados para Vocal
🎵 160 baixados para Vocal
🎵 170 baixados para Vocal


ERROR: [youtube] nf7AArmXhys: Video unavailable. This video is private


❌ Erro ao baixar nf7AArmXhys: ERROR: [youtube] nf7AArmXhys: Video unavailable. This video is private


ERROR: [youtube] npUieylloJE: Video unavailable


❌ Erro ao baixar npUieylloJE: ERROR: [youtube] npUieylloJE: Video unavailable


ERROR: [youtube] nqO5FTTuCvA: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


❌ Erro ao baixar nqO5FTTuCvA: ERROR: [youtube] nqO5FTTuCvA: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.
🎵 180 baixados para Vocal


ERROR: [youtube] nzldgEJxVds: Video unavailable


❌ Erro ao baixar nzldgEJxVds: ERROR: [youtube] nzldgEJxVds: Video unavailable


ERROR: [youtube] o8Kc4i9YuZ4: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.


❌ Erro ao baixar o8Kc4i9YuZ4: ERROR: [youtube] o8Kc4i9YuZ4: Video unavailable. This video is no longer available because the YouTube account associated with this video has been terminated.
🎵 190 baixados para Vocal


ERROR: [youtube] oIyj9aDz74A: Video unavailable. This video is private


❌ Erro ao baixar oIyj9aDz74A: ERROR: [youtube] oIyj9aDz74A: Video unavailable. This video is private
🎵 200 baixados para Vocal


ERROR: [youtube] oiWZWUQx_nY: Video unavailable. This video is private


❌ Erro ao baixar oiWZWUQx_nY: ERROR: [youtube] oiWZWUQx_nY: Video unavailable. This video is private


ERROR: [youtube] ojVK0Tf2Udk: Video unavailable


❌ Erro ao baixar ojVK0Tf2Udk: ERROR: [youtube] ojVK0Tf2Udk: Video unavailable
🎵 210 baixados para Vocal


ERROR: [youtube] olxNrhVurkI: Video unavailable. This video is private


❌ Erro ao baixar olxNrhVurkI: ERROR: [youtube] olxNrhVurkI: Video unavailable. This video is private
🎵 220 baixados para Vocal


In [None]:
# --------------------------
# 4 - gerar os espectrogramas (MEL)
# --------------------------
print("\nEtapa 4: gerando espectrogramas MEL (224x224)...")
ensure_empty_dir(IMG_DIR)
os.makedirs(IMG_DIR, exist_ok=True)

wav_files = [f for f in os.listdir(WAV_DIR) if f.endswith(".wav")]
for f in tqdm(wav_files):
    try:
        class_name = f.split("_",1)[1][:-4]
        class_dir = os.path.join(IMG_DIR, class_name)
        os.makedirs(class_dir, exist_ok=True)
        y, sr = librosa.load(os.path.join(WAV_DIR, f), sr=SR)
        # pre-emphasis
        y = np.append(y[0], y[1:] - 0.97*y[:-1])
        M = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=2048, hop_length=512, n_mels=128, fmax=sr/2)
        log_power = librosa.power_to_db(M, ref=np.max)
        # plot
        plt.figure(figsize=(3,3))
        plt.axis('off')
        librosa.display.specshow(log_power, sr=sr, hop_length=512, cmap='jet')
        savepath = os.path.join(class_dir, f[:-4] + ".jpg")
        plt.savefig(savepath, bbox_inches='tight', pad_inches=0)
        plt.close()
    except Exception as e:
        print("Erro ao processar", f, e)

# contar imagens por genero e salvar
counts_img = {}
for g in os.listdir(IMG_DIR):
    gp = os.path.join(IMG_DIR, g)
    if os.path.isdir(gp):
        counts_img[g] = len([x for x in os.listdir(gp) if x.lower().endswith(".jpg")])
pd.DataFrame(list(counts_img.items()), columns=["genre","images"]).to_csv(os.path.join(ROOT_DIR, "image_counts_unit2.csv"), index=False)
print("Contagem de espectrogramas salva em image_counts_unit2.csv")

In [None]:
# --------------------------
# 5 - criar e treinar o autoencoder (VAE conv) - novidade
# --------------------------
print("\nEtapa 5: construindo e treinando VAE conv (denoising VAE).")

# carregando imagens de treino (tudo por enquanto, separaremos depois)
def load_images_by_class(img_root, resize=(IMG_WIDTH, IMG_HEIGHT), max_per_class=None):
    X, y, paths = [], [], []
    for cls in sorted(os.listdir(img_root)):
        cls_dir = os.path.join(img_root, cls)
        if not os.path.isdir(cls_dir): continue
        files = sorted([f for f in os.listdir(cls_dir) if f.lower().endswith(".jpg")])
        if max_per_class:
            files = files[:max_per_class]
        for f in files:
            img = Image.open(os.path.join(cls_dir,f)).convert("RGB").resize(resize)
            arr = np.array(img, dtype=np.float32)/255.0
            X.append(arr)
            y.append(cls)
            paths.append(os.path.join(cls, f))
    return np.stack(X), np.array(y), paths

# Carrega tudo (atenção memória)
X_all, y_all, paths_all = load_images_by_class(IMG_DIR, max_per_class=None)
print("Total imagens espectrograma:", X_all.shape)

# separar treino/val/test (estratificado por genero)
train_X, test_X, train_y, test_y, train_paths, test_paths = train_test_split(
    X_all, y_all, paths_all, test_size=0.2, stratify=y_all, random_state=RANDOM_STATE)
train_X, val_X, train_y, val_y, train_paths, val_paths = train_test_split(
    train_X, train_y, train_paths, test_size=0.2, stratify=train_y, random_state=RANDOM_STATE)

print("Splits -> train:", train_X.shape[0], "val:", val_X.shape[0], "test:", test_X.shape[0])

# VAE model (encoder + sampling + decoder)
input_shape = (IMG_WIDTH, IMG_HEIGHT, 3)
inputs = layers.Input(shape=input_shape)

# encoder
x = layers.Conv2D(32, 3, activation='relu', padding='same')(inputs)
x = layers.MaxPooling2D(2, padding='same')(x)           # 112x112
x = layers.Conv2D(64, 3, activation='relu', padding='same')(x)
x = layers.MaxPooling2D(2, padding='same')(x)           # 56x56
x = layers.Conv2D(128, 3, activation='relu', padding='same')(x)
x = layers.MaxPooling2D(2, padding='same')(x)           # 28x28
shape_before_flatten = K.int_shape(x)[1:]               # (28,28,128)
flat = layers.Flatten()(x)
h = layers.Dense(512, activation='relu')(flat)

z_mean = layers.Dense(LATENT_DIM, name='z_mean')(h)
z_log_var = layers.Dense(LATENT_DIM, name='z_log_var')(h)

def sampling(args):
    z_mean, z_log_var = args
    epsilon = K.random_normal(shape=(K.shape(z_mean)[0], LATENT_DIM), mean=0., stddev=1.0)
    return z_mean + K.exp(0.5*z_log_var) * epsilon

z = layers.Lambda(sampling, output_shape=(LATENT_DIM,))([z_mean, z_log_var])

# decoder
decoder_input = layers.Input(shape=(LATENT_DIM,))
d = layers.Dense(np.prod(shape_before_flatten), activation='relu')(decoder_input)
d = layers.Reshape(shape_before_flatten)(d)
d = layers.Conv2D(128, 3, activation='relu', padding='same')(d)
d = layers.UpSampling2D(2)(d)  # 56
d = layers.Conv2D(64, 3, activation='relu', padding='same')(d)
d = layers.UpSampling2D(2)(d)  # 112
d = layers.Conv2D(32, 3, activation='relu', padding='same')(d)
d = layers.UpSampling2D(2)(d)  # 224
d = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(d)
decoder = models.Model(decoder_input, d, name='decoder_model')

# full VAE model
decoded = decoder(z)
vae = models.Model(inputs, decoded, name='vae_conv')

# VAE loss: reconstruction + KL
reconstruction_loss = tf.keras.losses.MeanSquaredError()(K.flatten(inputs), K.flatten(decoded))
kl_loss = -0.5 * K.mean(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var))
vae_loss = reconstruction_loss + 1e-4 * kl_loss   # peso KLD pequeno, ajustar se necessário
vae.add_loss(vae_loss)
vae.compile(optimizer=optimizers.Adam(1e-4))
vae.summary()
decoder.summary()

# treinar como denoising: adiciona ruído leve nas entradas
def add_noise(x, factor=NOISE_FACTOR):
    noisy = x + factor * np.random.normal(loc=0.0, scale=1.0, size=x.shape)
    return np.clip(noisy, 0., 1.)

es = callbacks.EarlyStopping(patience=8, restore_best_weights=True)
history_vae = vae.fit(
    add_noise(train_X, NOISE_FACTOR), train_X,
    epochs=VAE_EPOCHS,
    batch_size=BATCH_SIZE,
    shuffle=True,
    validation_data=(add_noise(val_X, NOISE_FACTOR), val_X),
    callbacks=[es]
)

# salvar VAE/decoder
vae.save(os.path.join(MODEL_DIR, "vae_unit2.h5"))
decoder.save(os.path.join(MODEL_DIR, "vae_decoder_unit2.h5"))

# salvar curvas de treino
plt.figure()
plt.plot(history_vae.history.get('loss', []), label='loss')
plt.plot(history_vae.history.get('val_loss', []), label='val_loss')
plt.legend(); plt.title("VAE loss")
plt.savefig(os.path.join(REPORT_DIR, "vae_loss.png"))
plt.close()

In [None]:
# --------------------------
# 6 - Criar augmentações (novas abordagens)
# --------------------------
print("\nEtapa 6: criando augmentações (reconstruções VAE, interpolação latente, SpecAugment, spectral mix)")

ensure_empty_dir(AUG_DIR)
for g in sorted(os.listdir(IMG_DIR)):
    os.makedirs(os.path.join(AUG_DIR, g), exist_ok=True)

# 6A: Reconstruções ruidosas (VAE reconstruction of noisy input)
def reconstruct_with_vae(X, batch=32):
    recs = vae.predict(X, batch_size=batch)
    return recs

# gerar reconstruções para cada imagem de treino (uma por amostra)
for i, p in enumerate(tqdm(train_paths)):
    cls = train_y[i]
    # carregar imagem
    img = Image.open(os.path.join(IMG_DIR, p)).convert("RGB").resize((IMG_WIDTH, IMG_HEIGHT))
    arr = np.array(img, dtype=np.float32)/255.0
    noisy = add_noise(np.expand_dims(arr,0), NOISE_FACTOR)
    rec = vae.predict(noisy)[0]
    outname = os.path.splitext(os.path.basename(p))[0] + "_vae_rec.jpg"
    Image.fromarray((rec*255).astype(np.uint8)).save(os.path.join(AUG_DIR, cls, outname))

# 6B: Interpolação no espaço latente (entre pares da mesma classe e entre classes similares)
N_INTERP_PER_CLASS = 2
# Criar encoder model (input image -> z_mean) para obter médias latentes para interpolação
encoder_model = models.Model(inputs, z_mean)  # usar z_mean (determinístico) para interpolar de forma estável

for cls in sorted(set(train_y)):
    class_indices = [i for i,lab in enumerate(train_y) if lab==cls]
    if len(class_indices) < 2: continue
    chosen = np.random.choice(class_indices, size=min(len(class_indices), 10), replace=False)
    for k in range(N_INTERP_PER_CLASS):
        a,b = np.random.choice(chosen, size=2, replace=False)
        img_a = np.array(Image.open(os.path.join(IMG_DIR, train_paths[a])).convert("RGB").resize((IMG_WIDTH,IMG_HEIGHT)), dtype=np.float32)/255.0
        img_b = np.array(Image.open(os.path.join(IMG_DIR, train_paths[b])).convert("RGB").resize((IMG_WIDTH,IMG_HEIGHT)), dtype=np.float32)/255.0
        za = encoder_model.predict(img_a[np.newaxis,...])[0]
        zb = encoder_model.predict(img_b[np.newaxis,...])[0]
        alpha = np.random.uniform(0.25, 0.75)
        zint = alpha * za + (1-alpha) * zb
        rec = decoder.predict(zint[np.newaxis,...])[0]
        outname = f"interp_{k}_{os.path.basename(train_paths[a])}_{os.path.basename(train_paths[b])}.jpg"
        Image.fromarray((np.clip(rec,0,1)*255).astype(np.uint8)).save(os.path.join(AUG_DIR, cls, outname))

# 6C: SpecAugment (aplicado diretamente nos espectrogramas antes de salvar)
def spec_augment_image(img_arr, time_masks=SPEC_AUG_TIME_MASKS, freq_masks=SPEC_AUG_FREQ_MASKS, max_time_mask_pct=0.2, max_freq_mask_pct=0.15):
    # img_arr in [0,1], shape (H,W,C) but SpecAugment manipulates spectral axis (H=n_mels) and time axis (W)
    img = img_arr.copy()
    H, W = img.shape[0], img.shape[1]
    for _ in range(time_masks):
        t = int(np.random.uniform(0.0, max_time_mask_pct) * W)
        t0 = np.random.randint(0, max(1, W - t + 1))
        img[:, t0:t0+t, :] = 0
    for _ in range(freq_masks):
        f = int(np.random.uniform(0.0, max_freq_mask_pct) * H)
        f0 = np.random.randint(0, max(1, H - f + 1))
        img[f0:f0+f, :, :] = 0
    return img

# Aplicar SpecAugment a algumas imagens de treino e salvar
N_SPEC_AUG_PER_CLASS = 2
for cls in sorted(set(train_y)):
    class_indices = [i for i,lab in enumerate(train_y) if lab==cls]
    chosen = np.random.choice(class_indices, size=min(len(class_indices), 20), replace=False)
    for i_idx in chosen[:N_SPEC_AUG_PER_CLASS]:
        p = train_paths[i_idx]
        arr = np.array(Image.open(os.path.join(IMG_DIR, p)).convert("RGB").resize((IMG_WIDTH, IMG_HEIGHT)), dtype=np.float32)/255.0
        aug = spec_augment_image(arr)
        outname = os.path.splitext(os.path.basename(p))[0] + "_specaug.jpg"
        Image.fromarray((np.clip(aug,0,1)*255).astype(np.uint8)).save(os.path.join(AUG_DIR, cls, outname))

# 6D: Spectral Mix (linear mix of two spectrogram images) - naive mix (hard label use original or duplicate)
N_MIX_PER_CLASS = 2
for cls in sorted(set(train_y)):
    class_indices = [i for i,lab in enumerate(train_y) if lab==cls]
    if len(class_indices) < 2: continue
    chosen = np.random.choice(class_indices, size=min(len(class_indices), 20), replace=False)
    for k in range(N_MIX_PER_CLASS):
        a,b = np.random.choice(chosen, size=2, replace=False)
        arr_a = np.array(Image.open(os.path.join(IMG_DIR, train_paths[a])).convert("RGB").resize((IMG_WIDTH,IMG_HEIGHT)), dtype=np.float32)/255.0
        arr_b = np.array(Image.open(os.path.join(IMG_DIR, train_paths[b])).convert("RGB").resize((IMG_WIDTH,IMG_HEIGHT)), dtype=np.float32)/255.0
        alpha = np.random.uniform(0.3, 0.7)
        mixed = alpha*arr_a + (1-alpha)*arr_b
        outname = f"mix_{k}_{os.path.basename(train_paths[a])}_{os.path.basename(train_paths[b])}.jpg"
        Image.fromarray((np.clip(mixed,0,1)*255).astype(np.uint8)).save(os.path.join(AUG_DIR, cls, outname))

# 6E: finalmente, copiamos os originais para AUG_DIR (mantém originais + artificially generated)
for p, y in zip(train_paths, train_y):
    src = os.path.join(IMG_DIR, p)
    dst = os.path.join(AUG_DIR, y, os.path.basename(p))
    if not os.path.exists(dst):
        shutil.copy(src, dst)

# salvar resumo de quantos arquivos por classe no AUG_DIR
aug_counts = {g: len([f for f in os.listdir(os.path.join(AUG_DIR,g)) if f.lower().endswith(".jpg")]) for g in os.listdir(AUG_DIR)}
pd.DataFrame(list(aug_counts.items()), columns=["genre","aug_images"]).to_csv(os.path.join(ROOT_DIR, "aug_image_counts_unit2.csv"), index=False)
print("Augmentation concluída. Counts saved to aug_image_counts_unit2.csv")

In [None]:
# --------------------------
# 7 - Testes: treinar 2 CNNs
#   - CNN A: VGG16 transfer learning com dados originais (baseline)
#   - CNN B: VGG16 transfer learning com dados originais + AUG_DIR
# --------------------------
print("\nEtapa 7: treinar CNNs baseline e augmented")

# Funções auxiliares para coletar arquivos
def collect_files_labels(root):
    files, labels = [], []
    for cls in sorted(os.listdir(root)):
        cls_dir = os.path.join(root, cls)
        if not os.path.isdir(cls_dir): continue
        for f in os.listdir(cls_dir):
            if f.lower().endswith(".jpg"):
                files.append(os.path.join(cls_dir, f))
                labels.append(cls)
    return files, labels

orig_files, orig_labels = collect_files_labels(IMG_DIR)
aug_files, aug_labels = collect_files_labels(AUG_DIR)

# Criar splits (para comparação justa manter o mesmo test set - usar test_paths criados anteriormente)
# We'll use test_paths list (relative paths) to define test set file paths in IMG_DIR
test_file_paths = [os.path.join(IMG_DIR, p) for p in test_paths]

# Create function to split train/val for baseline using orig_files excluding test_files
def split_files_for_training(files, labels, test_file_paths, val_frac=0.2):
    # filter out test
    files_filtered, labels_filtered = [], []
    for f, l in zip(files, labels):
        if os.path.abspath(f) in [os.path.abspath(x) for x in test_file_paths]:
            continue
        files_filtered.append(f); labels_filtered.append(l)
    tr_files, val_files, tr_labels, val_labels = train_test_split(files_filtered, labels_filtered, test_size=val_frac, stratify=labels_filtered, random_state=RANDOM_STATE)
    return tr_files, val_files, tr_labels, val_labels

train_files_base, val_files_base, train_labels_base, val_labels_base = split_files_for_training(orig_files, orig_labels, test_file_paths)
train_files_aug, val_files_aug, train_labels_aug, val_labels_aug = split_files_for_training(aug_files, aug_labels, test_file_paths)

print("Baseline train size:", len(train_files_base), "val:", len(val_files_base), "test:", len(test_file_paths))
print("Augmented train size:", len(train_files_aug), "val:", len(val_files_aug), "test:", len(test_file_paths))

# Generator Keras Sequence
from tensorflow.keras.utils import Sequence
class ImageSequence(Sequence):
    def __init__(self, files, labels, le, batch_size=BATCH_SIZE, shuffle=True, augment=False):
        self.files = files
        self.labels = np.array(labels)
        self.le = le
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.augment = augment
        self.indexes = np.arange(len(self.files))
        self.on_epoch_end()
    def __len__(self):
        return math.ceil(len(self.files)/self.batch_size)
    def __getitem__(self, idx):
        batch_idx = self.indexes[idx*self.batch_size:(idx+1)*self.batch_size]
        batch_files = [self.files[i] for i in batch_idx]
        batch_labels = self.labels[batch_idx]
        imgs = []
        for p in batch_files:
            img = Image.open(p).convert("RGB").resize((IMG_WIDTH,IMG_HEIGHT))
            arr = np.array(img, dtype=np.float32)/255.0
            if self.augment:
                # apply random small transforms: horizontal flip, small brightness jitter, spec augment occasionally
                if random.random() < 0.1:
                    arr = np.fliplr(arr)
                if random.random() < 0.2:
                    arr = np.clip(arr + 0.03 * np.random.randn(*arr.shape), 0, 1)
            imgs.append(arr)
        X = np.stack(imgs)
        y = to_categorical(self.le.transform(batch_labels), num_classes=len(self.le.classes_))
        return X, y
    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexes)

# label encoder based on classes found
le_classes = LabelEncoder()
le_classes.fit(sorted(os.listdir(IMG_DIR)))

# Create generators
train_gen_base = ImageSequence(train_files_base, train_labels_base, le_classes, batch_size=BATCH_SIZE, shuffle=True, augment=False)
val_gen_base = ImageSequence(val_files_base, val_labels_base, le_classes, batch_size=BATCH_SIZE, shuffle=False, augment=False)
train_gen_aug = ImageSequence(train_files_aug, train_labels_aug, le_classes, batch_size=BATCH_SIZE, shuffle=True, augment=True)
val_gen_aug = ImageSequence(val_files_aug, val_labels_aug, le_classes, batch_size=BATCH_SIZE, shuffle=False, augment=False)
test_gen = ImageSequence(test_file_paths, test_y, le_classes, batch_size=BATCH_SIZE, shuffle=False, augment=False)

# Build VGG16 transfer model factory
def build_vgg_transfer(num_classes, train_base=False):
    base = VGG16(include_top=False, weights='imagenet', input_shape=(IMG_WIDTH,IMG_HEIGHT,3))
    base.trainable = train_base
    inp = layers.Input(shape=(IMG_WIDTH,IMG_HEIGHT,3))
    x = base(inp, training=False)
    x = layers.Flatten()(x)
    x = layers.Dense(512, activation='relu')(x)
    x = layers.Dropout(0.3)(x)
    out = layers.Dense(num_classes, activation='softmax')(x)
    model = models.Model(inp, out)
    model.compile(optimizer=optimizers.Adam(1e-5), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

num_classes = len(le_classes.classes_)
print("Num classes:", num_classes)

# Train baseline
print("Treinando VGG16 baseline (originais)...")
vgg_base = build_vgg_transfer(num_classes, train_base=False)
es = callbacks.EarlyStopping(patience=4, restore_best_weights=True)
history_base = vgg_base.fit(train_gen_base, epochs=CLASS_EPOCHS, validation_data=val_gen_base, callbacks=[es])
vgg_base.save(os.path.join(MODEL_DIR, "vgg_base_unit2.h5"))

# Evaluate baseline
def evaluate_keras_model(model, seq):
    y_true, y_pred, y_prob = [], [], []
    for Xb, yb in seq:
        probs = model.predict(Xb)
        preds = np.argmax(probs, axis=1)
        y_pred.extend(preds.tolist())
        y_prob.extend(probs.tolist())
        y_true.extend(np.argmax(yb, axis=1).tolist())
    return np.array(y_true), np.array(y_pred), np.array(y_prob)

y_true_base, y_pred_base, y_prob_base = evaluate_keras_model(vgg_base, test_gen)
acc_base = accuracy_score(y_true_base, y_pred_base)
f1_base = f1_score(y_true_base, y_pred_base, average='macro')
print("Baseline: Acc", acc_base, "F1", f1_base)

# Train augmented
print("Treinando VGG16 com dataset aumentado...")
vgg_aug = build_vgg_transfer(num_classes, train_base=False)
history_aug = vgg_aug.fit(train_gen_aug, epochs=CLASS_EPOCHS, validation_data=val_gen_aug, callbacks=[es])
vgg_aug.save(os.path.join(MODEL_DIR, "vgg_aug_unit2.h5"))

y_true_aug, y_pred_aug, y_prob_aug = evaluate_keras_model(vgg_aug, test_gen)
acc_aug = accuracy_score(y_true_aug, y_pred_aug)
f1_aug = f1_score(y_true_aug, y_pred_aug, average='macro')
print("Augmented: Acc", acc_aug, "F1", f1_aug)

In [None]:
# --------------------------
# 8 - Resultados: tabela, ROC e matrizes de confusão
# --------------------------
print("\nEtapa 8: Gerando métricas, curvas ROC e matrizes de confusão (sem PDF).")

from sklearn.metrics import confusion_matrix, roc_auc_score, roc_curve, auc
from sklearn.preprocessing import label_binarize
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Criação da tabela de métricas
y_test_bin = label_binarize(y_true_base, classes=np.arange(num_classes))
try:
    auc_base = roc_auc_score(y_test_bin, y_prob_base, average='macro')
except Exception:
    auc_base = np.nan
try:
    auc_aug = roc_auc_score(y_test_bin, y_prob_aug, average='macro')
except Exception:
    auc_aug = np.nan

metrics_df = pd.DataFrame([
    ["VGG16 (baseline)", acc_base, f1_base, auc_base],
    ["VGG16 (augmented)", acc_aug, f1_aug, auc_aug]
], columns=["Model", "Accuracy", "F1_macro", "AUC_macro"])

# Salvar tabela
metrics_path = os.path.join(RESULTS_DIR, "metrics_comparison_unit2.csv")
metrics_df.to_csv(metrics_path, index=False)
print("\n📊 Tabela de métricas:")
print(metrics_df)
print(f"\nTabela salva em: {metrics_path}")

# --------------------------
# Curvas ROC
# --------------------------
def multiclass_roc(y_true, y_prob, n_classes):
    y_bin = label_binarize(y_true, classes=np.arange(n_classes))
    fpr, tpr, roc_auc = {}, {}, {}
    for i in range(n_classes):
        try:
            fpr[i], tpr[i], _ = roc_curve(y_bin[:, i], np.array(y_prob)[:, i])
            roc_auc[i] = auc(fpr[i], tpr[i])
        except:
            fpr[i], tpr[i], roc_auc[i] = None, None, None
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes) if fpr[i] is not None]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(n_classes):
        if fpr[i] is not None:
            mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= n_classes
    fpr["macro"], tpr["macro"], roc_auc["macro"] = all_fpr, mean_tpr, auc(all_fpr, mean_tpr)
    return fpr, tpr, roc_auc

fpr_b, tpr_b, roc_auc_b = multiclass_roc(y_true_base, y_prob_base, num_classes)
fpr_a, tpr_a, roc_auc_a = multiclass_roc(y_true_aug, y_prob_aug, num_classes)

plt.figure(figsize=(8,6))
plt.plot(fpr_b["macro"], tpr_b["macro"], '--', label=f'Baseline (AUC={roc_auc_b["macro"]:.3f})')
plt.plot(fpr_a["macro"], tpr_a["macro"], '-.', label=f'Augmented (AUC={roc_auc_a["macro"]:.3f})')
plt.plot([0,1],[0,1],'k--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve - Comparação (Macro Average)")
plt.legend()
plt.grid(alpha=0.3)
roc_path = os.path.join(RESULTS_DIR, "roc_comparison.png")
plt.savefig(roc_path, dpi=200)
plt.close()
print(f"✅ Curva ROC salva em: {roc_path}")

# --------------------------
# Matrizes de confusão
# --------------------------
cm_base = confusion_matrix(y_true_base, y_pred_base)
cm_aug  = confusion_matrix(y_true_aug, y_pred_aug)

# Baseline
plt.figure(figsize=(10,8))
sns.heatmap(cm_base, annot=True, fmt="d", cmap="Blues",
            xticklabels=le_classes.classes_, yticklabels=le_classes.classes_)
plt.title("Matriz de Confusão - CNN Base")
plt.xlabel("Predito"); plt.ylabel("Real")
cm_base_path = os.path.join(RESULTS_DIR, "cm_baseline.png")
plt.savefig(cm_base_path, dpi=200)
plt.close()

# Augmented
plt.figure(figsize=(10,8))
sns.heatmap(cm_aug, annot=True, fmt="d", cmap="Greens",
            xticklabels=le_classes.classes_, yticklabels=le_classes.classes_)
plt.title("Matriz de Confusão - CNN com Augmentation")
plt.xlabel("Predito"); plt.ylabel("Real")
cm_aug_path = os.path.join(RESULTS_DIR, "cm_augmented.png")
plt.savefig(cm_aug_path, dpi=200)
plt.close()

print(f"✅ Matrizes salvas em:\n - {cm_base_path}\n - {cm_aug_path}")

print("\n🎯 Resultados finais concluídos! Verifique os arquivos na pasta 'results_unit2'")
