In [None]:
import os
from matplotlib import pyplot as plt
import tensorflow as tf
# import tensorflow_io as tfio

In [None]:
# One reference clip per class, used below for the waveform comparison.
CAPUCHIN_FILE = os.path.join(
    'data', 'Parsed_Capuchinbird_Clips', 'XC3776-3.wav'
)
NOT_CAPUCHIN_FILE = os.path.join(
    'data', 'Parsed_Not_Capuchinbird_Clips', 'afternoon-birds-song-in-forest-0.wav'
)

In [None]:
CAPUCHIN_FILE

In [None]:
import librosa
import numpy as np
def load_wav_16k_mono(filename):
    """Load an audio file as 16 kHz mono float32 samples.

    Parameters
    ----------
    filename : str, bytes, or numpy.ndarray
        Path to the audio file. ``bytes`` values are decoded as UTF-8.
        An array is accepted when it wraps a single path (0-d or
        one-element), or when it holds the path split into pieces, which
        are joined back together.

    Returns
    -------
    numpy.ndarray
        1-D ``float32`` array holding the samples returned by
        ``librosa.load(..., sr=16000, mono=True)``.

    Raises
    ------
    ValueError
        If ``filename`` is an empty array.
    """
    if isinstance(filename, np.ndarray):
        # atleast_1d makes a 0-d array (e.g. np.array(b"x.wav")) behave like
        # a one-element array. Without it, tolist() returns a bare scalar
        # and a bytes payload would reach librosa undecoded.
        pieces = np.atleast_1d(filename).ravel().tolist()
        if not pieces:
            raise ValueError("load_wav_16k_mono received an empty filename array")
        if isinstance(pieces[0], bytes):
            # Array of byte strings such as [b'd:/path/to/file.wav'].
            filename = pieces[0]
        else:
            filename = ''.join(
                piece.decode("utf-8") if isinstance(piece, bytes) else piece
                for piece in pieces
            )

    if isinstance(filename, bytes):
        filename = filename.decode("utf-8")

    wav, _ = librosa.load(filename, sr=16000, mono=True)
    return np.asarray(wav, dtype=np.float32)



In [None]:
# Decode one reference clip per class
wave = load_wav_16k_mono(CAPUCHIN_FILE)
nwave = load_wav_16k_mono(NOT_CAPUCHIN_FILE)

# Draw the two waveforms side by side, one panel per clip
plt.figure(figsize=(15, 4))

panels = [(wave, "Capuchinbird Call"), (nwave, "Not Capuchinbird Call")]
for position, (signal, heading) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(signal)
    plt.title(heading)
    plt.xlabel("Samples")
    plt.ylabel("Amplitude")

plt.tight_layout()
plt.show()


In [None]:

import glob

# 3.1 Define paths
POS = os.path.join('data', 'Parsed_Capuchinbird_Clips')
NEG = os.path.join('data', 'Parsed_Not_Capuchinbird_Clips')

# Collect the clip paths as plain lists (so len() works on them).
# glob returns matches in arbitrary order; sorting keeps the dataset order,
# and therefore any seeded shuffle, reproducible across machines.
pos_files = sorted(glob.glob(os.path.join(POS, '*.wav')))
neg_files = sorted(glob.glob(os.path.join(NEG, '*.wav')))

# Fail here with a clear message instead of deep inside the tf.data pipeline.
if not pos_files or not neg_files:
    raise FileNotFoundError(
        f"Expected .wav clips in both {POS!r} and {NEG!r}; "
        f"found {len(pos_files)} and {len(neg_files)}."
    )

# 3.2 Create TensorFlow Datasets of file paths
pos_ds = tf.data.Dataset.from_tensor_slices(pos_files)
neg_ds = tf.data.Dataset.from_tensor_slices(neg_files)

# 3.3 Add labels and combine datasets
positives = pos_ds.map(lambda x: (x, tf.constant(1.0)))  # Label 1 for capuchin
negatives = neg_ds.map(lambda x: (x, tf.constant(0.0)))  # Label 0 for not capuchin

