In [24]:
### DenseNet121 N_FFT 256 - 23 EPOCHS - 0.96 ACC ON 20% TEST
import os, math
import numpy as np
seed = 2018
np.random.seed(seed)

import librosa
from scipy import signal

from matplotlib import pyplot as plt

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

from torchvision.models import densenet121
import torch
import torch.nn as nn; import torch.nn.functional as F
import torch.optim as optim; from torch.optim import lr_scheduler
import torchaudio
from torch.utils.data import Dataset, DataLoader
#from keras.layers import Input

#from keras.callbacks import ModelCheckpoint
#from keras.callbacks import EarlyStopping
#from keras.callbacks import ReduceLROnPlateau
#from keras.callbacks import CSVLogger

#from keras import Model
#from keras import backend as K

#from keras.utils import np_utils
#from keras.preprocessing import image
 
#from keras.applications.densenet import DenseNet121


In [20]:
# ---- Run configuration -------------------------------------------------
# Architecture constructor; its __name__ is reused to derive output file names.
current_model = densenet121


model_name = 'wingbeats_' + current_model.__name__

# Checkpoint and log destinations (the '.h5' suffix is kept so existing
# paths/tooling that expect this file name keep working).
best_weights_path = model_name + '.h5'
log_path = model_name + '.log'
monitor = 'val_acc'     # metric that drives checkpointing / early stopping / LR decay
batch_size = 32
epochs = 100
es_patience = 7         # epochs without improvement before training stops
rlr_patience = 3        # epochs without improvement before the LR is reduced

# ---- Spectrogram parameters --------------------------------------------
SR = 8000
N_FFT = 256
# librosa.stft needs an *integer* hop length. `N_FFT / 6` is a float (42.67)
# under Python 3, so use floor division to get 42.
HOP_LEN = N_FFT // 6
# (frequency bins, frames, channels); 129 == N_FFT // 2 + 1.
# NOTE(review): 120 frames presumably corresponds to the fixed clip length
# with hop 42 -- confirm against the recordings.
input_shape = (129, 120, 1)

# `is_available` must be *called*: the bare function object is always truthy,
# which made CPU-only machines fail on `current_accelerator().type`.
device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else 'cpu'

In [9]:
target_names = ['Ae. aegypti', 'Ae. albopictus', 'An. gambiae', 'An. arabiensis', 'C. pipiens', 'C. quinquefasciatus']

X_names = []        # path of every .wav recording found
y = []              # class index (position in target_names) for each path
target_count = []   # number of recordings per class, in target_names order

# Each species has its own folder under ./../Wingbeats/; collect every .wav
# file below it (bottom-up walk) and label it with the species index.
for class_idx, species in enumerate(target_names):
    species_dir = './../Wingbeats/' + species + '/'
    wav_paths = [
        os.path.join(dirpath, fname)
        for dirpath, _, fnames in os.walk(species_dir, topdown = False)
        for fname in fnames
        if os.path.splitext(fname)[1] == '.wav'
    ]
    X_names.extend(wav_paths)
    y.extend([class_idx] * len(wav_paths))
    target_count.append(len(wav_paths))
    print (species, '#recs = ', target_count[class_idx])

print ('total #recs = ', len(y))

# Seeded shuffle followed by a stratified 80/20 split, so the partition is
# reproducible and keeps the class proportions.
X_names, y = shuffle(X_names, y, random_state = seed)
X_train, X_test, y_train, y_test = train_test_split(
    X_names, y,
    stratify = y,
    test_size = 0.20,
    random_state = seed,
)

print('train #recs = ', len(X_train))
print('test #recs = ', len(X_test))
print('Total : ', len(X_names))

Ae. aegypti #recs =  85553
Ae. albopictus #recs =  20231
An. gambiae #recs =  49471
An. arabiensis #recs =  19297
C. pipiens #recs =  30415
C. quinquefasciatus #recs =  74599
total #recs =  279566
train #recs =  223652
test #recs =  55914
Total :  279566


