# In [2]:
import librosa
import librosa.display
import numpy as np
import os
import logging
import sounddevice as sd
import wave
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from sklearn.preprocessing import LabelEncoder
import plotly.graph_objects as go
from dash import Dash, dcc, html, Input, Output
import joblib
import matplotlib.pyplot as plt
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.models import Model
import base64
import dash_bootstrap_components as dbc

# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class AudioSpectrogramClassifier:
    """Classify audio recordings from features of their mel-spectrogram images.

    Each audio file is rendered to a log-mel spectrogram PNG, the PNG is
    embedded with an ImageNet-pretrained MobileNetV2 (global average pooling),
    and a classical classifier (random forest, SVM, or grid-searched XGBoost)
    is trained on those embeddings. The class also exposes Plotly figures for
    the training metrics and the data, plus microphone recording.
    """

    # File extensions (lower-case) that are treated as audio input.
    AUDIO_EXTENSIONS = ('.wav', '.mp3', '.flac')

    def __init__(self, data_dirs, spectrogram_dir, model_save_path='audio_classifier.pkl'):
        """Set up state, load the feature extractor and create the output dir.

        Args:
            data_dirs: Mapping of class label -> directory of audio files.
            spectrogram_dir: Directory where spectrogram PNGs are written.
            model_save_path: Path the trained classifier is dumped to.
        """
        self.data_dirs = data_dirs
        self.spectrogram_dir = spectrogram_dir
        self.model_save_path = model_save_path
        self.feature_extractor = self._initialize_feature_extractor()
        self.features = []
        self.labels = []
        self.X_train, self.X_test, self.y_train, self.y_test = None, None, None, None
        self.clf = None
        self.train_accuracy = []
        self.test_accuracy = []
        self.f1_scores = []
        self.epochs = []
        self.label_encoder = LabelEncoder()
        self._create_spectrogram_directory()

    def _initialize_feature_extractor(self):
        """Build the MobileNetV2 embedding network.

        The network is only ever used through ``predict`` in this class (it is
        never compiled or fitted), so its weights are frozen explicitly. The
        previous code marked every layer *except* the last ten as trainable,
        which contradicted its own comment and had no effect on inference.
        """
        base_model = MobileNetV2(weights='imagenet', include_top=False, pooling='avg')
        base_model.trainable = False
        return Model(inputs=base_model.input, outputs=base_model.output)

    def _create_spectrogram_directory(self):
        """Create the spectrogram output directory if it does not exist yet."""
        os.makedirs(self.spectrogram_dir, exist_ok=True)

    def _load_audio(self, audio_path, sr=22050):
        """Load an audio file as a mono waveform.

        Args:
            audio_path: Path of the audio file.
            sr: Sample rate to resample to.

        Returns:
            ``(y, sr)`` on success, or ``(None, sr)`` when the path is not a
            file or loading fails (the failure is logged, not raised).
        """
        try:
            if not os.path.isfile(audio_path):
                logging.error(f"Audio path is not a file: {audio_path}")
                return None, sr
            y, sr = librosa.load(audio_path, sr=sr, mono=True)
            return y, sr
        except Exception as e:
            logging.error(f"Error loading audio {audio_path}: {e}")
            return None, sr

    @staticmethod
    def _waveform_to_spectrogram(y, sr):
        """Return the log-scaled (dB, relative to the maximum) mel spectrogram of *y*."""
        mel = librosa.feature.melspectrogram(y=y, sr=sr)
        return librosa.power_to_db(mel, ref=np.max)

    @staticmethod
    def _segment_bounds(total_samples, segment_samples):
        """Return ``(start, end)`` sample-index pairs that tile a waveform.

        Segments are consecutive and non-overlapping; the last one is shorter
        when *total_samples* is not a multiple of *segment_samples*.

        Raises:
            ValueError: If *segment_samples* is not positive.
        """
        if segment_samples <= 0:
            raise ValueError("segment_samples must be positive")
        return [
            (start, min(start + segment_samples, total_samples))
            for start in range(0, total_samples, segment_samples)
        ]

    def audio_to_spectrogram(self, audio_path, sr=22050):
        """Return the log-mel spectrogram of an audio file, or None if it cannot be loaded."""
        y, sr = self._load_audio(audio_path, sr)
        if y is None:
            return None
        return self._waveform_to_spectrogram(y, sr)

    def save_spectrogram(self, spectrogram, filename, sr=22050):
        """Render a spectrogram to an axis-free image file.

        Args:
            spectrogram: 2-D array as returned by ``audio_to_spectrogram``.
            filename: Destination image path.
            sr: Sample rate passed to ``librosa.display.specshow``.
        """
        fig, ax = plt.subplots(figsize=(2, 2))
        try:
            librosa.display.specshow(spectrogram, sr=sr, x_axis='time', y_axis='mel', ax=ax)
            ax.axis('off')
            fig.savefig(filename, bbox_inches='tight', pad_inches=0)
        finally:
            # Always release the figure, even if rendering or saving fails.
            plt.close(fig)

    def extract_features(self, img_path):
        """Return the flattened MobileNetV2 embedding of the image at *img_path*."""
        img = image.load_img(img_path, target_size=(224, 224))
        x = image.img_to_array(img)
        x = np.expand_dims(x, axis=0)
        x = preprocess_input(x)
        # verbose=0: this is called once per file/segment, so the per-call
        # progress bar would otherwise flood the output.
        features = self.feature_extractor.predict(x, verbose=0)
        return features.flatten()

    def prepare_dataset(self):
        """Build feature vectors for every audio file and split train/test.

        Invalid data directories and files whose spectrogram cannot be
        generated are logged and skipped. Previously collected features and
        labels are discarded, so the method can be called more than once.

        Raises:
            ValueError: If no usable audio file was found.
        """
        features, labels = [], []
        for label, data_dir in self.data_dirs.items():
            if not os.path.isdir(data_dir):
                logging.error(f"Data directory is not valid: {data_dir}")
                continue
            # Sorted so that the fixed random_state below yields the same
            # split on every run; os.listdir order is arbitrary.
            audio_files = sorted(os.listdir(data_dir))
            logging.info(f"Found audio files in {data_dir}: {audio_files}")
            for audio_file in audio_files:
                if not audio_file.lower().endswith(self.AUDIO_EXTENSIONS):
                    continue
                audio_path = os.path.join(data_dir, audio_file)
                logging.info(f"Processing audio file: {audio_path}")
                spectrogram = self.audio_to_spectrogram(audio_path)
                if spectrogram is None:
                    logging.warning(f"Spectrogram generation failed for {audio_path}. Skipping.")
                    continue
                spectrogram_filename = os.path.join(self.spectrogram_dir, f"{label}_{audio_file}.png")
                self.save_spectrogram(spectrogram, spectrogram_filename)
                features.append(self.extract_features(spectrogram_filename))
                labels.append(label)

        if not features:
            raise ValueError("No usable audio files were found in the configured data directories.")

        self.features = features
        self.labels = self.label_encoder.fit_transform(labels)
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            np.array(self.features),
            np.array(self.labels),
            test_size=0.2,
            random_state=42
        )

    def train_classifier(self, epochs=5, use_xgboost=False, use_svm=False):
        """Prepare the dataset, fit the selected classifier and save it.

        Args:
            epochs: Maximum number of fit/evaluate rounds to record.
            use_xgboost: Use a grid-searched XGBoost classifier.
            use_svm: Use an RBF SVM (ignored when *use_xgboost* is true).
                With both flags false a random forest is used.
        """
        self.prepare_dataset()

        # Start from a clean metric history so repeated training runs do not
        # append to (and corrupt) the curves of an earlier run.
        self.train_accuracy, self.test_accuracy = [], []
        self.f1_scores, self.epochs = [], []

        if use_xgboost:
            self.clf = XGBClassifier(random_state=42, reg_alpha=0.1, reg_lambda=0.1)  # Add regularization
            param_grid = {
                'n_estimators': [50, 100, 150],
                'max_depth': [3, 6, 9],
                'learning_rate': [0.01, 0.1, 0.2]
            }
            self.clf = GridSearchCV(self.clf, param_grid, cv=3, scoring='accuracy')
        elif use_svm:
            self.clf = SVC(probability=True, random_state=42, C=1.0, kernel='rbf')  # Regularization parameter
        else:
            self.clf = RandomForestClassifier(
                n_estimators=100, max_depth=10, min_samples_split=4,
                random_state=42, class_weight='balanced'
            )

        # NOTE(review): every "epoch" refits the same estimator from scratch on
        # the same data with a fixed random_state, so the rounds are not
        # incremental training. Kept as-is to preserve the recorded metric
        # history; consider fitting once.
        for epoch in range(1, epochs + 1):
            self.clf.fit(self.X_train, self.y_train)
            y_train_pred = self.clf.predict(self.X_train)
            y_test_pred = self.clf.predict(self.X_test)

            train_acc = accuracy_score(self.y_train, y_train_pred)
            test_acc = accuracy_score(self.y_test, y_test_pred)
            f1 = f1_score(self.y_test, y_test_pred, average='weighted')

            self.train_accuracy.append(train_acc)
            self.test_accuracy.append(test_acc)
            self.f1_scores.append(f1)
            self.epochs.append(epoch)

            logging.info(f"Epoch {epoch}: Train Acc={train_acc}, Test Acc={test_acc}, F1={f1}")

            # Early stopping condition: test accuracy dropped versus the previous round.
            if epoch > 1 and test_acc < self.test_accuracy[-2]:
                logging.info("Early stopping triggered.")
                break

        joblib.dump(self.clf, self.model_save_path)
        logging.info(f"Model trained and saved to {self.model_save_path}")

    def plot_training_metrics(self):
        """Return a line chart of train accuracy, test accuracy and F1 per epoch."""
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=self.epochs, y=self.train_accuracy, mode='lines+markers', name="Train Accuracy"))
        fig.add_trace(go.Scatter(x=self.epochs, y=self.test_accuracy, mode='lines+markers', name="Test Accuracy"))
        fig.add_trace(go.Scatter(x=self.epochs, y=self.f1_scores, mode='lines+markers', name="F1 Score"))
        fig.update_layout(title="Model Training Metrics Over Epochs", xaxis_title="Epochs", yaxis_title="Metrics")
        return fig

    def plot_audio_waveform(self, audio_path):
        """Return a waveform plot of the file, or an empty figure if loading fails."""
        y, sr = self._load_audio(audio_path)
        if y is None:
            return go.Figure()  # Return an empty figure if audio loading failed
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=np.arange(len(y)) / sr, y=y, mode='lines', name="Waveform"))
        fig.update_layout(title="Audio Waveform", xaxis_title="Time (s)", yaxis_title="Amplitude")
        return fig

    def plot_audio_spectrogram(self, audio_path):
        """Return a heatmap of the file's log-mel spectrogram, or an empty figure if loading fails."""
        log_spectrogram = self.audio_to_spectrogram(audio_path)
        if log_spectrogram is None:
            return go.Figure()  # Return an empty figure if audio loading failed
        fig = go.Figure(data=go.Heatmap(z=log_spectrogram, colorscale="Viridis"))
        fig.update_layout(title="Mel Spectrogram", xaxis_title="Time", yaxis_title="Frequency")
        return fig

    def plot_confusion_matrix(self):
        """Return an annotated confusion-matrix heatmap for the test split."""
        class_names = self.label_encoder.classes_
        y_pred = self.clf.predict(self.X_test)
        # Pin the label set so the matrix always has one row/column per known
        # class, even when a class is absent from the test split; otherwise
        # the matrix and the axis labels below can disagree in size.
        cm = confusion_matrix(self.y_test, y_pred, labels=np.arange(len(class_names)))
        fig = go.Figure(data=go.Heatmap(z=cm, colorscale='Blues',
                                          x=['Predicted: ' + str(i) for i in class_names],
                                          y=['True: ' + str(i) for i in class_names],
                                          hoverinfo='z+text', text=cm, showscale=True))
        fig.update_layout(title='Confusion Matrix', xaxis_title='Predicted Label', yaxis_title='True Label')

        # Add annotations to the confusion matrix; text colour flips on dark cells.
        half_max = cm.max() / 2
        n_rows, n_cols = cm.shape
        for i in range(n_rows):
            for j in range(n_cols):
                fig.add_annotation(
                    x=j,
                    y=i,
                    text=str(cm[i, j]),
                    showarrow=False,
                    font=dict(color='white' if cm[i, j] > half_max else 'black')
                )
        return fig

    def plot_feature_importance(self, top_n=None):
        """Return a horizontal bar chart of feature importances.

        Args:
            top_n: If given, only the *top_n* most important features are
                shown; by default all features are plotted.

        Returns:
            The figure, or an empty figure when the classifier exposes no
            ``feature_importances_``.
        """
        # A fitted GridSearchCV keeps the real model in best_estimator_;
        # the wrapper itself has no feature_importances_.
        estimator = getattr(self.clf, 'best_estimator_', self.clf)
        importances = getattr(estimator, 'feature_importances_', None)
        if importances is None:
            logging.warning("The classifier does not have feature importances.")
            return go.Figure()  # Return an empty figure or a message indicating no importances

        indices = np.argsort(importances)[::-1]
        if top_n is not None:
            indices = indices[:top_n]
        fig = go.Figure()
        fig.add_trace(go.Bar(
            x=importances[indices],
            y=[f'Feature {i+1}' for i in indices],
            orientation='h'
        ))
        fig.update_layout(title='Feature Importance', xaxis_title='Importance', yaxis_title='Features')
        return fig

    def plot_training_data_distribution(self):
        """Return a bar chart with the number of training samples per class."""
        class_names = self.label_encoder.classes_
        # minlength keeps one count per class even if the highest-numbered
        # classes have no sample in the training split.
        label_counts = np.bincount(self.y_train, minlength=len(class_names))
        fig = go.Figure(data=go.Bar(
            x=class_names,
            y=label_counts
        ))
        fig.update_layout(title='Training Data Distribution', xaxis_title='Classes', yaxis_title='Number of Samples')
        return fig

    def plot_feature_distribution(self):
        """Return overlaid histograms of every training feature (empty figure if untrained)."""
        if self.X_train is None:
            logging.warning("Training data is not available for feature distribution.")
            return go.Figure()
        fig = go.Figure()
        for i in range(self.X_train.shape[1]):
            fig.add_trace(go.Histogram(x=self.X_train[:, i], name=f'Feature {i+1}', opacity=0.75))
        fig.update_layout(title='Feature Distribution', xaxis_title='Feature Value', yaxis_title='Count')
        return fig

    def predict_bird_count(self, audio_path, segment_duration=5):
        """Predict a label for each consecutive segment of an audio file.

        Args:
            audio_path: Path of the audio file to analyse.
            segment_duration: Segment length in seconds.

        Returns:
            A list of ``(predicted_label, start_time_seconds)`` tuples, one
            per segment, or the string ``"Error loading audio file."`` when
            the file cannot be loaded.
        """
        # Function-scope import: keeps this class self-contained without
        # touching the file-level import block.
        import tempfile

        y, sr = self._load_audio(audio_path)
        if y is None:
            logging.error("Audio loading failed.")
            return "Error loading audio file."

        segment_samples = int(segment_duration * sr)
        bird_predictions = []

        # A private temporary directory avoids clobbering a shared file when
        # several predictions run at once, and is removed afterwards.
        with tempfile.TemporaryDirectory() as tmp_dir:
            spectrogram_filename = os.path.join(tmp_dir, "temp_spectrogram.png")
            for start, end in self._segment_bounds(len(y), segment_samples):
                # Use the spectrogram of THIS segment. The previous code
                # recomputed the spectrogram of the whole file on every
                # iteration, so all segments received the same prediction.
                spectrogram = self._waveform_to_spectrogram(y[start:end], sr)
                self.save_spectrogram(spectrogram, spectrogram_filename, sr=sr)
                feature_vector = self.extract_features(spectrogram_filename)
                prediction = self.clf.predict(np.array([feature_vector]))
                predicted_label = self.label_encoder.inverse_transform(prediction)
                timestamp = start / sr  # Convert sample index to time in seconds
                bird_predictions.append((predicted_label[0], timestamp))

        return bird_predictions

    def record_audio(self, duration=5, filename='live_recording.wav', fs=44100):
        """Record mono audio from the microphone and save it as 16-bit PCM WAV.

        Args:
            duration: Recording length in seconds.
            filename: Destination WAV path.
            fs: Sample rate in Hz.
        """
        logging.info(f"Recording audio for {duration} seconds...")
        recording = sd.rec(int(duration * fs), samplerate=fs, channels=1, dtype='float64')
        sd.wait()  # Wait until recording is finished
        # Clip before scaling so out-of-range samples saturate instead of
        # wrapping around in the int16 conversion; '<i2' is little-endian
        # 16-bit, the sample layout WAV files use.
        pcm = (np.clip(recording, -1.0, 1.0) * 32767).astype('<i2')
        # Save the recording as a WAV file
        with wave.open(filename, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)  # 2 bytes per sample = 16-bit PCM
            wf.setframerate(fs)
            wf.writeframes(pcm.tobytes())
        logging.info(f"Recording saved to {filename}")