# All positives come first, then all negatives; shuffling happens later.
data = positives.concatenate(negatives)




In [None]:
# Peek at one shuffled (path, label) pair to sanity-check the combined dataset
next(data.shuffle(buffer_size=1000).as_numpy_iterator())

In [None]:
# Length, in samples at 16 kHz, of every positive clip.
capuchin_dir = os.path.join('data', 'Parsed_Capuchinbird_Clips')

lengths = []
for file in sorted(os.listdir(capuchin_dir)):
    # listdir also returns stray non-audio entries (hidden files, sub-folders)
    # that librosa cannot decode; keep only the .wav clips, like the dataset glob.
    if not file.lower().endswith('.wav'):
        continue
    tensor_wave = load_wav_16k_mono(os.path.join(capuchin_dir, file))
    lengths.append(len(tensor_wave))

In [None]:
# Mean clip length in samples
tf.reduce_mean(lengths)

In [None]:
# Shortest clip length in samples
tf.reduce_min(lengths)

In [None]:
# Longest clip length in samples
tf.reduce_max(lengths)

In [None]:
# def preprocess(file_path, label): 
#     wav = load_wav_16k_mono(file_path)
#     wav = wav[:48000]
#     zero_padding = tf.zeros([48000] - tf.shape(wav), dtype=tf.float32)
#     wav = tf.concat([wav, zero_padding], axis=0)

#     # Convert to spectrogram
#     spectrogram = tf.signal.stft(wav, frame_length=320, frame_step=32)
#     spectrogram = tf.abs(spectrogram)

#     # Add channel dimension
#     spectrogram = tf.expand_dims(spectrogram, axis=2)

#     return spectrogram, label
def preprocess(file_path, label):
    """Turn an (audio path, label) pair into a (spectrogram, label) pair.

    The clip is decoded with ``load_wav_16k_mono``, cut or zero-padded at the
    tail to exactly 48000 samples, and converted to a magnitude STFT with a
    trailing channel axis.

    Returns
    -------
    tuple
        ``(spectrogram, label)`` where the spectrogram has static shape
        ``(1491, 257, 1)`` and the label is cast to ``float32``.
    """
    # The loader is plain Python, so it is wrapped in numpy_function.
    samples = tf.numpy_function(load_wav_16k_mono, [file_path], tf.float32)
    samples.set_shape([None])  # declare the result as rank-1, unknown length

    # Keep at most 48000 samples, then zero-pad the tail up to 48000
    clipped = samples[:48000]
    missing = tf.maximum(48000 - tf.shape(clipped)[0], 0)
    fixed_length = tf.pad(clipped, paddings=[[0, missing]])

    # Magnitude STFT, plus a channel axis for the Conv2D input
    magnitude = tf.abs(
        tf.signal.stft(fixed_length, frame_length=320, frame_step=32)
    )
    spectrogram = tf.expand_dims(magnitude, axis=2)

    # Pin the static shape so it lines up with the model's input_shape
    spectrogram.set_shape([1491, 257, 1])

    return spectrogram, tf.cast(label, tf.float32)



In [None]:
# Draw one random (path, label) pair from the positive clips
sample_iter = positives.shuffle(10000).as_numpy_iterator()
file_path, label = next(sample_iter)

# The path arrives as a byte string; turn it into text
file_path = file_path.decode("utf-8")

# Run it through the same preprocessing the training pipeline uses
spectrogram, label = preprocess(file_path, label)

In [None]:
plt.figure(figsize=(30,20))
# Reverse the axes and take the first slice, leaving a 2-D image to draw
spectrogram_image = tf.transpose(spectrogram)[0]
plt.imshow(spectrogram_image)
plt.title(f"Label: {label}")
plt.xlabel("Time")
plt.ylabel("Frequency bins")
plt.show()

