In [None]:
!pip install -q wfdb simpleaudio
!pip install -q pyngrok flask flask-cors

In [None]:
!git clone https://github.com/Gad-MA/Signal-Viewer-and-Classifier.git

In [None]:
import sys
import os

# Get the current working directory
current_dir = os.getcwd()

# Add the path to the cloned repository to the system path
repo_path = os.path.join(current_dir, 'Signal-Viewer-and-Classifier')
sys.path.append(repo_path)

# Verify the path has been added (optional)
print(sys.path)

# Aliasing Audio

In [None]:
from api.aliasing.aliasing import degrade_audio

In [None]:
import soundfile as sf
import librosa
import numpy as np

def alias_audio(input_filename, sampling_freq):
  output_filename = 'degraded_version.wav'

  try:
      # 2. LOAD YOUR FILE
      # We use sr=None to load the file at its *original* sample rate
      print(f"Loading '{input_filename}'...")
      y, sr = librosa.load(input_filename, sr=None)
      downsample_factor = int(sr / sampling_freq)
      print(f"File loaded successfully. (Sample Rate: {sr} Hz, Duration: {len(y)/sr:.2f}s)")

      # 3. PROCESS THE AUDIO
      # We pass the file's original sample rate (sr) to your function
      print("Applying degradation effects...")
      degraded_y = degrade_audio(y, sr_high=sr, downsample_factor=downsample_factor, add_noise=False, noise_level=0.005)

      # 4. SAVE THE NEW FILE
      # We save the new file using the *same* original sample rate
      # sf.write(output_filename, degraded_y, sr)
      # print(f"✅ Success! Degraded audio saved to '{output_filename}'")

  except FileNotFoundError:
      print(f"--- ERROR ---")
      print(f"File not found: '{input_filename}'")
      print("Please make sure the file is in the same directory as this script and the name is spelled correctly.")
  except Exception as e:
      print(f"An error occurred: {e}")

  return degraded_y, sr

# Anti-Aliasing

In [None]:
from api.anti_aliasing.inference import restore_and_clean_audio

# Gender Classification

In [None]:
from api.gender_detection.gender_detection import infer_gender_from_audio

# ECG Signal Retrival from Database

Now that the `wfdb` library is installed, you can use it to download and read the PTB-XL dataset.

The `wfdb.rdrecord` function can read a single record from the dataset. You'll need to specify the record name and the directory where the dataset is stored.

For the PTB-XL dataset, the records are organized in subdirectories. For example, the first record is located at `ptb-xl/1.0.3/records100/00000/00001_hr`.

Let's read the first record as an example:

The `record` object contains the signal data and metadata for the record.

You can access the signal data using `record.p_signal` and the metadata using `record.get_annotation()`.

To read multiple records or the entire dataset, you would typically iterate through the record list provided by PhysioNet for the dataset. The dataset page on PhysioNet usually provides a list of all records.

In [None]:
import wfdb
import matplotlib.pyplot as plt
from scipy.signal import resample
import numpy as np

def retrieve_data_from_ecg_dataset(name, desired_sampling_freq=500):
  """
  Retrieves ECG data from the PTB-XL dataset and resamples it to the desired frequency.

  Args:
    name (str): The name of the record to retrieve (e.g., '00001').
    desired_sampling_freq (int): The target sampling frequency (default is 500).

  Returns:
    np.ndarray: The resampled ECG data.
  """
  original_sampling_freq = 500 # The PTB-XL dataset records are originally at 500 Hz

  desired_directory = f'ptb-xl/1.0.3/records{original_sampling_freq}/{name[:2]}000/'
  record_name = f"{name}_hr"

  record = wfdb.rdrecord(record_name, pn_dir=desired_directory)
  ECG_data = record.p_signal

  if original_sampling_freq != desired_sampling_freq:
      # Resample the data
      num_samples = int(len(ECG_data) * desired_sampling_freq / original_sampling_freq)
      ECG_data = resample(ECG_data, num_samples)

  return ECG_data