# Define data directories and spectrogram directory
# data_dirs maps each class label to the folder that holds that class's audio
# files; the keys become the class names the classifier is trained on.
# NOTE(review): these are absolute paths on one user's machine — anyone else
# running this must edit them.
data_dirs = {
    'Capuchin bird': r'C:\Users\soume\OneDrive\Documents\WORK M\Capuchin bird', 
    # NOTE(review): the folder name below contains two consecutive spaces
    # ("Thryomanes  bewickii") — confirm it matches the directory on disk.
    'Thryomanes bewickii': r'C:\Users\soume\OneDrive\Documents\WORK M\Thryomanes  bewickii',
    'Pomatorhinus ruficollis': r'C:\Users\soume\OneDrive\Documents\WORK M\Pomatorhinus ruficollis',
    'Arborophila torqueola': r'C:\Users\soume\OneDrive\Documents\WORK M\Arborophila torqueola',
    'Macronus gularis': r'C:\Users\soume\OneDrive\Documents\WORK M\Macronus gularis'
}
# Folder where the generated spectrogram PNGs are written (created by the
# classifier's constructor if it does not exist).
spectrogram_dir = r'C:\Users\soume\OneDrive\Documents\WORK M\spectrograms'

# Initialize and train classifier
# Both steps run as soon as this code executes: the constructor loads the
# MobileNetV2 feature extractor, and train_classifier() builds the dataset
# itself before fitting. use_xgboost=True selects the grid-searched XGBoost
# model; the fitted model is saved to the default 'audio_classifier.pkl'.
classifier = AudioSpectrogramClassifier(data_dirs, spectrogram_dir)
classifier.train_classifier(epochs=5, use_xgboost=True)