In [None]:
# Build the pipeline from the labelled path datasets rather than from `data`
# itself, so re-running this cell never maps `preprocess` over spectrograms.
data = (
    positives.concatenate(negatives)
    # Decode each clip and turn it into a spectrogram, in parallel
    .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    # Keep the computed spectrograms in memory after the first pass
    .cache()
    # Shuffle ONCE. With the default reshuffle_each_iteration=True the order
    # changes on every pass, so a later take()/skip() split would draw
    # different clips each epoch and test clips would leak into training.
    # NOTE(review): the buffer must be at least the number of clips for a
    # full shuffle, since all positives precede all negatives.
    .shuffle(1000, reshuffle_each_iteration=False)
    .batch(4)
    # Prepare upcoming batches while the current one is being consumed
    .prefetch(8)
)

In [None]:
# First 36 batches train the model; the following 15 are held out.
# NOTE(review): any batches beyond the first 51 are unused — confirm these
# counts still match the dataset size at the current batch size.
train, test = data.take(36), data.skip(36).take(15)

In [None]:
# Pull a single training batch to inspect its shape
samples, labels = next(train.as_numpy_iterator())

In [None]:
samples.shape

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, Flatten,MaxPooling2D
from tensorflow.keras.callbacks import EarlyStopping
# Stop once validation loss has not improved for 3 consecutive epochs and
# roll the model back to the weights of its best epoch.
early_stop = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)


In [None]:
# Small CNN over (time, frequency, channel) spectrograms with one sigmoid output
model = Sequential([
    Conv2D(16, (3, 3), activation='relu', input_shape=(1491, 257, 1)),
    MaxPooling2D((4, 4)),  # shrink the spatial size before the second conv
    Conv2D(16, (3, 3), activation='relu'),
    Flatten(),
    Dense(128, activation='relu'),
    Dense(1, activation='sigmoid'),
])


In [None]:
# Name the metrics explicitly. Auto-generated names get a numeric suffix
# ('recall_1', 'precision_1', ...) whenever this cell is re-run in the same
# kernel, which would break the hist.history['recall'] / ['precision']
# lookups in the plotting cell.
model.compile(
    "Adam",
    loss="BinaryCrossentropy",
    metrics=[
        tf.keras.metrics.Recall(name="recall"),
        tf.keras.metrics.Precision(name="precision"),
    ],
)

In [None]:
model.summary()

In [None]:
# Train for at most 10 epochs; early stopping watches the held-out batches
hist = model.fit(
    train,
    validation_data=test,
    epochs=10,
    callbacks=[early_stop],
    verbose=1,
)

In [None]:

print(hist.history.keys())

# One figure per tracked quantity: training curve in red, validation in blue
curves = [('Loss', 'loss'), ('Precision', 'precision'), ('Recall', 'recall')]
for heading, key in curves:
    plt.title(heading)
    plt.plot(hist.history[key], 'r', label=f'Train {heading}')
    plt.plot(hist.history[f'val_{key}'], 'b', label=f'Validation {heading}')
    plt.xlabel('Epochs')
    plt.ylabel(heading)
    plt.legend()
    plt.show()


In [None]:
# Score a single held-out batch
test_batches = test.as_numpy_iterator()
X_test, y_test = next(test_batches)
yhat = model.predict(X_test)

In [None]:
# Turn each sigmoid score into a hard 0/1 label at the 0.5 cut-off
yhat = [1 if score > 0.5 else 0 for score in yhat]

In [None]:
yhat


In [None]:
# def load_mp3_16k_mono(filename):
#     """ Load a WAV file, convert it to a float tensor, resample to 16 kHz single-channel audio. """
#     res = tfio.audio.AudioIOTensor(filename)
#     # Convert to tensor and combine channels 
#     tensor = res.to_tensor()
#     tensor = tf.math.reduce_sum(tensor, axis=1) / 2 
#     # Extract sample rate and cast
#     sample_rate = res.rate
#     sample_rate = tf.cast(sample_rate, dtype=tf.int64)
#     # Resample to 16 kHz
#     wav = tfio.audio.resample(tensor, rate_in=sample_rate, rate_out=16000)
#     return wav
# mp3 = os.path.join('data', 'Forest Recordings', 'recording_00.mp3')
# wav = load_mp3_16k_mono(mp3)
# audio_slices = tf.keras.utils.timeseries_dataset_from_array(wav, wav, sequence_length=48000, sequence_stride=48000, batch_size=1)
# samples, index = audio_slices.as_numpy_iterator().next()

