In [None]:
import pandas as pd
import os
import shutil
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import scipy.io.wavfile as wavfile
import scipy.signal
import csv

File collection

In [None]:


# Load the CSV file into a DataFrame
csv_path = "C:/Users/shrra/Downloads/2024-06-25T13-17_export.csv"
df = pd.read_csv(csv_path)

# Select relevant columns. Take an explicit copy: assigning a new column to a
# column-subset slice can raise pandas' SettingWithCopyWarning, and the copy
# makes it unambiguous that the frame below is independent of the raw export.
selected_columns = ['record_id', 'demographics_session_id']
df = df[selected_columns].copy()

# Create 'subject_id' column in the form 'sub-<record_id>/ses-<session_id>',
# which get_file_paths joins onto the corpus root as a relative directory.
# NOTE(review): the string concatenation assumes both columns hold strings — confirm.
df['subject_id'] = 'sub-' + df['record_id'] + '/ses-' + df['demographics_session_id']

def get_file_paths(subject_ids, base_dir,
                   suffix='Respiration-and-cough_rec-Respiration-and-cough-Cough-2.wav'):
    """Collect one matching recording per subject/session audio directory.

    For every entry of ``subject_ids`` the directory
    ``<base_dir>/<subject_id>/audio`` is scanned and the first file (in sorted
    name order) whose name ends with ``suffix`` is collected.

    Args:
        subject_ids: Iterable of relative directory names such as
            ``'sub-<id>/ses-<id>'``.
        base_dir: Root directory the subject directories live under.
        suffix: File-name ending that identifies the recording of interest.
            Defaults to the cough recording used originally.

    Returns:
        List of full paths, at most one per subject id, in input order.
        Subjects whose audio directory is missing are reported with a printed
        message and skipped; subjects without a matching file are skipped
        silently.
    """
    file_paths = []
    for subject_id in subject_ids:
        session_dir = os.path.join(base_dir, subject_id, 'audio')
        # isdir rather than exists: a regular file at this path would pass an
        # existence check and then make os.listdir raise.
        if not os.path.isdir(session_dir):
            print(f'Session directory {session_dir} does not exist.')
            continue
        # os.listdir order is arbitrary; sort so the selected file is
        # reproducible when more than one name matches.
        match = next(
            (name for name in sorted(os.listdir(session_dir)) if name.endswith(suffix)),
            None,
        )
        if match is not None:
            # Only one relevant file is expected per session.
            file_paths.append(os.path.join(session_dir, match))
    return file_paths

# Corpus root; get_file_paths looks for recordings in <base_dir>/<subject_id>/audio
base_dir = 'C:/Users/shrra/Downloads/Hackathon/bridge2ai-voice-corpus-2-including-sensitive-recordings1/bridge2ai-voice-corpus-2-including-sensitive-recordings1/bids_with_sensitive_recordings'
# Resolve one recording path per subject/session listed in the export
file_paths = get_file_paths(subject_ids=df['subject_id'], base_dir=base_dir)

def copy_files(file_paths, dest_dir):
    """Copy every existing path in ``file_paths`` into ``dest_dir``.

    The outcome for each path is reported with a printed message: success,
    missing source, permission problem, source already being the destination
    file, or any other copy error. Failures never stop the loop.

    Args:
        file_paths: Iterable of source file paths.
        dest_dir: Destination passed to ``shutil.copy`` for every file.

    Returns:
        None.
    """
    for file_path in file_paths:
        # Guard clause: report a missing source and move on to the next one.
        if not os.path.exists(file_path):
            print(f'File {file_path} does not exist.')
            continue

        try:
            shutil.copy(file_path, dest_dir)
        except PermissionError:
            print(f'Permission denied for file {file_path}')
        except shutil.SameFileError:
            print(f'File {file_path} is already present in the destination directory')
        except Exception as e:
            print(f'Error copying file {file_path}: {e}')
        else:
            # Only reached when the copy itself raised nothing.
            print(f'Copied {file_path} to {dest_dir}')