# ECG Signal Classification

In [None]:
from api.ECG_Classifier.inference import predict_ecg_arrhythmia
import os

def classify_ecg_signal(name, desired_sampling_freq=500):
    # desired_directory = f'ptb-xl/1.0.3/records{sampling_freq}/{name[:2]}000/'
    # Construct the correct path to the model file
    model_path = os.path.join('Signal-Viewer-and-Classifier', 'api', 'ECG_Classifier', 'ecg_full_model.h5')
    return predict_ecg_arrhythmia(retrieve_data_from_ecg_dataset(name, desired_sampling_freq), model_path=model_path)

# Doppler Car Sound Generation

In [None]:
from api.doppler.doppler_sound_generator import doppler_effect_wav_generator

# Drone Sound Classifier

In [None]:
from api.drone.drones import process_drone_audio

# SAR

In [None]:
from api.SAR.SAR import analyze_sar_image

# EEG Classifier

In [None]:
!pip install -q mne antropy awscli
from api.EEG.Alzheimer import alzheimer_inference
from api.EEG.parkinson import parkinson_inference
# from api.EEG.Motor_Imagery import motor_inference
from api.EEG.Epilepsy import epilepsy_inference

In [None]:
# !aws s3 sync --no-sign-request s3://openneuro.org/ds004504 alzheimer_eeg_data/

In [None]:
# !aws s3 sync --no-sign-request s3://physionet-open/chbmit/1.0.0/chb02 /content/epilepsy_data/

# API

In [None]:
from pyngrok import ngrok
from flask import Flask, jsonify, request, send_file
import io
from flask_cors import CORS
import os

# Set your ngrok authtoken (optional but recommended)
# Get it from https://dashboard.ngrok.com/get-started/your-authtoken
# Replace with your actual authtoken
ngrok.set_auth_token("33ePdi4rrqqYDFd0PlCHMleUlX3_cmX8hBHXncgfEbzuCeS3")

# Start ngrok tunnel
# Ensure the Flask app runs on this port
port = 5000
public_url = ngrok.connect(port)
print("Public URL:", public_url)

app = Flask(__name__)
CORS(app) # Enable CORS for all origins

@app.route('/alias-speech', methods=['POST'])
def alias_speech():
    # Check if the request contains files
    if 'wav_file' not in request.files:
        return jsonify({'error': 'No file part in the request'}), 400

    desired_sampling_freq = int(request.args.get('sampling_freq'))

    file = request.files['wav_file']

    # If the user does not select a file, the browser submits an
    # empty file without a filename.
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if file:
        # Securely save the file. You might want to add validation here
        # to check if the file is actually a WAV file and sanitize the filename.
        # For demonstration, saving to a temporary location with a fixed name.
        # Consider using werkzeug.utils.secure_filename if saving user-provided filenames.
        upload_dir = 'uploaded_speech_audio' # Use a specific directory for drone uploads
        os.makedirs(upload_dir, exist_ok=True) # Create directory if it doesn't exist
        # Using a fixed filename for simplicity in this example
        filename = "uploaded_speech_audio.wav"
        filepath = os.path.join(upload_dir, filename)

        file.save(filepath)

        degraded_y, sr = alias_audio(filepath, desired_sampling_freq)

        # 5. Save the processed audio to an in-memory buffer
        buffer = io.BytesIO()
        # We use 'WAV' format and 'PCM_16' subtype for broad compatibility
        sf.write(buffer, degraded_y, sr, format='WAV', subtype='PCM_16')
        # Reset the buffer's position to the beginning
        buffer.seek(0)

        return send_file(
            buffer,
            as_attachment=True,
            download_name='degraded_audio.wav',
            mimetype='audio/wav'
        )