In [None]:
import librosa
import numpy as np

def load_mp3_librosa(filepath, sr=16000):
    """Decode an audio file with librosa as a mono waveform resampled to ``sr`` Hz.

    Returns only the sample array; the sample rate reported by librosa is
    discarded.
    """
    samples, _sample_rate = librosa.load(filepath, sr=sr, mono=True)
    return samples

def create_windows(audio, window_size=48000, stride=48000):
    """Cut a signal into fixed-length windows along its first axis.

    Parameters
    ----------
    audio : array-like
        Signal to slice; windows are taken along axis 0.
    window_size : int
        Number of samples per window.
    stride : int
        Offset between the starts of consecutive windows.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_windows, window_size, ...)``. A trailing chunk
        shorter than ``window_size`` is dropped. When no full window fits,
        the result is empty but keeps the window axis and the input dtype
        (shape ``(0, window_size, ...)``), so downstream code can rely on
        its rank.
    """
    audio = np.asarray(audio)
    starts = range(0, len(audio) - window_size + 1, stride)
    if len(starts) == 0:
        return np.empty((0, window_size) + audio.shape[1:], dtype=audio.dtype)
    return np.stack([audio[start:start + window_size] for start in starts])

def compute_spectrogram_librosa(audio_window, n_fft=320, hop_length=32):
    """Magnitude spectrogram laid out like the features the model was trained on.

    Mirrors ``tf.signal.stft(wav, frame_length=n_fft, frame_step=hop_length)``
    followed by ``tf.abs``, as done in ``preprocess``: frames of ``n_fft``
    samples, a periodic Hann window, no centring padding, and an FFT size
    equal to the smallest power of two >= ``n_fft``. With the defaults, a
    48000-sample window yields shape ``(1491, 257, 1)``.

    The previous call used librosa's defaults (``center=True`` and an FFT
    size of ``n_fft`` itself), which gives ``(1501, 161, 1)`` for the same
    input and cannot be fed to the model.

    Parameters
    ----------
    audio_window : array-like
        Audio samples; the last axis is time.
    n_fft : int
        Analysis frame (window) length in samples. The name is kept for
        backward compatibility.
    hop_length : int
        Step between consecutive frames in samples.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(time_frames, freq_bins, 1)`` for 1-D input.
    """
    audio_window = np.asarray(audio_window)
    frame_length = int(n_fft)
    # Smallest power of two >= frame_length (320 -> 512), the default FFT
    # size tf.signal.stft picks when fft_length is not given.
    fft_length = 1 << (frame_length - 1).bit_length()

    # librosa centres the analysis window inside each fft_length-sample frame,
    # whereas TensorFlow places it at the start and zero-pads the tail.
    # Padding the signal by the same offsets lines the frames up; the shift
    # within a frame only changes phase, which np.abs discards.
    pad_left = (fft_length - frame_length) // 2
    pad_right = fft_length - frame_length - pad_left
    pad_width = [(0, 0)] * (audio_window.ndim - 1) + [(pad_left, pad_right)]
    padded = np.pad(audio_window, pad_width)

    # STFT returns complex values
    stft = librosa.stft(
        padded,
        n_fft=fft_length,
        hop_length=hop_length,
        win_length=frame_length,
        window='hann',
        center=False,
    )
    spectrogram = np.abs(stft)
    # (freq, time) -> (time, freq), plus a channel axis
    return np.expand_dims(spectrogram.T, axis=-1)


In [None]:
def preprocess_mp3_librosa(filepath):
    """Load a recording and return one spectrogram per 48000-sample window.

    The file is decoded with ``load_mp3_librosa``, split into non-overlapping
    windows by ``create_windows``, and each window is converted with
    ``compute_spectrogram_librosa``. The results are stacked into one array.
    """
    waveform = load_mp3_librosa(filepath)
    segments = create_windows(waveform, window_size=48000, stride=48000)
    return np.array([compute_spectrogram_librosa(segment) for segment in segments])