In [12]:
def shift(x, wshift, hshift, cval = 0.):
    """Translate an image tensor by a fraction of its width / height.

    Args:
        x: tensor of shape (C, H, W) or (B, C, H, W). Array-likes are
            converted to a float32 tensor first.
        wshift: horizontal shift as a fraction of the width. Output column
            ``j`` samples input column ``j + wshift * W``.
        hshift: vertical shift as a fraction of the height. Output row ``i``
            samples input row ``i + hshift * H``.
        cval: value used for samples that fall outside the input.

    Returns:
        Tensor with the same shape as ``x`` (bilinear interpolation).

    Raises:
        ValueError: if ``x`` is neither 3-D nor 4-D.
    """
    if not isinstance(x, torch.Tensor):
        # ascontiguousarray also removes negative strides (e.g. from
        # np.flipud), which torch cannot wrap.
        x = torch.as_tensor(np.ascontiguousarray(x), dtype=torch.float32)

    original_shape = x.shape
    if len(original_shape) == 3:
        x = x.unsqueeze(0)
    if x.dim() != 4:
        raise ValueError('shift expects a (C, H, W) or (B, C, H, W) input, got shape %s' % (tuple(original_shape),))

    b = x.shape[0]
    cval = float(cval)

    # affine_grid works in normalized coordinates where each axis spans
    # [-1, 1] (length 2), so a shift of `f * size` pixels is `2 * f` --
    # not a pixel count.
    theta = torch.tensor([[
        [1.0, 0.0, 2.0 * float(wshift)],
        [0.0, 1.0, 2.0 * float(hshift)]
    ]], dtype=x.dtype, device=x.device).repeat(b, 1, 1)

    grid = F.affine_grid(theta, x.size(), align_corners=False)

    # Sampling (x - cval) with zero padding and adding cval back fills the
    # out-of-range area with cval and blends border pixels towards cval.
    x_transformed = F.grid_sample(x - cval, grid, mode='bilinear', padding_mode='zeros', align_corners=False) + cval

    if len(original_shape) == 3:
        x_transformed = x_transformed.squeeze(0)

    return x_transformed

def random_data_shift(data, w_limit = (-0.25, 0.25), h_limit = (-0.0, 0.0), cval = 0., u = 0.5):
    """Randomly translate ``data`` with probability ``u``.

    The horizontal and vertical shift fractions are drawn uniformly from
    ``w_limit`` and ``h_limit`` (each a ``(low, high)`` pair) and handed to
    ``shift`` together with the fill value ``cval``. When the random draw
    does not select augmentation, ``data`` is returned untouched.
    """
    if not (np.random.random() < u):
        return data

    w_low, w_high = w_limit[0], w_limit[1]
    h_low, h_high = h_limit[0], h_limit[1]
    frac_w = np.random.uniform(w_low, w_high)
    frac_h = np.random.uniform(h_low, h_high)
    return shift(data, frac_w, frac_h, cval = cval)