@app.route('/anti-alias', methods=['POST'])
def anti_alias():
    # Check if the request contains files
    if 'wav_file' not in request.files:
        return jsonify({'error': 'No file part in the request'}), 400

    file = request.files['wav_file']

    # If the user does not select a file, the browser submits an
    # empty file without a filename.
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if file:
        # Securely save the file. You might want to add validation here
        # to check if the file is actually a WAV file and sanitize the filename.
        # For demonstration, saving to a temporary location with a fixed name.
        # Consider using werkzeug.utils.secure_filename if saving user-provided filenames.
        upload_dir = 'uploaded_speech_audio_for_anti_alias' # Use a specific directory for drone uploads
        os.makedirs(upload_dir, exist_ok=True) # Create directory if it doesn't exist
        # Using a fixed filename for simplicity in this example
        filename = "uploaded_speech_audio_for_anti_alias.wav"
        filepath = os.path.join(upload_dir, filename)

        file.save(filepath)

        anti_aliased, final_sr = restore_and_clean_audio(filepath)

        # 4. Save the processed audio to an in-memory buffer
        buffer = io.BytesIO()
        sf.write(buffer, anti_aliased, final_sr, format='WAV', subtype='PCM_16')
        buffer.seek(0)

        # 5. Send the buffer back as a file
        return send_file(
            buffer,
            as_attachment=True,
            download_name='anti_aliased_audio.wav',
            mimetype='audio/wav'
        )

@app.route('/gender-classification', methods=['POST'])
def gender_classification():
    # Check if the request contains files
    if 'wav_file' not in request.files:
        return jsonify({'error': 'No file part in the request'}), 400
    file = request.files['wav_file']

    # If the user does not select a file, the browser submits an
    # empty file without a filename.
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if file:
        # Securely save the file. You might want to add validation here
        # to check if the file is actually a WAV file and sanitize the filename.
        # For demonstration, saving to a temporary location with a fixed name.
        # Consider using werkzeug.utils.secure_filename if saving user-provided filenames.
        upload_dir = 'uploaded_speech_audio_for_gender_classification' # Use a specific directory for drone uploads
        os.makedirs(upload_dir, exist_ok=True) # Create directory if it doesn't exist
        # Using a fixed filename for simplicity in this example
        filename = "uploaded_speech_audio_for_gender_classification.wav"
        filepath = os.path.join(upload_dir, filename)

        file.save(filepath)

        return jsonify(infer_gender_from_audio(filepath))

# @app.route('/ecg_data/', defaults={'name': None}) # Handles /ecg_data/
@app.route('/ecg_data') # Handles /ecg_data
def get_ecg_data():
    # Get 'name' from query parameters
    name = request.args.get('name')
    desired_sampling_freq = int(request.args.get('desired_sampling_freq'))
    striped_name = str(name)[:5]
    print(f"Received request for name: {name}") # Optional: for debugging

    # Assuming ECG_data is a numpy array available in the global scope
    # Convert the numpy array to a list for JSON serialization
    ecg_data_list = retrieve_data_from_ecg_dataset(striped_name, desired_sampling_freq).tolist()
    return jsonify(ecg_data_list)

@app.route('/ecg_classification')
def get_ecg_classification():
    name = request.args.get('name')
    striped_name = str(name)[:5]
    desired_sampling_freq = int(request.args.get('desired_sampling_freq'))
    print(f"Received request for name: {name}") # Optional: for debugging
    return jsonify(classify_ecg_signal(striped_name, desired_sampling_freq))

@app.route('/upload-drone-wav', methods=['POST'])
def upload_drone_wav():
    # Check if the request contains files
    if 'wav_file' not in request.files:
        return jsonify({'error': 'No file part in the request'}), 400

    file = request.files['wav_file']

    # If the user does not select a file, the browser submits an
    # empty file without a filename.
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if file:
        # Securely save the file. You might want to add validation here
        # to check if the file is actually a WAV file and sanitize the filename.
        # For demonstration, saving to a temporary location with a fixed name.
        # Consider using werkzeug.utils.secure_filename if saving user-provided filenames.
        upload_dir = 'uploaded_drone_audio' # Use a specific directory for drone uploads
        os.makedirs(upload_dir, exist_ok=True) # Create directory if it doesn't exist
        # Using a fixed filename for simplicity in this example
        filename = "uploaded_drone_audio.wav"
        filepath = os.path.join(upload_dir, filename)

        file.save(filepath)

        return process_drone_audio(filepath)

