In [None]:
#Install dependencies
#!pip install -q tensorflow tensorflow_hub tensorflow_io kaggle

#Importing necessary modules
from google.colab import files
import zipfile
import pandas as pd
import os
from google.colab import files

#Preprocessing modules:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import pickle
import numpy as np
from sklearn.preprocessing import LabelEncoder

#Model modules:
import librosa
import tensorflow as tf
import tensorflow_hub as hub
import soundfile as sf
import tensorflow as tf
import tensorflow_hub as hub

#Evaulation metrics:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import warnings
from sklearn.preprocessing import LabelEncoder

warnings.filterwarnings("ignore")


In [None]:
# Build the label table from the file names, since the dataset ships without one.
# The species key is rebuilt from the 2nd and 3rd "_"-separated fields of each
# file name; files that do not fit that pattern or the mapping below are skipped.

wav_dir = "/kaggle/input/insectsound1000/InsectSound1000"

# Species -> coarse category; only species listed here end up in the table.
species_to_category = {
    "Palomena_prasina": "pest",
    "Nezara_viridula": "pest",
    "Halyomorpha_halys": "pest",
    # NOTE(review): this key looks truncated next to the other names — confirm
    # it matches the actual file names.
    "Rhaphigaster_nebulos": "pest",
    "Bradysia_difformis": "pest",
    "Myzus_persicae": "pest",
    "Trialeurodes_vaporariorum": "pest",
    "Tuta_absoluta": "pest",

    "Bombus_terrestris": "beneficial",
    "Episyrphus_balteatus": "beneficial",
    "Coccinella_septempunctata": "beneficial",
    "Aphidoletes_aphidimyza": "beneficial"
}

data = []

# os.listdir returns entries in arbitrary order; sorting keeps the row order, and
# therefore the seeded sampling and splitting done later, reproducible.
for filename in sorted(os.listdir(wav_dir)):
    if not filename.endswith(".wav"):
        continue

    parts = filename.split("_")
    if len(parts) < 3:
        continue

    species = parts[1] + "_" + parts[2]
    category = species_to_category.get(species)
    if category is None:
        continue  # species outside the pest/beneficial mapping

    full_path = os.path.join(wav_dir, filename)
    data.append({
        "filepath": full_path,
        "species": species,
        "category": category
    })

# Fail here with a clear message instead of a KeyError on the missing
# "category" column further down.
if not data:
    raise FileNotFoundError(
        f"No labelled .wav files found in {wav_dir!r}; check the dataset path and file naming."
    )

df = pd.DataFrame(data)

# The CSV keeps the textual categories; the numeric mapping below is applied
# only to the in-memory frame.
csv_path = "/kaggle/working/insect_labels_binary.csv"
df.to_csv(csv_path, index=False)

print("CSV saved at:", csv_path)
print("Total samples:", len(df))
print("\nCategory distribution:\n")

# Convert the "category" column to binary values for easier processing.
df["category"] = df["category"].map({
    "pest": 0,
    "beneficial": 1
})

# Show how many samples fall into each category.
print(df["category"].value_counts())




In [None]:
# Read the label file back from disk and preview its first rows.
metadata = pd.read_csv(filepath_or_buffer="/kaggle/working/insect_labels_binary.csv")
metadata.head(n=5)

In [None]:
# Sanity-check the input directory: how many entries it holds and what a few look like.
fp = "/kaggle/input/insectsound1000/InsectSound1000"
audio = os.listdir(fp)
print("Total files: ", len(audio))
print("Sample files: ", audio[0:5])

In [None]:
# Plain-text preview of the first five labelled rows.
print(df.head(n=5))

In [None]:
# Balance the two classes by randomly downsampling to the size of the smaller one.
# When pests are the larger class this gives exactly the same rows as sampling
# pests down to the beneficial count; it also works if the imbalance is reversed.

# Count samples per class
print("Before downsampling:")
print(df["category"].value_counts())

# Separate pest and beneficial
pest_df = df[df["category"] == 0]          # pests
beneficial_df = df[df["category"] == 1]    # beneficials

# Size both classes are reduced to.
target_count = min(len(pest_df), len(beneficial_df))
if target_count == 0:
    raise ValueError("Cannot balance classes: one of the categories has no samples.")