In [None]:
def train_generator():
    """Endlessly yield augmented training batches.

    Iterates over ``X_train`` / ``y_train`` in order, ``batch_size`` files at
    a time, and restarts from the beginning after the last batch.

    Yields:
        (x_batch, y_batch) where ``x_batch`` is a float32 tensor of shape
        (batch, 1, freq_bins, frames) holding dB spectrograms with a random
        horizontal shift applied, and ``y_batch`` is a float32 one-hot tensor
        of shape (batch, len(target_names)).
    """
    if len(X_train) == 0:
        # Nothing to iterate over; avoid spinning forever without yielding.
        return

    hop_length = int(HOP_LEN)   # librosa requires an integer hop

    while True:
        for start in range(0, len(X_train), batch_size):
            end = min(start + batch_size, len(X_train))
            train_batch = X_train[start:end]
            labels_batch = y_train[start:end]

            x_batch = []
            for wav_path in train_batch:
                waveform, _ = librosa.load(wav_path, sr = SR)

                spec = librosa.stft(waveform, n_fft = N_FFT, hop_length = hop_length)
                # Take the magnitude explicitly: amplitude_to_db discards the
                # phase anyway and warns when handed complex input.
                spec = librosa.amplitude_to_db(np.abs(spec))

                # np.flipud returns a negative-stride view that torch cannot
                # wrap, so materialise it before converting.
                spec = np.ascontiguousarray(np.flipud(spec), dtype = np.float32)

                # Channels-first (1, H, W): the layout `shift` and the
                # PyTorch model expect.
                spec = torch.from_numpy(spec).unsqueeze(0)
                spec = random_data_shift(spec, w_limit = (-0.25, 0.25), h_limit = (-0.0, 0.0), cval = float(spec.min()), u = 1.0)

                x_batch.append(spec)

            x_batch = torch.stack(x_batch)

            # F.one_hot needs an integer (long) tensor, not a float numpy array.
            y_batch = torch.tensor(labels_batch, dtype = torch.long)
            y_batch = F.one_hot(y_batch, len(target_names)).float()

            yield x_batch, y_batch

In [None]:
def shift(x, wshift, hshift, row_axis=0, col_axis=1, channel_axis=2, fill_mode='constant', cval=0.):
    """Translate an image tensor by a fraction of its width / height.

    Args:
        x: tensor (or array-like, converted to float32) of shape (C, H, W)
            or (B, C, H, W).
        wshift: horizontal shift as a fraction of the width. Output column
            ``j`` samples input column ``j + wshift * W``.
        hshift: vertical shift as a fraction of the height. Output row ``i``
            samples input row ``i + hshift * H``.
        row_axis, col_axis, channel_axis: accepted for signature
            compatibility only; they are ignored and the layout is always
            taken to be channels-first.
        fill_mode: 'constant' fills out-of-range samples with ``cval``,
            'reflect' mirrors the image, 'nearest' repeats the border.
            Any other value falls back to zero padding.
        cval: fill value, used only when ``fill_mode == 'constant'``.

    Returns:
        Tensor with the same shape as ``x`` (bilinear interpolation).

    Raises:
        ValueError: if ``x`` is neither 3-D nor 4-D.
    """
    if not isinstance(x, torch.Tensor):
        # ascontiguousarray removes negative strides (e.g. from np.flipud),
        # which torch refuses to wrap.
        x = torch.as_tensor(np.ascontiguousarray(x), dtype=torch.float32)

    original_shape = x.shape
    is_batch = len(original_shape) == 4

    # Add a batch dimension when a single (C, H, W) image is given.
    if len(original_shape) == 3:
        x = x.unsqueeze(0)
    if x.dim() != 4:
        raise ValueError('shift expects a (C, H, W) or (B, C, H, W) input, got shape %s' % (tuple(original_shape),))

    b = x.shape[0]

    # affine_grid works in normalized coordinates where each axis spans
    # [-1, 1] (length 2). A shift of `f * size` pixels is therefore `2 * f`;
    # passing pixel counts here moves the image completely out of view.
    theta = torch.tensor([[
        [1.0, 0.0, 2.0 * float(wshift)],   # first row: horizontal (x) offset
        [0.0, 1.0, 2.0 * float(hshift)]    # second row: vertical (y) offset
    ]], dtype=x.dtype, device=x.device).repeat(b, 1, 1)

    grid = F.affine_grid(theta, x.size(), align_corners=False)

    # Map the Keras-style fill modes onto grid_sample padding modes.
    padding_mode = 'zeros'
    if fill_mode == 'reflect':
        padding_mode = 'reflection'
    elif fill_mode == 'nearest':
        padding_mode = 'border'

    # For constant fill, sample (x - cval) with zero padding and add cval
    # back: out-of-range pixels become exactly cval and border pixels are
    # blended towards cval rather than towards 0.
    offset = float(cval) if fill_mode == 'constant' else 0.0
    x_transformed = F.grid_sample(x - offset, grid, mode='bilinear', padding_mode=padding_mode, align_corners=False) + offset

    # Restore the original rank.
    if not is_batch:
        x_transformed = x_transformed.squeeze(0)

    return x_transformed

