In [8]:
# -*- coding: utf-8 -*-

# don
import pandas as pd
import numpy as np
import librosa
import os
import zipfile
import csv
import shutil
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from concurrent.futures import ThreadPoolExecutor, as_completed
from sklearn.metrics import (
    roc_auc_score,        
)


In [9]:
"""
Script for extracting data from https://figshare.com/articles/dataset/Sounds_of_the_Eleutherodactylus_frog_community_from_Puerto_Rico/806302?file=3104183
Unzips all the zips from root
Simply Unzip downloaded file and provide path root of folder
"""


def extract_zip_file(file_path, extract_to):
    """
    Extracts a single zip file to a specified directory.

    Args:
        file_path (str): Path to the zip file.
        extract_to (str): Path to the directory where the zip file will be extracted.
    """
    with zipfile.ZipFile(file_path, "r") as zip_ref:
        zip_ref.extractall(extract_to)
        # print(f"{os.path.basename(file_path)} extracted to {os.path.abspath(extract_to)}")


def extract_zip_files(zip_folder, extract_to):
    """
    Extracts all zip files from a folder to a specified directory using threading.

    Args:
        zip_folder (str): Path to the folder containing zip files.
        extract_to (str): Path to the directory where zip files will be extracted.
    """
    # Make sure the extraction directory exists
    os.makedirs(extract_to, exist_ok=True)

    data_file = "FrequencyRange_by_species_and_site_Averages.csv"
    shutil.copyfile(os.path.join(zip_folder, data_file) , os.path.join(extract_to, data_file))

    # List all zip files in the folder
    zip_files = [
        os.path.join(zip_folder, item)
        for item in os.listdir(zip_folder)
        if item.endswith(".zip")
    ]

    # Use ThreadPoolExecutor to extract zip files concurrently
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(extract_zip_file, zip_file, extract_to)
            for zip_file in zip_files
        ]

        # Wait for all futures to complete
        for future in futures:
            future.result()
    
    print("Done.")

In [10]:
import ipywidgets as widgets
from IPython.display import display

# Create a Text widget
text_widget_filepath = widgets.Text(
    value="", placeholder="Path to Extracted Dataset", description="Filepath:", disabled=False
)
text_widget_extractTo = widgets.Text(
    value="",
    placeholder="Output path of Extracted Sets",
    description="Ouput FP:",
    disabled=False,
)
# Variable to store the text
filepath = ""
extractTo = ""

# Function to handle text changes
def handle_text_change_filepath(change):
    global filepath
    filepath = change["new"]
    print(f"Input stored: {filepath}")


# Function to handle text changes
def handle_text_change_extractTo(change):
    global extractTo
    extractTo = change["new"]
    print(f"Output stored: {extractTo}")


# Set up the event handler
text_widget_filepath.observe(handle_text_change_filepath, names="value")
text_widget_extractTo.observe(handle_text_change_extractTo, names="value")
# Display the widget
display(text_widget_filepath)
display(text_widget_extractTo)

Text(value='', description='Filepath:', placeholder='Path to Extracted Dataset')

Text(value='', description='Ouput FP:', placeholder='Output path of Extracted Sets')

Input stored: "A:\Documents\Capstone\Dataset\806302"
Output stored: "A:\Documents\Capstone\Dataset\806302"
Output stored: "A:\Documents\Capstone\Dataset\E"
Output stored: "A:\Documents\Capstone\Dataset\Ex"
Output stored: "A:\Documents\Capstone\Dataset\Ext"
Output stored: "A:\Documents\Capstone\Dataset\Extr"
Output stored: "A:\Documents\Capstone\Dataset\Extra"
Output stored: "A:\Documents\Capstone\Dataset\Extrac"
Output stored: "A:\Documents\Capstone\Dataset\Extract"
Output stored: "A:\Documents\Capstone\Dataset\Extracte"
Output stored: "A:\Documents\Capstone\Dataset\Extracted"
Input stored: "A:\\Documents\Capstone\Dataset\806302"
Input stored: "A:\\Documents\\Capstone\Dataset\806302"
Input stored: "A:\\Documents\\Capstone\\Dataset\806302"
Input stored: "A:\\Documents\\Capstone\\Dataset\\806302"
Output stored: "A:\\Documents\Capstone\Dataset\Extracted"
Output stored: "A:\\Documents\\Capstone\Dataset\Extracted"
Output stored: "A:\\Documents\\Capstone\\Dataset\Extracted"
Output stored: "A

In [15]:
print("Using Filepath: ", filepath)
print("Extracting to : ", extractTo)
extract_zip_files(filepath, extractTo)

Using Filepath:  A:\\Documents\\Capstone\Dataset\\806302
Extracting to :  A:\\Documents\\Capstone\\Dataset\\Extracted\\
Done.


In [16]:
def readAveragesData(path: str):

    data = []
    # Read the CSV file and store the data in a list of dictionaries
    with open(path, "r") as file:
        reader = csv.DictReader(file)
        for row in reader:
            data.append(row)

    return data