# Define destination directory
dest_dir = 'C:/Users/shrra/Downloads/Hackathon/cough'

# Create the destination directory (and any missing parents) if needed.
# exist_ok=True replaces the separate existence check, so there is no window
# between checking and creating in which the directory could appear.
os.makedirs(dest_dir, exist_ok=True)

# Copy files
copy_files(file_paths, dest_dir)

print("File copying complete.")


File name prep

In [None]:


# Define the base directory and folder names
# (folder name on disk -> label written to the 'correct_label' column)
base_dir = 'C:/Users/shrra/Downloads/Hackathon'
folders = {
    'cough': 'Cough',
    # 'yell': 'Yell',
    'breathing': 'Breathing',
    'speech': 'Speech'
}

# Prepare a list to collect file names and labels
file_data = []

# Iterate over each folder and collect file names and labels
for folder_name, label in folders.items():
    folder_path = os.path.join(base_dir, folder_name)
    if not os.path.isdir(folder_path):
        # Make a missing class folder visible instead of silently producing
        # a label file without that class.
        print(f"Folder not found, skipping: {folder_path}")
        continue
    # os.listdir order is arbitrary; sort so the CSV row order is reproducible
    # (the accuracy check compares file names between CSV files).
    for file_name in sorted(os.listdir(folder_path)):
        # Path relative to base_dir (<folder>/<file>), not an absolute path
        file_data.append({'file_name': os.path.join(folder_name, file_name), 'correct_label': label})

# Create a DataFrame from the collected data. Explicit columns keep the header
# in the CSV even when no files were found.
df = pd.DataFrame(file_data, columns=['file_name', 'correct_label'])

# Define the output CSV file path
output_csv_path = 'C:/Users/shrra/Downloads/Hackathon/Inputdata_labels.csv'

# Save the DataFrame to a CSV file
df.to_csv(output_csv_path, index=False)

print(f"CSV file saved to {output_csv_path}")


YAMNet

Top 1 label

In [None]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import pandas as pd
import os
import scipy.io.wavfile as wavfile
import scipy.signal
import csv

# Load YAMNet model
def load_model():
    """Load the YAMNet model from TensorFlow Hub and return it."""
    return hub.load('https://tfhub.dev/google/yamnet/1')

# Load class labels from the model
def load_class_labels(model):
    """Return the class display names listed in the model's class-map CSV.

    Args:
        model: Loaded model exposing ``class_map_path()``.

    Returns:
        List with the 'display_name' value of every CSV row, in file order.
    """
    class_map_path = model.class_map_path().numpy()
    with tf.io.gfile.GFile(class_map_path) as csvfile:
        return [row['display_name'] for row in csv.DictReader(csvfile)]

# Ensure audio is at the desired sample rate
def ensure_sample_rate(original_sample_rate, waveform, desired_sample_rate=16000):
    """Resample ``waveform`` to ``desired_sample_rate`` when the rates differ.

    Args:
        original_sample_rate: Sample rate of ``waveform``.
        waveform: Sequence of audio samples.
        desired_sample_rate: Target sample rate (default 16000).

    Returns:
        Tuple ``(desired_sample_rate, waveform)``; the waveform is returned
        untouched when it is already at the target rate.
    """
    # Guard clause: nothing to do when the audio is already at the target rate.
    if original_sample_rate == desired_sample_rate:
        return desired_sample_rate, waveform

    # Keep the duration constant: scale the sample count by the rate ratio.
    duration_seconds = float(len(waveform)) / original_sample_rate
    target_length = int(round(duration_seconds * desired_sample_rate))
    return desired_sample_rate, scipy.signal.resample(waveform, target_length)