pest_downsampled = pest_df.sample(n=target_count, random_state=42)

# The beneficial rows are only sampled when they are the larger class.
if len(beneficial_df) > target_count:
    beneficial_balanced = beneficial_df.sample(n=target_count, random_state=42)
else:
    beneficial_balanced = beneficial_df

# Combine back into a balanced dataset
balanced_df = pd.concat([pest_downsampled, beneficial_balanced])

# Shuffle rows
balanced_df = balanced_df.sample(frac=1, random_state=42).reset_index(drop=True)

print("After downsampling:")
print(balanced_df["category"].value_counts())

In [None]:
# Pull file paths (X) and binary labels (y) out of the balanced table.

# Species -> numeric label (0 = pest, 1 = beneficial). This cell does not read
# it; the labels below come straight from the "category" column.
species_to_category = {
    **dict.fromkeys(
        [
            "Palomena_prasina",
            "Nezara_viridula",
            "Halyomorpha_halys",
            "Rhaphigaster_nebulos",
            "Bradysia_difformis",
            "Myzus_persicae",
            "Trialeurodes_vaporariorum",
            "Tuta_absoluta",
        ],
        0,
    ),
    **dict.fromkeys(
        [
            "Bombus_terrestris",
            "Episyrphus_balteatus",
            "Coccinella_septempunctata",
            "Aphidoletes_aphidimyza",
        ],
        1,
    ),
}

X = balanced_df["filepath"].to_numpy()
y = balanced_df["category"].to_numpy()

print("Total files:", len(X))
print("Example:", X[0], "→", y[0])


In [None]:
# Check the native sampling rate of the first file; the loader defined later
# resamples anything that is not 16 kHz.

file_path = X[0]  # first audio file
sr = librosa.load(file_path, sr=None)[1]  # sr=None keeps the file's own rate

print("Sampling rate:", sr)


In [None]:
# Encode the labels as consecutive integers for the model.

le = LabelEncoder().fit(y)
y_encoded = le.transform(y)

num_classes = len(le.classes_)

print("Number of classes:", num_classes)
print("Encoded example:", y_encoded[:5])
print("Class mapping:")
# One line per class: encoded id on the left, original label on the right.
for i in range(num_classes):
    cls = le.classes_[i]
    print(i, "→", cls)


In [None]:
# Make sure both the paths and the encoded labels are NumPy arrays.
X, y_encoded = np.array(X), np.array(y_encoded)
print(X[0:5], "\n", y_encoded[0:5])

In [None]:
# Function to load audio waveform from each file 



def load_mono_py(file_path, target_sr=16000):
    """Return an audio clip as a mono float32 waveform.

    Args:
        file_path: One of
            * a path as ``str`` or UTF-8 encoded ``bytes``/``bytearray``;
            * any object exposing ``.numpy()`` (for example the eager tensor
              handed over by ``tf.py_function``), which is unwrapped first;
            * a floating-point NumPy array, treated as an already decoded
              waveform and only cast to float32.
        target_sr: Sample rate in Hz that files read from disk are resampled
            to. Defaults to 16000, the value that was previously hard-coded.

    Returns:
        ``np.ndarray`` of dtype float32. For file inputs, multi-channel audio
        is averaged over axis 1 before resampling.

    Raises:
        ValueError: If the input is none of the supported kinds.
    """
    # Duck-typed unwrap: a string tensor becomes bytes, a float tensor an ndarray.
    if hasattr(file_path, "numpy"):
        file_path = file_path.numpy()
    if isinstance(file_path, (bytes, bytearray)):
        file_path = file_path.decode("utf-8")
    if isinstance(file_path, np.ndarray) and np.issubdtype(file_path.dtype, np.floating):
        return file_path.astype(np.float32)
    if isinstance(file_path, str):
        data, sr = sf.read(file_path)
        if data.ndim > 1:
            # Collapse the channels into one by averaging.
            data = np.mean(data, axis=1)
        if sr != target_sr:
            data = librosa.resample(data, orig_sr=sr, target_sr=target_sr)
        return data.astype(np.float32)
    raise ValueError(f"Unsupported type: {type(file_path)}")