def random_data_shift(data, w_limit=(-0.25, 0.25), h_limit=(-0.0, 0.0), cval=0., u=0.5):
    """PyTorch version of random_data_shift.

    With probability ``u`` draws a horizontal fraction from ``w_limit`` and a
    vertical fraction from ``h_limit`` (both ``(low, high)`` pairs, sampled
    uniformly) and returns ``shift(data, ...)`` filled with ``cval``;
    otherwise returns ``data`` as received.
    """
    apply_shift = np.random.random() < u
    if apply_shift:
        # Drawn in this order: width fraction first, then height fraction.
        wshift, hshift = [
            np.random.uniform(bounds[0], bounds[1])
            for bounds in (w_limit, h_limit)
        ]
        data = shift(data, wshift, hshift, cval=cval)
    return data

# Custom PyTorch dataset
class AudioDataset(Dataset):
    """Map-style dataset that turns audio files into dB spectrogram tensors.

    Args:
        file_paths: sequence of audio file paths.
        labels: sequence of integer class indices, aligned with ``file_paths``.
        target_names: class names (stored; not used by the dataset itself).
        sr: sample rate the audio is loaded at.
        n_fft: FFT window size for the STFT.
        hop_len: STFT hop length; truncated to an integer because librosa
            does not accept a fractional hop.
        augment: when True (default) every item gets a random horizontal
            shift; pass False for validation / test data.
    """

    def __init__(self, file_paths, labels, target_names, sr=22050, n_fft=2048, hop_len=512, augment=True):
        self.file_paths = file_paths
        self.labels = labels
        self.target_names = target_names
        self.sr = sr
        self.n_fft = n_fft
        self.hop_len = int(hop_len)
        self.augment = augment

    def __len__(self):
        return len(self.file_paths)

    @staticmethod
    def _to_tensor(spec_db):
        """Flip a 2-D spectrogram vertically and return it as a float32 (1, H, W) tensor."""
        # np.flipud yields a negative-stride view, which torch cannot wrap;
        # ascontiguousarray materialises it (and casts to float32).
        flipped = np.ascontiguousarray(np.flipud(spec_db), dtype=np.float32)
        return torch.from_numpy(flipped).unsqueeze(0)

    def __getitem__(self, idx):
        """Return ``(spectrogram, label)`` for item ``idx``.

        ``spectrogram`` is a float32 tensor of shape (1, H, W) and ``label``
        a 0-d long tensor.
        """
        # Load audio
        data, _ = librosa.load(self.file_paths[idx], sr=self.sr)

        # Convert to a dB spectrogram. The magnitude is taken explicitly:
        # amplitude_to_db drops the phase anyway and warns on complex input.
        data = librosa.stft(data, n_fft=self.n_fft, hop_length=self.hop_len)
        data = librosa.amplitude_to_db(np.abs(data))

        # Flip vertically and add the channel dimension: (1, H, W)
        data = self._to_tensor(data)

        # Data augmentation: always shift (u=1.0) when enabled, filling the
        # uncovered area with the spectrogram's minimum value.
        if self.augment:
            data = random_data_shift(data,
                                   w_limit=(-0.25, 0.25),
                                   h_limit=(-0.0, 0.0),
                                   cval=float(data.min()),
                                   u=1.0)

        # Label
        label = self.labels[idx]
        label_tensor = torch.tensor(label, dtype=torch.long)

        return data, label_tensor