# Normalize waveform to [-1.0, 1.0]
def normalize_waveform(waveform):
    """Scale a waveform so that its largest absolute sample becomes 1.0.

    Args:
        waveform: Array-like of audio samples (integer PCM or floating point).

    Returns:
        Floating-point NumPy array with values in [-1.0, 1.0]. Empty input and
        all-zero (silent) input are returned unscaled instead of raising or
        producing NaN.
    """
    waveform = np.asarray(waveform)
    # Work in floating point: np.abs on the most negative integer sample
    # (e.g. int16 -32768) overflows and stays negative, which would yield a
    # wrong peak and values outside [-1, 1].
    if not np.issubdtype(waveform.dtype, np.floating):
        waveform = waveform.astype(np.float64)
    if waveform.size == 0:
        return waveform
    peak = np.max(np.abs(waveform))
    # Silent audio: dividing by a zero peak would fill the array with NaN.
    if peak == 0:
        return waveform
    return waveform / peak

# Process audio file and make predictions
def process_file(file_path, model, class_names):
    """Classify one WAV file and return the name of its top-scoring class.

    Args:
        file_path: Path of the WAV file to read.
        model: Callable model returning ``(scores, embeddings, spectrogram)``
            for a waveform tensor.
        class_names: Sequence mapping class index to display name.

    Returns:
        The entry of ``class_names`` whose score, averaged over the first axis
        of the score matrix, is highest.
    """
    sample_rate, wav_data = wavfile.read(file_path)

    # scipy returns multi-channel audio as (samples, channels); average the
    # channels so a 1-D mono waveform is passed to the model.
    if wav_data.ndim > 1:
        wav_data = wav_data.mean(axis=1)

    _, wav_data = ensure_sample_rate(sample_rate, wav_data)
    wav_data = normalize_waveform(wav_data)

    # Make predictions (only the scores output is used here)
    waveform = tf.convert_to_tensor(wav_data, dtype=tf.float32)
    scores, _, _ = model(waveform)

    # Average the scores over the first axis, then pick the best class
    mean_scores = np.mean(scores.numpy(), axis=0)
    return class_names[np.argmax(mean_scores)]

# Main function to read CSV and process each file
def main(csv_path, base_dir,
         output_path='C:/Users/shrra/Downloads/Hackathon/predictions1.csv'):
    """Predict the top label for every file listed in a CSV and save the results.

    Args:
        csv_path: CSV file with a 'file_name' column of paths relative to
            ``base_dir``.
        base_dir: Directory the listed file names are resolved against.
        output_path: Where the predictions CSV is written. Defaults to the
            location this function originally hard-coded.

    The output CSV has the columns 'file_name' and 'top_label'. Files that are
    missing or fail to process are reported with a printed message and left
    out of the output.
    """
    df = pd.read_csv(csv_path)
    file_names = df['file_name'].tolist()

    model = load_model()
    class_names = load_class_labels(model)

    results = []
    for file_name in file_names:
        file_path = os.path.join(base_dir, file_name)

        if not os.path.isfile(file_path):
            print(f"File not found: {file_path}")
            continue

        try:
            top_label = process_file(file_path, model, class_names)
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole batch.
            print(f"Error processing {file_name}: {e}")
            continue

        results.append({
            'file_name': file_name,
            'top_label': top_label
        })

    # Explicit columns keep the header row even when nothing was processed,
    # so the output stays readable by downstream cells.
    results_df = pd.DataFrame(results, columns=['file_name', 'top_label'])
    results_df.to_csv(output_path, index=False)

if __name__ == '__main__':
    # Root directory the 'file_name' entries of the listing are relative to
    base_dir = 'C:/Users/shrra/Downloads/Hackathon'
    # NOTE(review): no cell visible in this notebook writes filepath.csv (the
    # listing cell writes Inputdata_labels.csv) — confirm this file exists.
    csv_path = 'C:/Users/shrra/Downloads/Hackathon/filepath.csv'
    main(csv_path=csv_path, base_dir=base_dir)


All labels, but finalise to only 4

In [None]:

# Load YAMNet model
def load_model():
    """Load the YAMNet model from TensorFlow Hub and return it."""
    return hub.load('https://tfhub.dev/google/yamnet/1')