In [None]:
# Convert one forest recording and inspect the stacked spectrogram shape.
path = 'data/Forest Recordings/recording_00.mp3'
spectrograms = preprocess_mp3_librosa(path)
print(spectrograms.shape)  # (n_windows, time_frames, freq_bins, 1); the model input is (1491, 257, 1) per window


In [None]:
def slice_to_spectrogram(wav_slices, n_fft=320, hop_length=32):
    """Convert each audio slice to a spectrogram and stack the results.

    Delegates to ``compute_spectrogram_librosa`` so there is a single STFT
    implementation in the notebook. The loop variable no longer shadows the
    builtin ``slice``.

    Parameters
    ----------
    wav_slices : iterable of array-like
        Audio slices to convert.
    n_fft, hop_length : int
        Forwarded unchanged to ``compute_spectrogram_librosa``.

    Returns
    -------
    numpy.ndarray
        One ``(time, freq, 1)`` spectrogram per slice, stacked on axis 0.
    """
    return np.array([
        compute_spectrogram_librosa(wav_slice, n_fft=n_fft, hop_length=hop_length)
        for wav_slice in wav_slices
    ])


In [None]:
def preprocess_mp3(sample, label):
    """Convert one audio slice into a spectrogram shaped like the training data.

    Parameters
    ----------
    sample : tensor
        A batch holding a single slice; only ``sample[0]`` is used.
        Assumes the dataset was built with ``batch_size=1``, as the callers
        in this notebook do.
    label : tensor
        Unused. Present because the slicing dataset yields (data, target)
        pairs.

    Returns
    -------
    tensor
        Magnitude STFT with a trailing channel axis, computed on exactly
        48000 samples.
    """
    sample = sample[0]  # drop the leading batch axis

    # Trim or zero-pad the tail to 48000 samples, the same way `preprocess`
    # does. The previous `[48000] - tf.shape(sample)` produced a negative
    # size, and an error, for any slice longer than 48000 samples.
    sample = sample[:48000]
    pad_amount = tf.maximum(48000 - tf.shape(sample)[0], 0)
    wav = tf.pad(sample, paddings=[[0, pad_amount]])

    spectrogram = tf.signal.stft(wav, frame_length=320, frame_step=32)
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.expand_dims(spectrogram, axis=2)

    return spectrogram


In [None]:
# Load the demo recording here: no earlier cell assigns a module-level `wav`,
# so on a fresh kernel this cell used to fail with a NameError.
wav = load_mp3_librosa(os.path.join('data', 'Forest Recordings', 'recording_00.mp3'))

# Slice into non-overlapping 48000-sample (3 s) windows. This matches the clip
# length the model was trained on and the per-file loop further down; the
# previous 16000-sample slices were two-thirds zero padding.
audio_slices = (
    tf.keras.utils.timeseries_dataset_from_array(
        wav, wav,
        sequence_length=48000,
        sequence_stride=48000,
        batch_size=1,
    )
    .map(preprocess_mp3)
    .batch(64)
)


In [None]:
# Score every window of the recording, then threshold at 0.5
window_scores = model.predict(audio_slices)
yhat = [1 if score > 0.5 else 0 for score in window_scores]

In [None]:
from itertools import groupby
# Collapse each run of identical consecutive labels to a single entry, so a
# call spanning several adjacent windows is counted once
yhat = [run_label for run_label, _run in groupby(yhat)]
calls = tf.math.reduce_sum(yhat).numpy()
calls

In [None]:
# Raw model scores for every forest recording, keyed by file name.
results = {}

recordings_dir = os.path.join('data', 'Forest Recordings')
for file in sorted(os.listdir(recordings_dir)):
    # listdir also returns stray non-audio entries (hidden files, sub-folders)
    # that would crash the loader; only the .mp3 recordings are scored.
    if not file.lower().endswith('.mp3'):
        continue
    FILEPATH = os.path.join(recordings_dir, file)

    wav = load_mp3_librosa(FILEPATH)

    # Non-overlapping 3 s windows, one per dataset element
    audio_slices = tf.keras.utils.timeseries_dataset_from_array(
        wav, wav,
        sequence_length=48000,
        sequence_stride=48000,
        batch_size=1
    )

    audio_slices = audio_slices.map(preprocess_mp3)
    audio_slices = audio_slices.batch(64)

    yhat = model.predict(audio_slices)
    results[file] = yhat