# Initialize Dash app with layout
app = Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

# Dropdown entries: one per audio file found in the configured data
# directories. Directories that do not exist are skipped (os.listdir would
# raise on them), and files are sorted so the menu order is stable.
audio_file_options = [
    {'label': file, 'value': os.path.join(path, file)}
    for path in data_dirs.values()
    if os.path.isdir(path)
    for file in sorted(os.listdir(path))
    if file.lower().endswith(('.wav', '.mp3', '.flac'))
]
# Default to the first audio FILE. The previous default was the first data
# directory, which is not a valid option value and cannot be loaded as audio.
default_audio_file = audio_file_options[0]['value'] if audio_file_options else None

app.layout = dbc.Container([
    dbc.Row(dbc.Col(html.H1("Bird Species Classification from Audio", className="text-center"), width=12)),
    dcc.Tabs([
        # Static figures computed once from the trained classifier.
        dcc.Tab(label="Model Performance", children=[
            dcc.Graph(figure=classifier.plot_training_metrics()),
            dcc.Graph(figure=classifier.plot_confusion_matrix()),
            dcc.Graph(figure=classifier.plot_feature_importance()),
            dcc.Graph(figure=classifier.plot_training_data_distribution())
        ]),
        # Waveform/spectrogram of the selected file; filled in by a callback.
        dcc.Tab(label="Audio Visualization", children=[
            html.Label("Select Audio File:"),
            dcc.Dropdown(
                id='audio-file-dropdown',
                options=audio_file_options,
                value=default_audio_file
            ),
            dcc.Graph(id='waveform-plot'),
            dcc.Graph(id='spectrogram-plot')
        ]),
        dcc.Tab(label="Feature Distribution", children=[
            dcc.Graph(figure=classifier.plot_feature_distribution())
        ]),
        # Upload an audio file and show per-segment predictions.
        dcc.Tab(label="Bird Count Prediction", children=[
            dcc.Upload(
                id='upload-audio',
                children=dbc.Button('Upload Audio File', color="primary"),
                multiple=False
            ),
            html.Div(id='prediction-output', className="mt-3")
        ]),
        # Record from the microphone and show per-segment predictions.
        dcc.Tab(label="Live Audio Recording", children=[
            dbc.Button("Record Audio", id="record-button", color="success"),
            html.Div(id='live-prediction-output', className="mt-3")
        ])
    ])
], fluid=True)

