In [1]:
import collections
import configparser
import numpy as np
import pandas as pd
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
import soundfile as sf
import sys
import torch
import torch.nn.functional as F

from pathlib import Path
sys.path.append(str(Path.cwd().parent))
from scipy.signal import find_peaks
from torch.utils.data import DataLoader
from tqdm.notebook import tqdm

from src.dataset import BirdsongDataset
from src.network import AutoEncoderClassifer
from src.utils import GetSortedSpeciesCode, SegmentWithSlidingWindow

In [2]:
# Project settings live in <cwd>/../../setting/config.ini.
configPath = Path.cwd().parent.parent.joinpath('setting', 'config.ini')
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it did parse, so fail here instead of with an opaque KeyError further down.
if not config.read(str(configPath)):
  raise FileNotFoundError(f'Cannot read config file: {configPath}')

# Window length and hop between consecutive windows (hop = length * (1 - overlap)).
# NOTE(review): presumably both are in seconds, since they are added to 'start time' - confirm.
WIN_LEN = config['Window'].getint('Length')
HOP_LEN = WIN_LEN * (1 - config['Window'].getfloat('Overlap'))

# Fixed seed for reproducible runs; use the configured GPU when CUDA is available.
torch.manual_seed(42)
if torch.cuda.is_available():
  DEVICE = torch.device(f'cuda:{config["Model"]["Classifier_Device"]}')
  torch.backends.cudnn.benchmark = True
else:
  DEVICE = torch.device('cpu')

TARGET_SPECIES = GetSortedSpeciesCode(Path.cwd().parent.parent.joinpath('setting', 'SPECIES.csv'))
# Comma-separated thresholds, indexed in parallel with TARGET_SPECIES by the labelling loop.
# Parsed to float here: they are used as the numeric `height` of scipy's find_peaks,
# which cannot compare probabilities against raw config strings.
THRESHOLD = [float(value) for value in config['Application']['Threshold'].split(',')]

In [3]:
def createTmpCSV(audioPath:Path):
  """Write the sliding-window segment table of one audio file to data/tmp/single-test.csv.

  The table has the columns 'file', 'start time' and 'end time'. Start times are
  whatever SegmentWithSlidingWindow returns for the file duration (frames / samplerate),
  WIN_LEN and HOP_LEN; each end time is its start time plus WIN_LEN. The 'file'
  column is always 'NrAudio/<stem>.wav', regardless of where audioPath points.

  Args:
    audioPath: Path of the audio file to segment.
  """
  # Only the duration is needed, so close the handle right away instead of
  # leaking one open file per processed recording.
  with sf.SoundFile(audioPath) as source:
    duration = source.frames / source.samplerate

  df = pd.DataFrame(columns=['file', 'start time', 'end time'])
  df['start time'] = SegmentWithSlidingWindow(duration, WIN_LEN, HOP_LEN)
  df['end time'] = df['start time'] + WIN_LEN
  df['file'] = Path('NrAudio', f'{audioPath.stem}.wav')

  tmpCsvPath = Path.cwd().parent.parent.joinpath('data', 'tmp', 'single-test.csv')
  # A fresh checkout may not have the tmp directory yet.
  tmpCsvPath.parent.mkdir(parents=True, exist_ok=True)
  df.to_csv(tmpCsvPath, header=True)

In [4]:
def getProbabilityResults(weightPath:Path):
  """Run the classifier over data/tmp/single-test.csv and return sigmoid probabilities.

  Args:
    weightPath: Path of the state-dict file loaded into AutoEncoderClassifer.

  Returns:
    A numpy array reshaped to (-1, len(TARGET_SPECIES)), with rows in dataset
    order (the loader does not shuffle). With an empty dataset the result has
    shape (0, len(TARGET_SPECIES)).
  """
  model = AutoEncoderClassifer(len(TARGET_SPECIES)).to(DEVICE)
  # DEVICE is already a torch.device, so it can be used as map_location directly.
  model.load_state_dict(torch.load(weightPath, map_location=DEVICE))
  model.eval()

  # NOTE(review): the two False flags are passed positionally to BirdsongDataset;
  # their meaning is defined in src.dataset - confirm before changing them.
  allDataloader = DataLoader(
    BirdsongDataset(Path.cwd().parent.parent.joinpath('data', 'tmp', 'single-test.csv'), False, False),
    batch_size=4, shuffle=False, num_workers=4, pin_memory=True
  )

  batches = []
  with torch.no_grad():
    for inputs, _ in tqdm(allDataloader, total=len(allDataloader)):
      # torch.sigmoid replaces the deprecated F.sigmoid; the values are identical.
      outputs = torch.sigmoid(model(inputs.to(DEVICE)))
      batches.append(outputs.cpu().numpy())

  # np.concatenate rejects an empty list, so handle the no-segment case explicitly.
  if not batches:
    return np.empty((0, len(TARGET_SPECIES)))
  return np.concatenate(batches).reshape(-1, len(TARGET_SPECIES))