def load_mono_tf(file_path):
    """Wrap ``load_mono_py`` in ``tf.py_function`` for use inside ``Dataset.map``.

    Returns a float32 tensor whose static shape is set to one dimension of
    unknown length.
    """
    audio = tf.py_function(func=load_mono_py, inp=[file_path], Tout=tf.float32)
    # py_function drops static shape information; declare the rank explicitly.
    audio.set_shape([None])
    return audio


# Pretrained YAMNet model loaded from TensorFlow Hub.
yamnet = hub.load('https://tfhub.dev/google/yamnet/1')


def extembed(file_path, label):
    """Turn one audio file into a fixed-length feature vector.

    The waveform is loaded with ``load_mono_tf`` and passed through YAMNet.
    The returned embeddings are pooled over axis 0 (presumably the frame
    axis — TODO confirm) with both mean and standard deviation, and the two
    halves are concatenated.

    Returns:
        ``(embedding, label)`` with ``label`` passed through untouched.
    """
    waveform = load_mono_tf(file_path)

    # YAMNet yields (scores, embeddings, spectrogram); only the embeddings are used.
    _, frame_embeddings, _ = yamnet(waveform)

    pooled_stats = [
        tf.reduce_mean(frame_embeddings, axis=0),
        tf.math.reduce_std(frame_embeddings, axis=0),
    ]
    return tf.concat(pooled_stats, axis=0), label


In [None]:
# Pipeline over the full (unsplit) data set.
# NOTE(review): neither `num` nor `dataset` is read by the later cells visible
# here; training uses the train/validation pipelines built further down.
num = len(set(y_encoded))

AUTOTUNE = tf.data.AUTOTUNE

dataset = tf.data.Dataset.from_tensor_slices((X, y_encoded)) # Converts arrays into TensorFlow datasets

# Order matters: cache the embeddings first, then shuffle. With shuffle placed
# before cache, the order of the first pass is stored in the cache and replayed
# unchanged on every later pass.
dataset = (
    dataset
    .map(extembed, num_parallel_calls=AUTOTUNE)
    .cache()
    .shuffle(10000)
    .batch(512)
    .prefetch(AUTOTUNE)
)


In [None]:
# Classification phase: Dense NN
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Stratified 80/20 split of the file paths and their encoded labels.
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y_encoded,
    test_size=0.2,
    stratify=y_encoded,
    random_state=42
)

# Training and validation datasets; embeddings are extracted on the fly by extembed.
train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train))
val_ds = tf.data.Dataset.from_tensor_slices((X_val, y_val))

AUTOTUNE = tf.data.AUTOTUNE

# Shuffling happens on the (path, label) pairs before the expensive map, so a
# buffer covering the whole training set is cheap and gives a uniform shuffle
# each epoch. A fixed buffer of 10000 only shuffles locally once the training
# set is larger than that.
train_ds = (
    train_ds
    .shuffle(len(X_train))
    .map(extembed, num_parallel_calls=AUTOTUNE)
    .batch(512)
    .prefetch(AUTOTUNE)
)

# Validation data keeps its original order (no shuffle).
val_ds = (
    val_ds
    .map(extembed, num_parallel_calls=AUTOTUNE)
    .batch(512)
    .prefetch(AUTOTUNE)
)


In [None]:
# Classifier head: a small fully connected network (no convolutional layers)
# that takes the 2048-value pooled embedding and outputs one sigmoid value.
model = tf.keras.Sequential()
model.add(tf.keras.layers.Input(shape=(2048,)))   # matches pooled embedding
model.add(tf.keras.layers.Dense(512, activation='relu'))
model.add(tf.keras.layers.Dropout(0.3))
model.add(tf.keras.layers.Dense(256, activation='relu'))
model.add(tf.keras.layers.Dropout(0.3))
model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

In [None]:
# Pull a single batch to confirm the feature and label shapes.
for x_batch, y_batch in train_ds.take(count=1):
    print(x_batch.shape, y_batch.shape)

In [None]:
# Print the layer-by-layer summary of the classifier defined above.
model.summary()

In [None]:
# Per-class weights ("balanced" heuristic) computed from the training labels.
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