# Load class labels from the model
def load_class_labels(model):
    """Return the class display names listed in the model's class-map CSV.

    Args:
        model: Loaded model exposing ``class_map_path()``.

    Returns:
        List with the 'display_name' value of every CSV row, in file order.
    """
    class_map_path = model.class_map_path().numpy()
    with tf.io.gfile.GFile(class_map_path) as csvfile:
        return [row['display_name'] for row in csv.DictReader(csvfile)]

# Ensure audio is at the desired sample rate
def ensure_sample_rate(original_sample_rate, waveform, desired_sample_rate=16000):
    """Resample ``waveform`` to ``desired_sample_rate`` when the rates differ.

    Args:
        original_sample_rate: Sample rate of ``waveform``.
        waveform: Sequence of audio samples.
        desired_sample_rate: Target sample rate (default 16000).

    Returns:
        Tuple ``(desired_sample_rate, waveform)``; the waveform is returned
        untouched when it is already at the target rate.
    """
    # Guard clause: nothing to do when the audio is already at the target rate.
    if original_sample_rate == desired_sample_rate:
        return desired_sample_rate, waveform

    # Keep the duration constant: scale the sample count by the rate ratio.
    duration_seconds = float(len(waveform)) / original_sample_rate
    target_length = int(round(duration_seconds * desired_sample_rate))
    return desired_sample_rate, scipy.signal.resample(waveform, target_length)

# Normalize waveform to [-1.0, 1.0]
def normalize_waveform(waveform):
    """Scale a waveform so that its largest absolute sample becomes 1.0.

    Args:
        waveform: Array-like of audio samples (integer PCM or floating point).

    Returns:
        Floating-point NumPy array with values in [-1.0, 1.0]. Empty input and
        all-zero (silent) input are returned unscaled instead of raising or
        producing NaN.
    """
    waveform = np.asarray(waveform)
    # Work in floating point: np.abs on the most negative integer sample
    # (e.g. int16 -32768) overflows and stays negative, which would yield a
    # wrong peak and values outside [-1, 1].
    if not np.issubdtype(waveform.dtype, np.floating):
        waveform = waveform.astype(np.float64)
    if waveform.size == 0:
        return waveform
    peak = np.max(np.abs(waveform))
    # Silent audio: dividing by a zero peak would fill the array with NaN.
    if peak == 0:
        return waveform
    return waveform / peak

# Process audio file and make predictions
def process_file(file_path, model, class_names, target_labels):
    """Classify one WAV file, keeping the result only if it is a target label.

    Also prints the five highest-scoring labels with their scores as a
    debugging aid.

    Args:
        file_path: Path of the WAV file to read.
        model: Callable model returning ``(scores, embeddings, spectrogram)``
            for a waveform tensor.
        class_names: Sequence mapping class index to display name.
        target_labels: Container of label names that are accepted as results.

    Returns:
        The top-scoring class name if it is in ``target_labels``, otherwise
        the string 'Unknown'.
    """
    sample_rate, wav_data = wavfile.read(file_path)

    # scipy returns multi-channel audio as (samples, channels); average the
    # channels so a 1-D mono waveform is passed to the model.
    if wav_data.ndim > 1:
        wav_data = wav_data.mean(axis=1)

    _, wav_data = ensure_sample_rate(sample_rate, wav_data)
    wav_data = normalize_waveform(wav_data)

    # Make predictions (only the scores output is used here)
    waveform = tf.convert_to_tensor(wav_data, dtype=tf.float32)
    scores, _, _ = model(waveform)

    # Average the scores over the first axis of the score matrix
    mean_scores = np.mean(scores.numpy(), axis=0)

    # Debugging: Print top scores and labels
    top_class_indices = np.argsort(mean_scores)[::-1][:5]
    top_labels = [class_names[i] for i in top_class_indices]
    top_scores = [mean_scores[i] for i in top_class_indices]

    print(f"File: {file_path}")
    print("Top labels and scores:")
    for label, score in zip(top_labels, top_scores):
        print(f"  {label}: {score:.2f}")

    # argmax (not the argsort result) decides the label, so ties resolve
    # exactly as before.
    top_label = class_names[np.argmax(mean_scores)]

    # Only the top label is considered; anything outside the target set is
    # reported as 'Unknown'.
    return top_label if top_label in target_labels else 'Unknown'