In [None]:
results

In [None]:
# Hard 0/1 decision per window; only scores above 0.99 count as a call
class_preds = {
    file: [1 if prediction > 0.99 else 0 for prediction in logits]
    for file, logits in results.items()
}
class_preds

In [None]:
# Calls per recording: each run of consecutive identical labels counts once
postprocessed = {
    file: tf.math.reduce_sum([run_label for run_label, _run in groupby(scores)]).numpy()
    for file, scores in class_preds.items()
}
postprocessed

In [None]:

from itertools import groupby

def load_mp3_16k_mono_librosa(filename):
    """Decode an audio file with librosa (16 kHz, mono) and return it as a float32 tensor."""
    samples, _sample_rate = librosa.load(filename, sr=16000, mono=True)
    waveform = tf.convert_to_tensor(samples, dtype=tf.float32)
    return waveform

def preprocess_mp3(sample, index=None):
    """Convert one audio slice into a spectrogram shaped like the training data.

    Parameters
    ----------
    sample : tensor
        A batch holding a single slice; only ``sample[0]`` is used.
        Assumes the dataset was built with ``batch_size=1``, as
        ``classify_audio_file`` does.
    index : tensor, optional
        Unused. Present because the slicing dataset yields (data, target)
        pairs.

    Returns
    -------
    tensor
        Magnitude STFT with a trailing channel axis, computed on exactly
        48000 samples.
    """
    sample = sample[0]  # drop the leading batch axis

    # Trim or zero-pad at the TAIL, as `preprocess` does for the training
    # clips. This function previously padded at the front, which shifts a
    # short slice in time relative to what the model saw, and it raised on
    # slices longer than 48000 samples (negative pad size).
    sample = sample[:48000]
    pad_amount = tf.maximum(48000 - tf.shape(sample)[0], 0)
    wav = tf.pad(sample, paddings=[[0, pad_amount]])

    spectrogram = tf.signal.stft(wav, frame_length=320, frame_step=32)
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.expand_dims(spectrogram, axis=2)
    return spectrogram

def classify_audio_file(filepath, model, threshold=0.99):
    """Count the detections the model finds in one recording.

    The file is decoded with ``load_mp3_16k_mono_librosa``, split into
    non-overlapping 48000-sample windows, converted to spectrograms with
    ``preprocess_mp3`` and scored by ``model``. Windows scoring above
    ``threshold`` are labelled 1, and each run of consecutive identical
    labels is counted once.

    Returns
    -------
    dict
        ``"file"``: base name of ``filepath``; ``"predictions"``: list of
        0/1 labels, one per window; ``"detection_count"``: sum over the
        collapsed runs.
    """
    waveform = load_mp3_16k_mono_librosa(filepath)

    # One dataset element per 3 s window, each wrapped in a batch of size 1
    windows = tf.keras.utils.timeseries_dataset_from_array(
        waveform, waveform,
        sequence_length=48000,
        sequence_stride=48000,
        batch_size=1
    )

    # Spectrograms, regrouped into batches of 64 for prediction
    batches = windows.map(preprocess_mp3).batch(64)
    scores = model.predict(batches)

    binary_preds = [1 if score > threshold else 0 for score in scores]

    # Adjacent positive windows belong to the same detection
    run_labels = [run_label for run_label, _ in groupby(binary_preds)]
    detection_count = tf.math.reduce_sum(run_labels).numpy()

    return {
        "file": os.path.basename(filepath),
        "predictions": binary_preds,
        "detection_count": detection_count
    }


In [None]:
# Run the end-to-end classifier on a single recording and show the summary
user_file = 'data/Forest Recordings/recording_08.mp3'
output = classify_audio_file(filepath=user_file, model=model)
print(output)
