In [123]:
import os
import pathlib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import IPython.display as ipd
from pathlib import Path
import obspy
from obspy.core import read

Establece la ruta del dataset

In [124]:
ruta = "/Users/andavaro/Desktop/Andrès/UBA/TrabajoDeGradoCEIA/VENV_Python/Tesis_IA/Dataset/"

In [125]:
data_dir = pathlib.Path(ruta)
os.chdir(ruta)
default_dir = os.getcwd()
print(f'Data directory will be: {ruta}')

Data directory will be: /Users/andavaro/Desktop/Andrès/UBA/TrabajoDeGradoCEIA/VENV_Python/Tesis_IA/Dataset/


Importa los nombres de las carpetas que contiene el dataset

In [126]:
labels = [name for name in os.listdir('.') if os.path.isdir(name)]
# back to default directory
os.chdir(default_dir)
print(f'Total Labels: {len(labels)}')
print(f'Label Names: {labels}')

Total Labels: 4
Label Names: ['VT', 'VTobspy', 'Copia de VT 2', 'Copia de VT']


Calcula el total de muestras que contiene el dataset

In [127]:
filenames = tf.io.gfile.glob(str(default_dir) + '/*/*')
num_samples = len(filenames)
print('Number of total examples:', num_samples)

Number of total examples: 70


In [128]:
for l in labels:
    print(l)

VT
VTobspy
Copia de VT 2
Copia de VT


In [129]:
def preprocesamiento(Ruta, Detrend='demean', New_Sampling=0, Filter='bandpass', Fmin=0.1, Fmax=25):

    '''
    Ruta: ruta de acceso del archivo
    Detrend: remueve la tendencia lineal
             'demean': remueve promedio de la serie
             'constant': extrae tendencia lineal con una recta entre el primer y último valor
             'linear': extrae la tendencia usando una aproximación por mínimos cuadrados
    New_Sampling: Nueva frecuencia de muestreo si se desea remuestrear. Fijar en 0 si no desea remuestreo
    Filter: filtro deseado
             'bandpass': pasa banda
             'highpass': pasa alto (toma Fmin como frecuencia de corte)
             'lowpass': pasa bajo (toma Fmax como frecuencia de corte)
             'bandstop': band stop
    Fmin: frecuencia mínima para el filtro
    Fmax: frecuencia máxima para el filtro
    '''
    
    #Importar los datos
    st = read(Ruta)
    tr = st[0]
    
    #Remover tendencia lineal
    sismo = tr.detrend(Detrend)
    
    #Reducir frecuencia de muestreo
    if New_Sampling > 0 and New_Sampling < sismo.stats.sampling_rate:
        sismo = sismo.decimate(round(sismo.stats.sampling_rate / New_Sampling))
    
    #Filtro
    if Filter == 'bandpass':
        sismo.filter(Filter, freqmin=Fmin, freqmax=Fmax)
    elif Filter == 'highpass':
        sismo.filter(Filter, freq=Fmin)
    elif Filter == 'lowpass':
        sismo.filter(Filter, freq=Fmax)
    elif Filter == 'bandstop':
        sismo.filter(Filter, freqmin=Fmin, freqmax=Fmax)
    else:
        print('error')

    #Convierte los datos a formato deseado
    salida = tf.convert_to_tensor(sismo, dtype=tf.float32)

    return salida #cambiar a salida se usa get_spectrogram, sismo para plt

In [136]:
def load_audio_files(main_path: str, label:str):

    ''' 
    main_path: ruta de acceso del dataset
    label: lista de etiquetas de las clases
    '''

    dataset = []
    
    for l in labels:
        
        dataset_l = []
        path = main_path+l
        walker = sorted(str(p) for p in Path(path).glob(f'*.txt'))

        for i, file_path in enumerate(walker):
            path, filename = os.path.split(file_path)
            
            data = preprocesamiento(path+'/'+filename, Fmin=fmin, Fmax=fmax)
            
            dataset_l.append([data, l, filename])

        dataset.append(dataset_l)
        
    return dataset

In [131]:
def get_spectrogram(waveform, ns=300, sol=0.5):

    '''
    waveform = señal
    ns = número de muestras por sección
    sol = porcentaje de solapamiento
    '''
    
    frame_length = round(waveform.shape[0] / (ns-(ns-1)*sol))
    frame_step = round(sol*frame_length)
    
    #frame_length = 255
    #frame_step = 128
    # Padding for files with less than 16000 samples
    zero_padding = tf.zeros([waveform.shape[0]] - tf.shape(waveform), dtype=tf.float32)

    # Concatenate audio with padding so that all audio clips will be of the same length
    waveform = tf.cast(waveform, tf.float32)
    equal_length_waveform = tf.concat([waveform, zero_padding], 0)
    
    # Option 1: Use tfio to get the spectrogram
    #spect = tfio.audio.spectrogram(input=equal_length_waveform, nfft=frame_length, window=frame_length, stride=frame_step)
    
    # Option 2: Use tf.signal processing to get the Short-time Fourier transform (stft)
    spectro = tf.signal.stft(equal_length_waveform, frame_length=ns, frame_step=frame_step, window_fn=tf.signal.hamming_window)
    spectrogram = tf.abs(spectro)

    return spectrogram#, spect


In [132]:
def create_images(dataset, ds_dir, label_dir):
    # make directory
    for l in range(len(label_dir)):
        directory = f'{ds_dir}/train/{label_dir[l]}/ns={ns}_sol={sol}_f={fmin}-{fmax}/'

        os.makedirs(directory, mode=0o777, exist_ok=True)

        for i, data in enumerate(dataset[l]):

            waveform = data[0]
            spectrogram= get_spectrogram(waveform,ns=ns,sol=sol)
            #spectrogram, freqs, bins, im = plt.specgram(waveform , NFFT=ns, Fs=waveform.stats.sampling_rate, noverlap=round(ns*sol))
            
            #Usar para get_spectrogram
            
            if fmax > spectrogram.shape[1]:
                fc = spectrogram.shape[1]       
            else:
                fc = fmax
            
            '''            
            
            if fmax > freqs.max():
                fc = freqs.max()        
            else:
                fc = fmax
            '''
            
            log_spec = np.log(spectrogram[:,0:fc]) #Usar para get_spectrogram
            #log_spec = np.log(spectrogram[(spectrogram.shape[0]-freqs[freqs<fc].shape[0]):spectrogram.shape[0],:].T) #usar para plt
            plt.imsave(f'{directory}spec_img_{data[2][0:-4]}.png', log_spec)
            

        

In [133]:
fmin = 0.01
fmax = 35
trainset = load_audio_files(ruta, labels)



In [134]:
ns = 2**8
sol = 0.5
ruta_train =  "/Users/andavaro/Desktop/Andrès/UBA/TrabajoDeGradoCEIA/VENV_Python/Tesis_IA/"
create_images(trainset, ruta_train, labels)