# Main function to read CSV and process each file
def main(csv_path, base_dir,
         output_path='C:/Users/shrra/Downloads/Hackathon/predictions.csv'):
    """Predict a target label for every file listed in a CSV and save the results.

    Args:
        csv_path: CSV file with a 'file_name' column of paths relative to
            ``base_dir``.
        base_dir: Directory the listed file names are resolved against.
        output_path: Where the predictions CSV is written. Defaults to the
            location this function originally hard-coded.

    The output CSV has the columns 'file_name' and 'top_label'; the label is
    one of the target labels or 'Unknown'. Files that are missing or fail to
    process are reported with a printed message and left out of the output.
    """
    df = pd.read_csv(csv_path)
    file_names = df['file_name'].tolist()

    model = load_model()
    class_names = load_class_labels(model)

    # Define the target labels of interest
    target_labels = {'Breathing', 'Speech', 'Cough', 'Yell'}

    results = []
    for file_name in file_names:
        file_path = os.path.join(base_dir, file_name)

        if not os.path.isfile(file_path):
            print(f"File not found: {file_path}")
            continue

        try:
            top_label = process_file(file_path, model, class_names, target_labels)
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole batch.
            print(f"Error processing {file_name}: {e}")
            continue

        results.append({
            'file_name': file_name,
            'top_label': top_label
        })

    # Explicit columns keep the header row even when nothing was processed,
    # so the output stays readable by downstream cells.
    results_df = pd.DataFrame(results, columns=['file_name', 'top_label'])
    results_df.to_csv(output_path, index=False)

if __name__ == '__main__':
    # Root directory the 'file_name' entries of the listing are relative to
    base_dir = 'C:/Users/shrra/Downloads/Hackathon'
    # NOTE(review): no cell visible in this notebook writes filepath.csv (the
    # listing cell writes Inputdata_labels.csv) — confirm this file exists.
    csv_path = 'C:/Users/shrra/Downloads/Hackathon/filepath.csv'
    main(csv_path=csv_path, base_dir=base_dir)


Accuracy check

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Function to calculate accuracy, confusion matrix, and generate pie chart
def analyze_predictions(predictions_file, labels_file):
    """Report accuracy and plot a confusion matrix and prediction pie chart.

    Args:
        predictions_file: CSV with the columns 'file_name' and 'top_label'.
        labels_file: CSV with the columns 'file_name' and 'correct_label'.

    Raises:
        ValueError: If the two files do not list the same file names. Row
            order is ignored, because the rows are joined on 'file_name'.

    Prints the accuracy as a percentage and shows two matplotlib figures.
    Returns None.
    """
    # Load the predictions and labels CSV files
    predictions_df = pd.read_csv(predictions_file)
    labels_df = pd.read_csv(labels_file)

    # Ensure both files cover the same files. Compare the sorted names: the
    # merge below is order-independent, so a mere difference in row order
    # must not be rejected.
    predicted_names = predictions_df['file_name'].sort_values().reset_index(drop=True)
    labelled_names = labels_df['file_name'].sort_values().reset_index(drop=True)
    if not predicted_names.equals(labelled_names):
        raise ValueError("File names in predictions and labels do not match.")

    # Merge the two dataframes on 'file_name'
    merged_df = pd.merge(predictions_df, labels_df, on='file_name')

    # Compare predictions to actual labels
    correct_predictions = merged_df['top_label'] == merged_df['correct_label']
    accuracy = correct_predictions.mean() * 100  # Convert to percentage

    print(f'Accuracy: {accuracy:.2f}%')

    # Union of predicted and true labels, used for both matrix axes
    all_labels = pd.concat([merged_df['top_label'], merged_df['correct_label']]).unique()

    # Generate confusion matrix (rows: true labels, columns: predicted labels)
    conf_matrix = confusion_matrix(
        merged_df['correct_label'],
        merged_df['top_label'],
        labels=all_labels
    )

    # Plot confusion matrix
    plt.figure(figsize=(12, 8))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=all_labels, yticklabels=all_labels)
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title('Confusion Matrix')
    plt.show()

    # Plot pie chart for predictions
    prediction_counts = merged_df['top_label'].value_counts()
    plt.figure(figsize=(8, 8))
    plt.pie(prediction_counts, labels=prediction_counts.index, autopct='%1.1f%%', startangle=140)
    plt.title('Distribution of Predictions')
    plt.show()