In [7]:
def generateAutoLabelDataset(filename, allPeaks):
  """Append auto-generated labels for one recording to data/auto-dataset.csv.

  One row is added per entry of allPeaks: the window at index i spans
  [i * HOP_LEN, i * HOP_LEN + WIN_LEN] (rounded to 6 decimals) and its label is
  the comma-joined list of values. Existing rows of the CSV are kept; when a
  (file, start time, end time) triple occurs more than once, only the first
  occurrence is written back.

  Args:
    filename: Name stored in the 'file' column as 'NrAudio/<filename>'.
    allPeaks: Mapping of window index -> list of label strings for that window.
  """
  columns = ['file', 'start time', 'end time', 'label']
  datasetPath = Path.cwd().parent.parent.joinpath('data', 'auto-dataset.csv')

  # Store the path as a string: rows read back from the CSV hold strings, and a
  # Path object never compares equal to a str, which would defeat the duplicate check.
  fileField = str(Path('NrAudio', filename))
  # Build all new rows at once instead of growing the frame with concat per row.
  records = [
    {
      'file': fileField,
      'start time': np.around(i * HOP_LEN, decimals=6),
      'end time': np.around(i * HOP_LEN + WIN_LEN, decimals=6),
      'label': ','.join(vs)
    }
    for i, vs in allPeaks.items()
  ]
  newRows = pd.DataFrame(records, columns=columns)

  if datasetPath.exists():
    existing = pd.read_csv(datasetPath, header=0)
  else:
    existing = pd.DataFrame(columns=columns)

  # Skip empty frames so concat does not have to guess dtypes from them.
  frames = [frame for frame in (existing, newRows) if not frame.empty]
  df = pd.concat(frames, ignore_index=True) if frames else existing

  # drop_duplicates returns a new frame; the result must be kept.
  df = df.drop_duplicates(subset=['file', 'start time', 'end time'])
  df.to_csv(datasetPath, header=True, index=False)

In [None]:
nrAudioPaths = sorted(Path.cwd().parent.parent.joinpath('data', 'NrAudio').glob('GW01*.wav'))
# Select model weight manually. The path does not depend on the audio file, so resolve it once.
weightPath = Path.cwd().parent.parent.joinpath('model', 'AEClassifer20220626.pth')

for nrAudioPath in tqdm(nrAudioPaths):
  ## 1. Create temporary csv file for model input
  createTmpCSV(nrAudioPath)
  ## 2. Input to the model
  predicts = getProbabilityResults(weightPath)
  ## 3. Find peaks for species
  # Maps window index -> species whose probability peaks in that window.
  # NOTE: find_peaks only reports samples with a smaller neighbour on both sides,
  # so the first and last window of a recording are never labelled.
  allPeaks = collections.defaultdict(list)
  for i, sp in enumerate(TARGET_SPECIES):
    # Thresholds originate from config text; find_peaks needs a numeric height.
    peaks, _ = find_peaks(predicts[:, i], height=float(THRESHOLD[i]))
    for p in peaks:
      allPeaks[int(p)].append(sp)
  ## 4. Append the generated labels to the auto-label dataset
  # Pass the full file name (with .wav) so the 'file' column matches the format
  # createTmpCSV writes; the stem alone would not name an existing audio file.
  # NOTE(review): rows written earlier without the extension are not migrated.
  generateAutoLabelDataset(nrAudioPath.name, allPeaks)