# Helper that builds the DataLoader
def create_loader(X_train, y_train, target_names, batch_size=32, sr=22050, n_fft=2048, hop_len=512, shuffle=True):
    """Wrap the given files and labels in an ``AudioDataset`` and return a DataLoader.

    Each batch produced by the loader is ``(data, labels)`` where ``data`` is
    the stacked per-item tensors and ``labels`` is a float one-hot matrix
    with ``len(target_names)`` columns.
    """

    def collate_fn(batch):
        # Stack the per-item tensors, then expand the integer labels to
        # float one-hot rows.
        stacked_data = torch.stack([sample for sample, _ in batch])
        stacked_labels = torch.stack([target for _, target in batch])
        one_hot_labels = torch.nn.functional.one_hot(
            stacked_labels, num_classes=len(target_names)
        ).float()
        return stacked_data, one_hot_labels

    return DataLoader(
        AudioDataset(X_train, y_train, target_names, sr, n_fft, hop_len),
        batch_size=batch_size,
        shuffle=shuffle,
        collate_fn=collate_fn,
    )

# Example usage:
# train_loader = create_loader(X_train, y_train, target_names, batch_size=32)

# In your training loop:
# for epoch in range(epochs):
#     for x_batch, y_batch in train_loader:
#         # x_batch: (batch_size, 1, height, width)
#         # y_batch: (batch_size, num_classes) one-hot encoded
#         # ... your training code ...

In [13]:
def valid_generator():
    """Endlessly yield validation batches (no augmentation).

    Iterates over ``X_test`` / ``y_test`` in order, ``batch_size`` files at a
    time, and restarts from the beginning after the last batch.

    Yields:
        (x_batch, y_batch) where ``x_batch`` is a float32 tensor of shape
        (batch, 1, freq_bins, frames) holding dB spectrograms and ``y_batch``
        is a float32 one-hot tensor of shape (batch, len(target_names)).
    """
    if len(X_test) == 0:
        # Nothing to iterate over; avoid spinning forever without yielding.
        return

    hop_length = int(HOP_LEN)   # librosa requires an integer hop

    while True:
        for start in range(0, len(X_test), batch_size):
            end = min(start + batch_size, len(X_test))
            test_batch = X_test[start:end]
            labels_batch = y_test[start:end]

            x_batch = []
            for wav_path in test_batch:
                waveform, _ = librosa.load(wav_path, sr = SR)

                spec = librosa.stft(waveform, n_fft = N_FFT, hop_length = hop_length)
                # Take the magnitude explicitly: amplitude_to_db discards the
                # phase anyway and warns when handed complex input.
                spec = librosa.amplitude_to_db(np.abs(spec))

                # np.flipud returns a negative-stride view that torch cannot
                # wrap, so materialise it before converting.
                spec = np.ascontiguousarray(np.flipud(spec), dtype = np.float32)

                # Channels-first (1, H, W), the layout the PyTorch model expects.
                x_batch.append(torch.from_numpy(spec).unsqueeze(0))

            x_batch = torch.stack(x_batch)

            # `F.one_hot` is a plain function (it has no `to_categorical`
            # attribute) and needs an integer (long) tensor.
            y_batch = torch.tensor(labels_batch, dtype = torch.long)
            y_batch = F.one_hot(y_batch, len(target_names)).float()

            yield x_batch, y_batch

In [23]:
# Build the network without pretrained weights.
model_ft = current_model(weights=None)

# Replace the first convolution so the network accepts 1-channel
# (spectrogram) input.
model_ft.features.conv0 = nn.Conv2d(1, 64,
                                     kernel_size=7,
                                     stride=2,
                                     padding=3,
                                     bias=False)

# Size the classification head from the class list instead of a hard-coded 6,
# so it cannot drift out of sync with `target_names`.
num_ftrs = model_ft.classifier.in_features
model_ft.classifier = nn.Linear(num_ftrs, len(target_names))
model_ft = model_ft.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)
# Multiply the learning rate by 0.1 every 7 scheduler steps.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

In [None]:
# PyTorch port of the original Keras training cell. The Keras names used
# before (Input, ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, CSVLogger,
# fit_generator) are not imported in this notebook, so the same recipe is
# implemented by hand:
#   * Adam optimizer + cross-entropy on one-hot targets
#   * checkpoint the weights whenever `monitor` improves
#   * early stopping after `es_patience` epochs without improvement
#   * learning rate x0.1 after `rlr_patience` epochs without improvement
#   * one CSV row per epoch in `log_path`
# NOTE(review): this cell trains with Adam (as the Keras version did), not
# with the SGD optimizer / StepLR scheduler created in the model-setup cell.