# Inputs for the accuracy check: the true labels and the predictions CSV
labels_file = 'C:/Users/shrra/Downloads/Hackathon/Inputdata_labels.csv'
predictions_file = 'C:/Users/shrra/Downloads/Hackathon/predictions1.csv'

# Print the accuracy and show the confusion matrix and the pie chart
analyze_predictions(predictions_file=predictions_file, labels_file=labels_file)


Top label

Checking for all labels

In [None]:

# # Load YAMNet model
# def load_model():
#     model_url = 'https://tfhub.dev/google/yamnet/1'
#     model = hub.load(model_url)
#     return model

# # Load class labels from the model
# def load_class_labels(model):
#     class_map_path = model.class_map_path().numpy()
#     class_names = []
#     with tf.io.gfile.GFile(class_map_path) as csvfile:
#         reader = csv.DictReader(csvfile)
#         for row in reader:
#             class_names.append(row['display_name'])
#     return class_names

# # Ensure audio is at the desired sample rate
# def ensure_sample_rate(original_sample_rate, waveform, desired_sample_rate=16000):
#     if original_sample_rate != desired_sample_rate:
#         desired_length = int(round(float(len(waveform)) / original_sample_rate * desired_sample_rate))
#         waveform = scipy.signal.resample(waveform, desired_length)
#     return desired_sample_rate, waveform

# # Normalize waveform to [-1.0, 1.0]
# def normalize_waveform(waveform):
#     return waveform / np.max(np.abs(waveform))

# # Process audio file and make predictions
# def process_file(file_path, model, class_names):
#     sample_rate, wav_data = wavfile.read(file_path)
#     sample_rate, wav_data = ensure_sample_rate(sample_rate, wav_data)
#     wav_data = normalize_waveform(wav_data)
    
#     # Make predictions
#     waveform = tf.convert_to_tensor(wav_data, dtype=tf.float32)
#     scores, embeddings, spectrogram = model(waveform)
    
#     # Get predictions
#     scores_np = scores.numpy()
#     mean_scores = np.mean(scores_np, axis=0)
#     top_class_indices = np.argsort(mean_scores)[::-1][:5]
#     top_labels = [class_names[i] for i in top_class_indices]
    
#     return top_labels

# # Main function to read CSV and process each file
# def main(csv_path, base_dir):
#     df = pd.read_csv(csv_path)
#     file_names = df['file_name'].tolist()
    
#     model = load_model()
#     class_names = load_class_labels(model)
    
#     results = []
    
#     for file_name in file_names:
#         file_path = os.path.join(base_dir, file_name)
        
#         if not os.path.isfile(file_path):
#             print(f"File not found: {file_path}")
#             continue
        
#         try:
#             top_labels = process_file(file_path, model, class_names)
#             results.append({
#                 'file_name': file_name,
#                 'top_labels': ', '.join(top_labels)
#             })
#         except Exception as e:
#             print(f"Error processing {file_name}: {e}")
    
#     results_df = pd.DataFrame(results)
#     results_df.to_csv('C:/Users/shrra/Downloads/Hackathon/predictions2.csv', index=False)

# if __name__ == '__main__':
#     csv_path = 'C:/Users/shrra/Downloads/Hackathon/filepath.csv'
#     base_dir = 'C:/Users/shrra/Downloads/Hackathon'
#     main(csv_path, base_dir)