# Callback for updating audio visualizations
@app.callback(
    [Output('waveform-plot', 'figure'), Output('spectrogram-plot', 'figure')],
    [Input('audio-file-dropdown', 'value')]
)
def update_audio_visualizations(selected_audio_file):
    """Redraw the waveform and spectrogram for the selected audio file.

    Args:
        selected_audio_file: Path chosen in the dropdown, or None/empty when
            the selection has been cleared.

    Returns:
        ``(waveform_figure, spectrogram_figure)``; both are empty figures
        when nothing is selected.
    """
    if not selected_audio_file:
        # Nothing selected: show blank plots instead of pushing None through
        # the audio loader, which would log a spurious error for each plot.
        return go.Figure(), go.Figure()

    waveform_fig = classifier.plot_audio_waveform(selected_audio_file)
    spectrogram_fig = classifier.plot_audio_spectrogram(selected_audio_file)
    return waveform_fig, spectrogram_fig

# Callback for predicting bird count from uploaded audio
@app.callback(
    Output('prediction-output', 'children'),
    [Input('upload-audio', 'contents')]
)
def update_prediction(uploaded_file):
    """Run per-segment prediction on an uploaded audio file.

    Args:
        uploaded_file: Upload contents as a ``"<header>,<base64 data>"``
            string, or None when nothing has been uploaded.

    Returns:
        A status message, or a preformatted block listing one prediction
        per segment.
    """
    # Function-scope import so this callback does not depend on a change to
    # the file-level import block.
    import tempfile

    if uploaded_file is None:
        return "No audio file uploaded."

    try:
        # Split only on the first comma: header before it, payload after it.
        _content_type, content_string = uploaded_file.split(',', 1)
        decoded = base64.b64decode(content_string)
        # Write to a private temporary directory so simultaneous uploads do
        # not overwrite each other and no file is left behind.
        with tempfile.TemporaryDirectory() as tmp_dir:
            audio_path = os.path.join(tmp_dir, 'uploaded_audio.wav')
            with open(audio_path, 'wb') as f:
                f.write(decoded)
            predictions = classifier.predict_bird_count(audio_path)
    except Exception as e:
        logging.error(f"Error during prediction: {e}")
        return "Error during prediction. Please check the audio file."

    if isinstance(predictions, str):
        # predict_bird_count reports a load failure as a message string, not
        # a list; it must not be iterated as (bird, timestamp) pairs.
        logging.error(f"Error during prediction: {predictions}")
        return "Error during prediction. Please check the audio file."
    if not predictions:
        return "No birds detected."

    result_text = "\n".join(
        f"Time: {timestamp:.2f}s - Predicted Bird: {bird}" for bird, timestamp in predictions
    )
    # html.Pre keeps the line breaks; in plain text children they collapse
    # into a single line when rendered.
    return html.Pre(f"Predicted Bird Counts:\n{result_text}")