classes = np.unique(y_train)
class_weights = compute_class_weight(class_weight="balanced", classes=classes, y=y_train)

class_weight_dict = {label: weight for label, weight in zip(classes, class_weights)}

# Show at most the first six entries.
for k, v in list(class_weight_dict.items())[:6]:
    print("Key: ",k,"\n","Value: ",v)

In [None]:
# First five training labels, as a quick check of their encoding.
print(y_train[0:5])

In [None]:
# Binary cross-entropy matches the single sigmoid output; Adam with a small learning rate.
model.compile(
    loss="binary_crossentropy",
    metrics=["accuracy"],
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
)

model.summary()


In [None]:
# Callbacks:
#   * EarlyStopping ends training once val_loss has not improved for 3 epochs
#     and restores the best weights seen.
#   * ModelCheckpoint writes the model to "best_model.h5", keeping only the best one.
callbacks = [
    tf.keras.callbacks.EarlyStopping(
        patience=3,
        monitor="val_loss",
        restore_best_weights=True,
    ),
    tf.keras.callbacks.ModelCheckpoint(
        filepath="best_model.h5",
        save_best_only=True,
    ),
]


In [None]:
# import soundfile as sf

# for f in X[:1000]:
#     try:
#         info = sf.info(f)
#         print(f"{f}: channels={info.channels}, subtype={info.subtype}")
#     except Exception as e:
#         print(f"Corrupt or unreadable: {f} ({e})")


In [None]:
 #Model Training
# Train on the clean training set and validate on the clean validation set.
history = model.fit(
    x=train_ds,
    epochs=5,
    validation_data=val_ds,
    callbacks=callbacks,
    class_weight=class_weight_dict,
)





In [None]:
# Evaluate on the clean validation set; evaluate() returns [loss, accuracy] here.
val_loss, val_acc = model.evaluate(x=val_ds)
print("Validation Accuracy:", val_acc)


In [None]:
# Classification report on the clean validation set.
# Labels and predictions are collected batch by batch so they stay aligned.

y_true, y_pred = [], []

for embeddings, labels in val_ds:
    probabilities = model.predict(embeddings)
    hard_labels = (probabilities > 0.5).astype("int32")  # 0.5 decision threshold
    y_true.extend(labels.numpy())
    y_pred.extend(hard_labels.flatten())

print(classification_report(y_true, y_pred))


In [None]:
# Confusion matrix for the clean validation set.
# Uses y_true, which was collected in the same loop as y_pred, so the two are
# aligned element by element no matter how val_ds is ordered.
labels = np.unique(np.concatenate((y_true, y_pred)))
cm = confusion_matrix(y_true, y_pred, labels=labels)

# Map encoded ids back to the original category values, then to readable names
# (0 = pest, 1 = beneficial, as assigned when the label table was built).
cls = le.classes_
category_names = {0: "pest", 1: "beneficial"}
comb = [category_names.get(int(cls[i]), str(cls[i])) for i in labels]

plt.figure(figsize=(12, 10))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=comb, yticklabels=comb)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()


# <u>Re-Training Model with Noisy Data:</u> 

In [None]:

# Function to add noise to dataset
def add_noise(waveform, noise_factor=0.005):
    """Return ``waveform`` plus zero-mean, unit-variance Gaussian noise scaled by ``noise_factor``.

    The noise is drawn without a fixed seed, so each call gives a different result.
    """
    gaussian = tf.random.normal(tf.shape(waveform), mean=0.0, stddev=1.0)
    return waveform + gaussian * noise_factor

In [None]:
# Same embedding extraction as extembed, with noise injected into the waveform first.
def extembed_noisy(file_path, label):
    """Load a clip, add Gaussian noise, and return ``(pooled YAMNet embedding, label)``.

    The embedding is the concatenation of the mean and the standard deviation
    of YAMNet's embeddings over axis 0.
    """
    noisy_waveform = add_noise(load_mono_tf(file_path))  # inject noise

    # Only the embeddings of YAMNet's three outputs are needed.
    _, frame_embeddings, _ = yamnet(noisy_waveform)

    pooled_stats = [
        tf.reduce_mean(frame_embeddings, axis=0),
        tf.math.reduce_std(frame_embeddings, axis=0),
    ]
    return tf.concat(pooled_stats, axis=0), label