@app.route('/upload-sar-image', methods=['POST'])
def upload_sar_image():
    # Check if the request contains files
    if 'sar_image' not in request.files:
        return jsonify({'error': 'No file part in the request'}), 400

    file = request.files['sar_image']

    # If the user does not select a file, the browser submits an
    # empty file without a filename.
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if file:
        # Securely save the file. You might want to add validation here
        # to check if the file is actually a WAV file and sanitize the filename.
        # For demonstration, saving to a temporary location with a fixed name.
        # Consider using werkzeug.utils.secure_filename if saving user-provided filenames.
        upload_dir = 'uploaded_sar_image' # Use a specific directory for drone uploads
        os.makedirs(upload_dir, exist_ok=True) # Create directory if it doesn't exist
        # Using a fixed filename for simplicity in this example
        filename = "uploaded_sar_image.bmp"
        filepath = os.path.join(upload_dir, filename)

        file.save(filepath)

        return analyze_sar_image(filepath)

@app.route('/upload-eeg', methods=['POST'])
def upload_eeg():
    # Check if the request contains files
    subject_num = int(request.args.get('subject_num'))
    channel_num = int(request.args.get('channel_num'))
    print(f"Received request for subject_num: {subject_num}") # Optional: for debugging
    current_dir = os.getcwd()
    alzheimer_eeg_data_path = os.path.join(current_dir, f'alzheimer_eeg_data/sub-{subject_num:03d}/eeg/sub-{subject_num:03d}_task-eyesclosed_eeg.set')
    raw = mne.io.read_raw_eeglab(alzheimer_eeg_data_path)
    number_of_channels = len(raw.info['ch_names'])
    eeg_df = raw.to_data_frame()
    single_channel_df = eeg_df.iloc[:5000, channel_num].tolist()

    return jsonify({
        'number_of_channels': number_of_channels,
        # 'eeg_df': eeg_df.to_dict(orient='records')
        'single_channel': single_channel_df
    })

@app.route('/eeg_classify', methods=['POST'])
def eeg_classify():
    subject_num = int(request.args.get('subject_num'))
    return jsonify(
      epilepsy_inference.predict_seizure(f"/content/epilepsy_data/chb02_{subject_num:02d}.edf")
    )

@app.route('/generate-doppler-sound', methods=['POST'])
def generate_doppler_sound():
    # Extract and validate parameters with defaults
    source_velocity = request.args.get('source_velocity')
    source_freq = request.args.get('source_freq')
    normal_distance = request.args.get('normal_distance')
    half_simulation_duration = request.args.get('half_simulation_duration')

    # Generate the WAV file
    doppler_effect_wav_generator(
        source_velocity=float(source_velocity),
        source_freq=float(source_freq),
        normal_distance=float(normal_distance),
        half_simulation_duration=float(half_simulation_duration)
    )

    # Construct the absolute path to the generated WAV file
    # The file is saved in Signal-Viewer-and-Classifier/api/doppler/doppler_effect.wav
    wav_file_relative_path_from_repo = os.path.join('api', 'doppler', 'doppler_effect.wav')
    wav_file_absolute_path = os.path.join(repo_path, wav_file_relative_path_from_repo)

    # Return the WAV file
    return send_file(
        wav_file_absolute_path,
        mimetype='audio/wav',
        as_attachment=True,
        download_name='doppler_effect.wav'
    )

if __name__ == '__main__':
    # Using threaded=False to avoid issues with Colab environment
    app.run(port=port, threaded=False)