def prepare_csv(data_dir, output) -> None:

    
    averagesData = readAveragesData(
        os.path.join(data_dir, "FrequencyRange_by_species_and_site_Averages.csv")
    )

    data = []

    # Iterate through each subfolder
    for siteDataSet in os.listdir(data_dir):
        site_folder = os.path.join(data_dir, siteDataSet)
        if os.path.isdir(site_folder):
            # example siteId  "Site01-1" such that the 4-6 index represents the site id; in this case 01
            siteId = int(siteDataSet[4:6])
            SiteData = [
                averageClassification
                for averageClassification in averagesData
                if int(averageClassification["SiteID"]) == siteId
            ]

            classifications = ", ".join(
                [classification["Species"] for classification in SiteData]
            )
            for audio_recording in os.listdir(site_folder):
                if audio_recording.endswith(".wav"):
                    audio_recording_abs_path = os.path.abspath(
                        os.path.join(site_folder, audio_recording)
                    )

                    data.append([siteId, audio_recording_abs_path, classifications])

    # Create DataFrame
    df = pd.DataFrame(
        data,
        columns=[
            "siteId",
            "filename",
            "species",
        ],
    )

    df.to_csv(output, index=False)

In [17]:
data_dir = extractTo
output = "processed/processed.csv"
prepare_csv(data_dir, output)

In [19]:
def extract_features(file_path):
    """
    Extract features from audio file using librosa.

    Args:
        file_path (str): Path to the audio file.

    Returns:
        np.array: Extracted features.
    """
    audio, sr = librosa.load(file_path)
    result = np.array([])

    # MFCC
    mfccs = np.mean(librosa.feature.mfcc(y=audio, sr=sr).T, axis=0)
    result = np.hstack((result, mfccs))

    # Chroma
    stft = np.abs(librosa.stft(audio))
    chroma = np.mean(librosa.feature.chroma_stft(S=stft, sr=sr).T, axis=0)
    result = np.hstack((result, chroma))

    # Mel-scaled spectrogram
    mel = np.mean(librosa.feature.melspectrogram(y=audio, sr=sr).T, axis=0)
    result = np.hstack((result, mel))

    return result


def process_data(data_csv_path):
    """Read Csv with fileanme and generate spectrogram for each sample

    Returns:
        DataFrame: dataframe with all data
    """
    # data_csv_path = sys.argv[1]

    df = pd.read_csv(data_csv_path)

    # Initialize a list to store the results
    spectrograms = []

    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(extract_features, row["filename"])
            for _, row in df.iterrows()
        ]

        for future in as_completed(futures):
            try:
                spectrogram = future.result()
                spectrograms.append(spectrogram)
            except Exception as exc:
                print(f"Generated an exception: {exc}")

    # Convert the list of spectrograms into a DataFrame
    spectrogram_df = pd.DataFrame(spectrograms)

    # Concatenate the original DataFrame with the new DataFrame containing spectrograms
    df = pd.concat([df, spectrogram_df], axis=1)

    return df

In [20]:
csv_dir = output
df = process_data(csv_dir)
df.head()

Unnamed: 0,siteId,filename,species,0,1,2,3,4,5,6,...,150,151,152,153,154,155,156,157,158,159
0,1,A:\Documents\Capstone\Dataset\Extracted\Site01...,"E. coqui - co, E. coqui - qui, E. wightmanae",-236.219101,63.262177,-57.943291,49.081966,10.479619,48.403107,-37.035892,...,9e-06,3e-06,3e-06,3e-06,3e-06,3e-06,3e-06,3e-06,1e-06,1.106068e-07
1,1,A:\Documents\Capstone\Dataset\Extracted\Site01...,"E. coqui - co, E. coqui - qui, E. wightmanae",-245.313828,59.316032,-79.574265,33.32478,11.406502,31.833162,-28.828608,...,1.8e-05,8e-06,7e-06,7e-06,6e-06,6e-06,7e-06,8e-06,3e-06,2.021079e-07
2,1,A:\Documents\Capstone\Dataset\Extracted\Site01...,"E. coqui - co, E. coqui - qui, E. wightmanae",-209.789093,46.209599,-96.112762,31.74749,32.042698,39.617424,-38.199471,...,8.2e-05,5.4e-05,4.6e-05,4e-05,3.4e-05,3.1e-05,3.9e-05,4e-05,2.1e-05,1.46862e-06
3,1,A:\Documents\Capstone\Dataset\Extracted\Site01...,"E. coqui - co, E. coqui - qui, E. wightmanae",-219.443344,46.16124,-95.875694,47.622898,17.900232,44.626976,-33.330956,...,3.3e-05,3.1e-05,2.7e-05,1.4e-05,1.3e-05,1.6e-05,1.9e-05,2e-05,9e-06,4.669851e-07
4,1,A:\Documents\Capstone\Dataset\Extracted\Site01...,"E. coqui - co, E. coqui - qui, E. wightmanae",-215.081345,45.414715,-97.795143,36.125481,27.784109,37.399685,-34.257126,...,5.3e-05,4.2e-05,3.5e-05,2.6e-05,2.2e-05,1.9e-05,2.5e-05,2.5e-05,1.2e-05,9.5164e-07


In [1]:
x = df.drop(
    columns=["filename", "species"]
)  # Adjust this to include only feature columns
# Convert all column names to strings
x.columns = x.columns.astype(str)

x

NameError: name 'df' is not defined

In [None]:
y = df["species"]

# Encode the target labels as integers
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(y)
y



In [None]:
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

classifier = RandomForestClassifier(n_estimators=600, max_depth=18, min_samples_leaf=3)

classifier.fit(x_train, y_train)

y_pred = classifier.predict_proba(
    x_test,
)

accuracy = roc_auc_score(y_test, y_pred, multi_class="ovr")
print("Accuracy :", accuracy)




In [None]:
import pickle
with open("../Backend/trainedRF.pkl", "wb") as f:
    pickle.dump(classifier, f)