# Validation set with noise added to every waveform. It is not cached, so the
# embeddings (and the noise) are recomputed each time the dataset is iterated.
noisy_val_ds = tf.data.Dataset.from_tensor_slices((X_val, y_val))
noisy_val_ds = noisy_val_ds.map(extembed_noisy, num_parallel_calls=tf.data.AUTOTUNE)
noisy_val_ds = noisy_val_ds.batch(512).prefetch(tf.data.AUTOTUNE)

In [None]:
# Fresh, untrained classifier (fully connected network, same layout as before).
# Rebinding `model` discards the reference to the previously trained one.
model = tf.keras.Sequential()
model.add(tf.keras.layers.Input(shape=(2048,)))
model.add(tf.keras.layers.Dense(512, activation='relu'))
model.add(tf.keras.layers.Dropout(0.3))
model.add(tf.keras.layers.Dense(256, activation='relu'))
model.add(tf.keras.layers.Dropout(0.3))
model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

In [None]:
# Compile with the same loss, metric and optimizer settings as the first run.
model.compile(
    loss="binary_crossentropy",
    metrics=["accuracy"],
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
)

model.summary()

In [None]:
 #Model Training
# Train the fresh model on the clean training set, validating on the noisy set.
#
# This run gets its own callbacks. Reusing the earlier `callbacks` list would
# hand the same ModelCheckpoint instance, and the same "best_model.h5" file, to
# a different model, so checkpoints from separate experiments would collide.
noisy_callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=3,
        restore_best_weights=True
    ),
    tf.keras.callbacks.ModelCheckpoint(
        "best_model_noisy.h5",
        save_best_only=True
    )
]

history = model.fit(
    train_ds,
    validation_data=noisy_val_ds,
    epochs=5,
    class_weight=class_weight_dict,
    callbacks=noisy_callbacks
)


In [None]:
# Evaluate the current model on the noisy validation set.
loss, acc = model.evaluate(x=noisy_val_ds)
print(f"Noisy test set accuracy: {acc:.4f}")

In [None]:
# Confusion matrix and classification report on the noisy validation set.

y_true, y_pred = [], []

for x_batch, y_batch in noisy_val_ds:
    # Threshold the sigmoid outputs at 0.5 to get hard 0/1 predictions.
    preds = (model.predict(x_batch) > 0.5).astype("int32")
    y_pred.extend(preds.flatten())
    y_true.extend(y_batch.numpy())

print(confusion_matrix(y_true, y_pred))
print(classification_report(y_true, y_pred))

# <u>Re-Training model after using noise reduction techniques</u> 

 <u> Noise reduction technique:</u> Band-Pass filtering<br>

 Band-pass filtering is a noise-reduction method in which only the relevant frequency range is kept and the frequencies outside it (treated as noise) are attenuated.<br>
 It helps remove unwanted hiss, static, low-frequency rumble, hum, etc. that occur when operating under real-world conditions.

In [None]:


# Function to implement band-pass filtering
def bandpass_filter(waveform, sr=16000, low=200, high=8000):
    """Suppress spectral content outside ``[low, high]`` Hz via STFT masking.

    Args:
        waveform: Eager tensor holding the audio samples; ``.numpy()`` is
            called on it, so this must run eagerly (e.g. inside
            ``tf.py_function``).
        sr: Sample rate of ``waveform`` in Hz.
        low: Lower edge of the pass band in Hz (inclusive).
        high: Upper edge of the pass band in Hz (inclusive).

    Returns:
        float32 tensor with the same number of samples as the input.
    """
    samples = waveform.numpy()
    stft = librosa.stft(samples)  # Short-time Fourier transform

    # The STFT has 1 + n_fft // 2 frequency rows, hence n_fft = 2 * (rows - 1).
    n_fft = 2 * (stft.shape[0] - 1)
    freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)

    # Zero the out-of-band bins instead of dropping their rows. istft infers the
    # FFT size from the row count, so removing rows makes it rebuild the signal
    # with the wrong frame size and shifts the remaining bins to the wrong
    # frequencies.
    in_band = (freqs >= low) & (freqs <= high)
    stft_filtered = stft * in_band[:, np.newaxis]

    # Inverse transform back to a waveform, trimmed or padded to the input length.
    filtered = librosa.istft(stft_filtered, length=len(samples))
    return tf.convert_to_tensor(filtered, dtype=tf.float32)