def run_epoch(model, batches, steps, criterion, optimizer = None):
    """Consume `steps` batches from the iterator `batches`.

    Trains when `optimizer` is given, otherwise evaluates without gradients.
    Each batch is `(x, y_one_hot)`. Returns `(mean loss, accuracy)` over all
    samples seen.
    """
    training = optimizer is not None
    model.train(training)   # train(False) is equivalent to eval()

    total_loss, n_correct, n_seen = 0.0, 0, 0
    with torch.set_grad_enabled(training):
        for _ in range(steps):
            x_batch, y_batch = next(batches)
            x_batch = torch.as_tensor(x_batch, dtype = torch.float32).to(device)
            y_batch = torch.as_tensor(y_batch, dtype = torch.float32).to(device)

            if training:
                optimizer.zero_grad()
            logits = model(x_batch)
            batch_loss = criterion(logits, y_batch)
            if training:
                batch_loss.backward()
                optimizer.step()

            n_items = x_batch.size(0)
            total_loss += batch_loss.item() * n_items
            n_correct += (logits.argmax(dim = 1) == y_batch.argmax(dim = 1)).sum().item()
            n_seen += n_items

    n_seen = max(n_seen, 1)
    return total_loss / n_seen, n_correct / n_seen


model = model_ft

train_optimizer = optim.Adam(model.parameters())

# Accuracy-like metrics should grow, loss-like metrics should shrink.
monitor_mode = 'max' if 'acc' in monitor else 'min'
plateau_scheduler = lr_scheduler.ReduceLROnPlateau(train_optimizer,
                                                  mode = monitor_mode,
                                                  factor = 0.1,
                                                  patience = rlr_patience)

steps_per_epoch = int(math.ceil(float(len(X_train)) / float(batch_size)))
validation_steps = int(math.ceil(float(len(X_test)) / float(batch_size)))

# Both generators loop forever; each epoch draws exactly one pass from them.
train_batches = train_generator()
valid_batches = valid_generator()

best_score = None
epochs_without_improvement = 0

with open(log_path, 'w') as log_file:
    log_file.write('epoch,acc,loss,val_acc,val_loss\n')

    for epoch in range(epochs):
        train_loss, train_acc = run_epoch(model, train_batches, steps_per_epoch, criterion, train_optimizer)
        val_loss, val_acc = run_epoch(model, valid_batches, validation_steps, criterion)

        history = {'acc': train_acc, 'loss': train_loss, 'val_acc': val_acc, 'val_loss': val_loss}
        log_file.write('%d,%.6f,%.6f,%.6f,%.6f\n' % (epoch, train_acc, train_loss, val_acc, val_loss))
        log_file.flush()
        print('Epoch %d/%d - loss: %.4f - acc: %.4f - val_loss: %.4f - val_acc: %.4f'
              % (epoch + 1, epochs, train_loss, train_acc, val_loss, val_acc))

        score = history[monitor]
        plateau_scheduler.step(score)

        improved = (best_score is None
                    or (score > best_score if monitor_mode == 'max' else score < best_score))
        if improved:
            best_score = score
            epochs_without_improvement = 0
            torch.save(model.state_dict(), best_weights_path)
            print('%s improved to %.5f, saving weights to %s' % (monitor, score, best_weights_path))
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= es_patience:
                print('Early stopping at epoch %d' % (epoch + 1))
                break

# Evaluate the best checkpoint on a fresh pass over the held-out set.
if best_score is not None:
    model.load_state_dict(torch.load(best_weights_path, map_location = device))

loss, acc = run_epoch(model, valid_generator(), validation_steps, criterion)

#print('loss:', loss)
print('Test accuracy:', acc)