In [None]:
input_root="../szunyog_hangok_25_01_14"
output_root="../szunyog_hangok_25_01_14_preprocessed_database_simple_classifier"

model_path = "c:/PROJECTS/szunyog/b_mosquito/b_mosquito_simple_classifier-1_checkpoints/best_loss_99.pth"
b_selfattention=True
classification_threshold=0.9
b_write_csv=True
b_save_segments=True


In [None]:
import os
from pathlib import Path
from pydub import AudioSegment
import os
import numpy as np
import librosa
import glob
import pandas as pd
import numpy as np

import torch
import torch.nn.functional as F

import dataPreprocess
import model_modified

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import time

# Támogatott hangfájl formátumok
supported_formats = [".wav"]

# Az output mappa létrehozása, ha nem létezik
os.makedirs(output_root, exist_ok=True)



In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#device="cpu"
print(device)


In [None]:

# Load pretrained model and processor
label2id = {'not':0, 'mosquito':1}
id2label = {v: k for k, v in label2id.items()}

# Model betöltése
resnet_model = model_modified.resnet18_attention(1, len(label2id), b_selfattention=b_selfattention)
resnet_model = resnet_model.to(device)

resnet_model.load_state_dict(torch.load(model_path))
resnet_model.eval()

resnet_model


In [None]:
def split_audio_chunks(input_file):
    """
    Hangfájl beolvasása, csatornák számának meghatározása, darabolás 1 másodperces darabokra, 0.5 másodperces átfedéssel.
    """
    chunks = []
    metadata = []
    try:
        # Hangfájl betöltése
        audio = AudioSegment.from_file(input_file)

        # Csatornák számának meghatározása
        num_channels = audio.channels
        print(f"Fájl: {input_file}, Csatornák száma: {num_channels}")
        

        # Minden csatorna szétválasztása
        separated_channels = audio.split_to_mono()

        for channel_index, channel_audio in enumerate(separated_channels):

            # Konvertálás 8kHz-re
            #channel_audio = channel_audio.set_frame_rate(8000)
            channel_audio = channel_audio.set_frame_rate(16000)

            duration_ms = len(channel_audio)

            # x másodperces ablak, y másodperces lépés
            window_size = 1000  # milliszekundumban
            step_size = 500     # milliszekundumban

            for start_ms in range(0, duration_ms - window_size + 1, step_size):
                end_ms = start_ms + window_size
                chunk = channel_audio[start_ms:end_ms]
                chunks.append(chunk)

                # Metadata rögzítése
                metadata.append({
                    "file": os.path.basename(input_file),
                    "channel": channel_index + 1,
                    "start_ms": start_ms,
                    "end_ms": end_ms
                })
            #break # channel
            
    except Exception as e:
        print(f"Hiba a darabolás során: {input_file} - {e}")

    return chunks, metadata


In [None]:


def getFeature(y, sr=8000, top_db=80):
    """
    Funkció a spektrális jellemzők kiszámítására közvetlenül az audio adatból.
    """
    # Audio padding (2 másodperces minimum hossz)
    y = np.pad(y, int(np.ceil((2 * sr - y.shape[0]) / 2)), mode='reflect')
    
    # Melspectrogram kiszámítása
    spec = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=256, hop_length=64)
    spec = librosa.power_to_db(spec, top_db=top_db)

    # Normalizálás 0-255 közé
    spec_min, spec_max = spec.min(), spec.max()
    spec = 255 * (spec - spec_min) / (spec_max - spec_min)
    spec = spec.astype(np.uint8)
    spec = spec[np.newaxis, ...]  # Csatorna dimenzió hozzáadása
    return spec


In [None]:
import torchaudio.transforms as T
resample_transform = None

def torch_resample(y, orig_sr, target_sr):
    """
    Resampling torchaudio segítségével.
    """
    global resample_transform
    if resample_transform is None:
        resample_transform = T.Resample(orig_freq=orig_sr, new_freq=target_sr)
    
    return resample_transform(torch.tensor(y)).numpy()

from scipy.signal import resample

def fast_resample(y, orig_sr, target_sr):
    """
    Resampling scipy.signal.resample használatával.
    """
    num_samples = int(len(y) * target_sr / orig_sr)
    return resample(y, num_samples)
    

In [None]:
#!pip install samplerate
import samplerate

def samplerate_resample(y, orig_sr, target_sr):
    """
    Resampling samplerate használatával.
    """
    ratio = target_sr / orig_sr
    #return samplerate.resample(y, ratio, 'sinc_fastest')
    return samplerate.resample(y, ratio, 'sinc_medium')
    

In [None]:

def predict_audio_chunk(chunk, model, target_sr=8000, threshold=0.5):
    """
    Szűrőfüggvény, amely az adott hangdarabról eldönti, hogy tartalmaz-e szúnyoghangot.
    """

    # Audio adatok kinyerése a chunkból
    data = chunk.get_array_of_samples()
    dtype = data.typecode  # Az array típusa, pl. 'h' vagy 'i'
    y = np.array(data, dtype=np.float32)  # Mindig float32-be alakítjuk

    # Normalizálás az adat típusától függően
    if dtype == 'h':  # 16 bites integer
        y = y / (2**15)  # Normálás -1 és 1 közé
    elif dtype == 'i':  # 32 bites integer
        y = y / (2**31)  # Normálás -1 és 1 közé
    else:
        raise ValueError(f"Nem támogatott adatformátum: {dtype}")

    sr = chunk.frame_rate

    # Downsampling, ha szükséges
    if sr != target_sr:
        #y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
        #y = torch_resample(y, sr, target_sr)
        #y = fast_resample(y, sr, target_sr)
        y = samplerate_resample(y, sr, target_sr)
        sr = target_sr

    # Feature extraction
    spec = getFeature(y, sr=sr)


    # Tenzor előkészítése PyTorch modellhez
    inputs = torch.tensor(spec, dtype=torch.float32).unsqueeze(0)  # Batch dimenzió hozzáadása
    inputs = inputs.to(device)
    
    # Modell predikció
    with torch.no_grad():
        logits = model(inputs)

    # Valószínűségek kiszámítása softmax-szal
    probabilities = torch.softmax(logits, dim=-1)
    class_1_probabilities = probabilities[:, 1]

    # Szűrés küszöbérték alapján
    thresholded = class_1_probabilities >= threshold

    return thresholded[0]



In [None]:
def anal_chunks(chunks, output_dir, base_filename, metadata, model, classification_threshold=0.5, b_save=False):
    """
    Darabok mentése a megadott mappába sorszámozva.
    """
    os.makedirs(output_dir, exist_ok=True)
    not_selected_dir = os.path.join(os.path.dirname(output_dir), os.path.basename(output_dir) + "_not_selected")
    os.makedirs(not_selected_dir, exist_ok=True)

    #speech_dir = os.path.join(os.path.dirname(output_dir), os.path.basename(output_dir) + "_speech")
    #os.makedirs(speech_dir, exist_ok=True)

    sound_idxs=[]

    for idx, chunk in enumerate(chunks):
        md=metadata[idx]

        ch=md['channel']
        start=str(int(md['start_ms']))
        
        #if filter_speech(chunk):
        #    if b_save:
        #        chunk_speech_file = os.path.join(speech_dir, f"{base_filename}_{ch}_{start}.wav")            
                #chunk.export(chunk_speech_file, format="wav", parameters=["-ar", "16000", "-ac", "1", "-sample_fmt", "s16"])

        #    if idx % 50 == 0:
        #        pass
                #print(f"{idx}: Mentett darab (BESZÉD): {chunk_speech_file}")
        #    continue
        
        if predict_audio_chunk(chunk, model, threshold=classification_threshold):
            sound_idxs.append(idx)
            chunk_output_file = os.path.join(output_dir, f"{base_filename}_{ch}_{start}.wav")
            #print(f"{idx}: Mentett darab (SZÚNYOG HANG): {chunk_output_file}")

            if b_save:
                chunk.export(chunk_output_file, format="wav", parameters=["-ar", "16000", "-ac", "1", "-sample_fmt", "s16"])

                if idx % 50 == 0:
                    pass
                    print(f"{idx}: Mentett darab (SZÚNYOG HANG): {chunk_output_file}")
        else:
            if b_save:
                not_selected_file = os.path.join(not_selected_dir, f"{base_filename}_{ch}_{start}.wav")
                #chunk.export(not_selected_file, format="wav", parameters=["-ar", "16000", "-ac", "1", "-sample_fmt", "s16"])

            if idx % 50 == 0:
                pass
                #print(f"{idx}: Nem kiválasztott darab: {not_selected_file}")

        #break
        
    return sound_idxs


In [None]:

fns=glob.glob(os.path.join(input_root,"*.wav"))

model=resnet_model

# Időmérés kezdete
start_time = time.time()

for idx, input_fn in enumerate(fns):

    base_filename=os.path.basename(input_fn)
    output_file = os.path.join(output_root, f"{base_filename[:-4]}.csv")

    
    #Ha az output fájl már létezik, folytatjuk a következő fájllal
    if os.path.exists(output_file):
        print(f"allready done: {input_fn}")
        continue

    # Darabolás és szűrés
    #print("split begun")
    chunks, metadata = split_audio_chunks(input_fn)
    #print("split finished")


    if len(chunks)>0:
        # indexben a szunyog szegmensek, es a fuggveny wav-okat is kiir b_save=1 esetben
        sound_idxs=anal_chunks(chunks, output_root, base_filename, metadata, model, classification_threshold=classification_threshold, b_save=b_save_segments)

        if b_write_csv:
            if len(sound_idxs)>0:
                # Metadata DataFrame létrehozása
                out_df = pd.DataFrame(metadata)
                
                # Szűrés az analizált indexek alapján
                out_df = out_df.iloc[sound_idxs]
            
                # Eredmények mentése fájlba
                out_df.to_csv(output_file, index=False)
            else:
                # Üres csv
                with open(output_file, mode="w") as file:
                    file.write("file,channel,start_ms,end_ms\n")
                print("nem találtam szúnyoghangot.")

end_time = time.time()
print(f"befejezve, idő: {end_time - start_time:.6f} másodperc")