# Function to extract pooled YAMNet embeddings after applying band-pass filtering
def extembed_filtered(file_path, label):
    """Load a clip, band-pass filter it, and return ``(pooled YAMNet embedding, label)``.

    NOTE(review): no noise is added here; the filter is applied to the waveform
    exactly as loaded. Confirm whether a noisy input was intended.
    """
    def _filter_eagerly(samples):
        # bandpass_filter calls .numpy(), so it has to run inside py_function.
        return bandpass_filter(samples, sr=16000)

    raw_waveform = load_mono_tf(file_path)
    waveform_filtered = tf.py_function(
        func=_filter_eagerly,
        inp=[raw_waveform],
        Tout=tf.float32,
    )
    # py_function drops static shape information; declare the rank explicitly.
    waveform_filtered.set_shape([None])

    # Only the embeddings of YAMNet's three outputs are used.
    _, frame_embeddings, _ = yamnet(waveform_filtered)

    # Pool over axis 0 with mean and standard deviation, then concatenate.
    pooled_stats = [
        tf.reduce_mean(frame_embeddings, axis=0),
        tf.math.reduce_std(frame_embeddings, axis=0),
    ]
    return tf.concat(pooled_stats, axis=0), label

# Build the band-pass-filtered validation dataset.
# extembed_filtered filters the waveform as loaded; it does not inject noise.
val_ds_filtered = tf.data.Dataset.from_tensor_slices((X_val, y_val))
val_ds_filtered = val_ds_filtered.map(extembed_filtered, num_parallel_calls=tf.data.AUTOTUNE)
val_ds_filtered = val_ds_filtered.batch(512).prefetch(tf.data.AUTOTUNE)

In [None]:
# Third classifier: fully connected network with the same layout as the others.
model3 = tf.keras.Sequential()
model3.add(tf.keras.layers.Input(shape=(2048,)))
model3.add(tf.keras.layers.Dense(512, activation='relu'))
model3.add(tf.keras.layers.Dropout(0.3))
model3.add(tf.keras.layers.Dense(256, activation='relu'))
model3.add(tf.keras.layers.Dropout(0.3))
model3.add(tf.keras.layers.Dense(1, activation='sigmoid'))

In [None]:
# Compile model3 with the same loss, metric and optimizer settings as the other runs.
model3.compile(
    loss="binary_crossentropy",
    metrics=["accuracy"],
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
)

model3.summary()

In [None]:
 #Model Training
# Train model3 on the clean training set, validating on the band-pass-filtered set.
#
# This run gets its own callbacks. Reusing the earlier `callbacks` list would
# hand the same ModelCheckpoint instance, and the same "best_model.h5" file, to
# a different model, so checkpoints from separate experiments would collide.
filtered_callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=3,
        restore_best_weights=True
    ),
    tf.keras.callbacks.ModelCheckpoint(
        "best_model_filtered.h5",
        save_best_only=True
    )
]

history = model3.fit(
    train_ds,
    validation_data=val_ds_filtered,
    epochs=5,
    class_weight=class_weight_dict,
    callbacks=filtered_callbacks
)


In [None]:
# Model evaluation on the filtered validation set.
# Evaluate model3, the network trained in this section; `model` still refers
# to the classifier from the previous experiment.
loss, acc = model3.evaluate(val_ds_filtered)
print(f"Filtered noisy validation accuracy: {acc:.4f}")

In [None]:
# Confusion matrix and classification report on the filtered validation set.
# Predictions come from model3, the network trained in this section; `model`
# still refers to the classifier from the previous experiment.

y_true = []
y_pred = []

for x_batch, y_batch in val_ds_filtered:
    preds = model3.predict(x_batch)
    preds = (preds > 0.5).astype("int32")  # 0.5 decision threshold
    y_true.extend(y_batch.numpy())
    y_pred.extend(preds.flatten())

print(confusion_matrix(y_true, y_pred))
print(classification_report(y_true, y_pred))