# Callback for live audio recording
@app.callback(
    Output('live-prediction-output', 'children'),
    [Input('record-button', 'n_clicks')]
)
def record_and_predict(n_clicks):
    """Record five seconds from the microphone and predict per segment.

    Args:
        n_clicks: Click count of the record button; None until the first click.

    Returns:
        A status message, or a preformatted block listing one prediction
        per segment.
    """
    if n_clicks is None:
        return "Click the button to record audio."

    audio_filename = 'live_recording.wav'
    try:
        # Record audio for 5 seconds
        classifier.record_audio(duration=5, filename=audio_filename)
        # Predict bird count from the recorded audio
        predictions = classifier.predict_bird_count(audio_filename)
    except Exception as e:
        # Recording (no input device, busy device) or prediction can fail;
        # report it in the page instead of failing the callback.
        logging.error(f"Error during live recording or prediction: {e}")
        return "Error during recording or prediction. Please check the audio device."

    if isinstance(predictions, str):
        # predict_bird_count reports a load failure as a message string, not
        # a list; it must not be iterated as (bird, timestamp) pairs.
        logging.error(f"Error during live prediction: {predictions}")
        return "Error during recording or prediction. Please check the audio device."
    if not predictions:
        return "No birds detected."

    result_text = "\n".join(
        f"Time: {timestamp:.2f}s - Predicted Bird: {bird}" for bird, timestamp in predictions
    )
    # html.Pre keeps the line breaks; in plain text children they collapse
    # into a single line when rendered.
    return html.Pre(f"Predicted Bird Counts:\n{result_text}")

# Run the Dash app
if __name__ == '__main__':
    # Prefer app.run (run_server was removed in Dash 3); fall back to
    # run_server on older Dash releases that do not have app.run yet.
    run_app = getattr(app, 'run', None) or app.run_server
    # use_reloader=False: the debug reloader re-executes this module in a
    # child process, which would train the classifier a second time.
    run_app(debug=True, use_reloader=False)

# ModuleNotFoundError: No module named 'tensorflow.python.distribute.distribution_strategy_context'