In [None]:
# Mount Google Drive inside the Colab VM so files stored on Drive are reachable
# under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

# Make the project folder on Drive the working directory; relative paths used by
# later cells resolve against it.
import os
os.chdir("/content/drive/My Drive/speech")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Copy the project folder from Drive to the VM's local disk (/content/speech).
# The Piper voice model is later read from /content/speech/ar-fareed-medium.onnx.
!cp -r "/content/drive/My Drive/speech" /content/

In [None]:
# Install required libraries
!pip install numpy scipy soundfile librosa matplotlib open3d piper-tts pyttsx3

# Install piper TTS for Arabic language support
!pip install piper
!pip install piper-tts



Collecting open3d
  Downloading open3d-0.19.0-cp310-cp310-manylinux_2_31_x86_64.whl.metadata (4.3 kB)
Collecting piper-tts
  Downloading piper_tts-1.2.0-py3-none-any.whl.metadata (776 bytes)
Collecting pyttsx3
  Downloading pyttsx3-2.98-py3-none-any.whl.metadata (3.8 kB)
Collecting dash>=2.6.0 (from open3d)
  Downloading dash-2.18.2-py3-none-any.whl.metadata (10 kB)
Collecting configargparse (from open3d)
  Downloading ConfigArgParse-1.7-py3-none-any.whl.metadata (23 kB)
Collecting ipywidgets>=8.0.4 (from open3d)
  Downloading ipywidgets-8.1.5-py3-none-any.whl.metadata (2.3 kB)
Collecting addict (from open3d)
  Downloading addict-2.4.0-py3-none-any.whl.metadata (1.0 kB)
Collecting pyquaternion (from open3d)
  Downloading pyquaternion-0.9.9-py3-none-any.whl.metadata (1.4 kB)
Collecting piper-phonemize~=1.1.0 (from piper-tts)
  Downloading piper_phonemize-1.1.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (282 bytes)
Collecting onnxruntime<2,>=1.11.0 (from piper-tts)
  Downloading onnx

In [None]:
# Fetch the CIPIC HRTF database into the current working directory.
# NOTE(review): git refuses to clone into an existing non-empty directory, so
# re-running this cell after a successful clone only prints an error.
!git clone https://github.com/amini-allight/cipic-hrtf-database.git


In [None]:
# Packages used by the visualization cell to render Arabic labels:
# arabic-reshaper provides `arabic_reshaper`, python-bidi provides `bidi`.
!pip install arabic-reshaper
!pip install python-bidi


In [None]:
import arabic_reshaper
from bidi.algorithm import get_display


In [None]:
# Reset every matplotlib rc setting to the library defaults so earlier style
# changes in this session do not affect the figures drawn below.
import matplotlib as mpl
mpl.rcParams.update(mpl.rcParamsDefault)

In [None]:
# Import the numeric, audio, plotting and notebook-display libraries used by the
# rest of the notebook.
# NOTE(review): the except branch only prints the failure and continues, so a
# missing package surfaces later as a NameError in whichever cell first uses it.
try:
    print("Importing libraries...")
    import numpy as np
    import scipy
    import scipy.io as sio
    from scipy.io import wavfile
    # NOTE(review): duplicates the `scipy.io as sio` import above.
    import scipy.io
    import scipy.signal as signal
    import soundfile as sf
    import matplotlib.pyplot as plt
    import librosa
    import os
    import subprocess
    import IPython.display as ipd

    # Silence all Python warnings for the rest of the session to keep cell
    # output short (this also hides deprecation and numeric warnings).
    import warnings
    warnings.filterwarnings('ignore')

    print("Library import successful.")
except Exception as e:
    print(" importing libraries:", e)


Importing libraries...
Library import successful.


In [None]:
def text_to_speech_piper(text, output_filename, model_path='/content/speech/ar-fareed-medium.onnx'):
    """
    Convert Arabic text to speech using the Piper TTS command-line tool.

    The text is sent to ``piper`` on standard input as UTF-8 and the command is
    passed as an argument list, so quotes, spaces or shell metacharacters in
    ``text`` or ``output_filename`` are never interpreted by a shell.

    Args:
        text: Text to synthesize.
        output_filename: Path of the audio file Piper should write.
        model_path: Path to the Piper ``.onnx`` voice model.

    Raises:
        ValueError: If ``text`` is empty or whitespace only.
        FileNotFoundError: If the ``piper`` executable cannot be found.
        subprocess.CalledProcessError: If Piper exits with a non-zero status.
    """
    print(f"[TTS] Converting text to speech for: '{text}'")
    try:
        if not text or not text.strip():
            raise ValueError("text must be a non-empty string")

        command = ['piper', '--model', model_path, '--output_file', output_filename]
        # A trailing newline terminates the input line, as `echo` did before.
        # check=True makes a failed synthesis raise instead of being reported
        # as a success.
        subprocess.run(command, input=(text + '\n').encode('utf-8'), check=True)
        print(f"[TTS] Audio saved to '{output_filename}'")
    except subprocess.CalledProcessError as e:
        print(f"[TTS] Error during text-to-speech conversion: {e}")
        raise  # Handled by the calling function
    except Exception as e:
        print(f"[TTS] Unexpected error: {e}")
        raise


In [None]:
def cartesian_to_spherical(x, y, z):
    """
    Convert Cartesian coordinates to spherical coordinates.

    Coordinate system: x is forward, y is right, z is up.
    Azimuth: angle from x-axis towards y-axis in x-y plane (-180° to +180°)
    Elevation: angle from x-y plane towards z-axis (-90° to +90°)

    Returns:
        Tuple ``(azimuth, elevation, distance)`` with angles in degrees.
        ``azimuth`` is ``None`` when the point lies outside the front
        hemisphere (|azimuth| > 90°). A point at the origin has no direction
        and is reported as azimuth 0°, elevation 0°, distance 0.
    """
    print(f"[Coordinate Conversion] Input Cartesian coordinates: x={x}, y={y}, z={z}")

    try:
        # Calculate distance
        distance = np.sqrt(x**2 + y**2 + z**2)

        # Azimuth from x and y, in the range -180° to +180°
        azimuth = np.degrees(np.arctan2(y, x))

        # Only include azimuths from -90° to +90° (front hemisphere)
        if not (-90 <= azimuth <= 90):
            print(f"[Coordinate Conversion] Warning: Azimuth {azimuth}° is outside front hemisphere (|azimuth| > 90°). Excluding object.")
            azimuth = None

        # Elevation from z and distance. Guard the origin (0/0 would give NaN)
        # and clamp the ratio so rounding cannot push it outside arcsin's domain.
        if distance > 0:
            elevation = np.degrees(np.arcsin(np.clip(z / distance, -1.0, 1.0)))
        else:
            elevation = 0.0

        print(f"[Coordinate Conversion] Computed spherical coordinates: azimuth={azimuth}°, elevation={elevation}°, distance={distance}")
        return azimuth, elevation, distance
    except Exception as e:
        print(f"[Coordinate Conversion] Exception in conversion: {e}")
        raise


In [None]:
def load_hrtf_data(subject_id='003',
                   hrtf_root='/content/drive/MyDrive/speech/cipic-hrtf-database/standard_hrir_database'):
    """
    Load HRTF data for the specified subject.

    Args:
        subject_id: Subject identifier; the data is read from the folder
            ``subject_<id>`` (ids are zero-padded to three characters, so
            ``3`` and ``'003'`` are equivalent).
        hrtf_root: Directory that contains the ``subject_<id>`` folders.

    Returns:
        dict with keys ``'hrir_l'`` and ``'hrir_r'`` (the arrays stored under
        those names in ``hrir_final.mat``) and ``'azimuths'`` / ``'elevations'``
        (the angle grids, in degrees, used to index them).

    Raises:
        FileNotFoundError: If the subject folder or ``hrir_final.mat`` is missing.
    """
    subject_id = str(subject_id).zfill(3)
    print(f"[HRTF Data Loading] Loading HRTF data for subject {subject_id}")
    # The trailing '' keeps a trailing path separator, as in the logged path.
    hrtf_path = os.path.join(hrtf_root, f'subject_{subject_id}', '')

    try:
        if not os.path.exists(hrtf_path):
            raise FileNotFoundError(f"HRTF data path '{hrtf_path}' does not exist.")

        print(f"[HRTF Data Loading] HRTF path: {hrtf_path}")

        # Load HRIR data
        mat_contents = sio.loadmat(os.path.join(hrtf_path, 'hrir_final.mat'))
        hrir_l = mat_contents['hrir_l']  # Left ear HRIRs
        hrir_r = mat_contents['hrir_r']  # Right ear HRIRs
        print("[HRTF Data Loading] Loaded HRIR data")

        # Define azimuths and elevations used in the CIPIC database
        azimuths = np.array([
            -80, -65, -55, -45, -40, -35, -30, -25, -20, -15, -10,
            -5, 0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 55, 65, 80
        ], dtype=float)

        # 50 elevations in 5.625° steps: from -45° up to +230.625°
        elevations = -45 + 5.625 * np.arange(50, dtype=float)
        print("[HRTF Data Loading] Defined azimuths and elevations")

        hrtf_data = {
            'hrir_l': hrir_l,
            'hrir_r': hrir_r,
            'azimuths': azimuths,
            'elevations': elevations
        }

        print("[HRTF Data Loading] HRTF data prepared")
        return hrtf_data
    except FileNotFoundError as e:
        print(f"[HRTF Data Loading] Error: {e}")
        raise
    except Exception as e:
        print(f"[HRTF Data Loading] Unexpected error: {e}")
        raise


In [None]:
def apply_hrtf(audio_filename, azimuth, elevation, hrtf_data):
    """
    Apply HRTF to the audio file based on the given azimuth and elevation.

    The audio is mixed down to mono, resampled to 44100 Hz if needed, convolved
    with the left/right impulse responses nearest to the requested direction,
    peak-normalized and written next to the input as ``<name>_hrtf<ext>``.

    Args:
        audio_filename: Path of the audio file to spatialize.
        azimuth: Azimuth in degrees, or ``None`` to skip processing.
            Values outside -80°..+80° are clipped.
        elevation: Elevation in degrees. Values outside -45°..+90° are clipped.
        hrtf_data: dict with keys ``'hrir_l'``, ``'hrir_r'``, ``'azimuths'`` and
            ``'elevations'``; the HRIR arrays are indexed as
            ``[azimuth_index, elevation_index, :]``.

    Returns:
        Path of the written stereo file, or ``None`` when ``azimuth`` is ``None``.

    Raises:
        ValueError: If the audio file contains no samples.
    """
    if azimuth is None:
        print(f"[HRTF] Skipping HRTF application due to invalid azimuth.")
        return None

    print(f"[HRTF] Applying HRTF for azimuth {azimuth}°, elevation {elevation}° to '{audio_filename}'")
    try:
        # Load audio file
        audio_data, fs = sf.read(audio_filename)
        print(f"[HRTF] Loaded audio file '{audio_filename}', Sampling rate: {fs} Hz")

        if audio_data.size == 0:
            raise ValueError(f"Audio file '{audio_filename}' contains no samples.")

        # If audio is stereo, convert to mono
        if audio_data.ndim > 1:
            audio_data = np.mean(audio_data, axis=1)
            print("[HRTF] Converted stereo audio to mono")

        # Resample if necessary
        fs_desired = 44100  # HRIR data sampling rate
        if fs != fs_desired:
            audio_data = librosa.resample(audio_data, orig_sr=fs, target_sr=fs_desired)
            fs = fs_desired
            print(f"[HRTF] Resampled audio to {fs_desired} Hz")

        azimuths = hrtf_data['azimuths']
        elevations = hrtf_data['elevations']

        # Clip azimuth to CIPIC HRTF range (-80°, +80°)
        if azimuth < -80 or azimuth > 80:
            print(f"[HRTF] Warning: Azimuth {azimuth}° out of HRTF range (-80° to +80°). Clipping.")
            azimuth = np.clip(azimuth, -80, 80)

        # Clip elevation to CIPIC HRTF range (-45°, +90°)
        if elevation < -45 or elevation > 90:
            print(f"[HRTF] Warning: Elevation {elevation}° out of HRTF range (-45° to +90°). Clipping.")
            elevation = np.clip(elevation, -45, 90)

        # Nearest measured direction on the HRTF grid
        azimuth_idx = np.abs(azimuths - azimuth).argmin()
        elevation_idx = np.abs(elevations - elevation).argmin()

        print(f"[HRTF] Nearest HRTF indices - Azimuth index: {azimuth_idx}, Elevation index: {elevation_idx}")
        print(f"[HRTF] Using HRTF at azimuth {azimuths[azimuth_idx]}°, elevation {elevations[elevation_idx]}°")

        hrir_l_selected = hrtf_data['hrir_l'][azimuth_idx, elevation_idx, :]
        hrir_r_selected = hrtf_data['hrir_r'][azimuth_idx, elevation_idx, :]

        # Convolve audio with HRIR
        audio_left = signal.convolve(audio_data, hrir_l_selected, mode='full')
        audio_right = signal.convolve(audio_data, hrir_r_selected, mode='full')
        print("[HRTF] Applied convolution with HRIR filters")

        # Ensure both channels are the same length before stacking them
        min_len = min(len(audio_left), len(audio_right))
        audio_stereo = np.column_stack((audio_left[:min_len], audio_right[:min_len]))

        # Normalize audio to a peak of 1.0 (skipped for all-zero audio)
        max_val = np.max(np.abs(audio_stereo))
        if max_val > 0:
            audio_stereo = audio_stereo / max_val
            print(f"[HRTF] Normalized audio, max value before normalization: {max_val}")

        # Build the output name from the real file extension. A plain
        # str.replace('.wav', ...) would rewrite every '.wav' in the path and
        # would overwrite the input when the name has no '.wav' suffix.
        stem, ext = os.path.splitext(audio_filename)
        output_filename = f"{stem}_hrtf{ext or '.wav'}"
        sf.write(output_filename, audio_stereo, fs)
        print(f"[HRTF] HRTF-applied audio saved as '{output_filename}'")

        return output_filename
    except Exception as e:
        print(f"[HRTF] Exception during HRTF application: {e}")
        raise


In [None]:
def adjust_volume(audio_filename, d, min_distance=1.0, max_distance=100.0):
    """
    Adjust the volume of the audio based on the provided distance 'd' from the object.
    Uses an inverse distance model for volume attenuation.

    ``d`` is clamped to ``[min_distance, max_distance]`` and every sample is
    multiplied by ``min_distance / d``, so anything at or inside
    ``min_distance`` keeps its level.

    Args:
        audio_filename: Path of the audio file to attenuate.
        d: Distance to the object, in the same units as the distance bounds.
        min_distance: Distance at or below which no attenuation is applied.
            Must be positive.
        max_distance: Distance beyond which attenuation stops increasing.

    Returns:
        Path of the written file: the input name with a trailing ``_hrtf``
        replaced by ``_3D`` (or ``_3D`` appended when there is no such suffix).

    Raises:
        ValueError: If ``min_distance`` is not positive or exceeds ``max_distance``.
    """
    print(f"[Volume Adjustment] Adjusting volume for '{audio_filename}' based on distance: {d} units")
    try:
        if min_distance <= 0 or max_distance < min_distance:
            raise ValueError(
                f"Invalid distance bounds: min_distance={min_distance}, max_distance={max_distance}"
            )

        # Load audio
        audio_data, fs = sf.read(audio_filename)
        print(f"[Volume Adjustment] Loaded audio file '{audio_filename}', Sampling rate: {fs} Hz")

        # Inverse distance model on the clamped distance
        if d < min_distance:
            d = min_distance
        elif d > max_distance:
            d = max_distance

        scaling_factor = min_distance / d
        print(f"[Volume Adjustment] Computed scaling factor: {scaling_factor}")

        # Apply scaling factor
        audio_data = audio_data * scaling_factor
        print(f"[Volume Adjustment] Applied scaling factor to audio data")

        # Always derive a name different from the input. A plain
        # str.replace('_hrtf.wav', ...) leaves other names unchanged and would
        # overwrite the source file.
        stem, ext = os.path.splitext(audio_filename)
        if stem.endswith('_hrtf'):
            stem = stem[:-len('_hrtf')]
        output_filename = f"{stem}_3D{ext or '.wav'}"
        sf.write(output_filename, audio_data, fs)
        print(f"[Volume Adjustment] Volume-adjusted audio saved as '{output_filename}'")

        return output_filename
    except Exception as e:
        print(f"[Volume Adjustment] Error during volume adjustment: {e}")
        raise


In [None]:
def combine_audio_files(audio_files, output_filename, silence_duration=1.0):
    """
    Combine multiple audio files into one, separated by silence.

    The sampling rate of the first file is used for the output; later files
    with a different rate are resampled to it. Each gap lasts
    ``silence_duration`` seconds at the output sampling rate and has the same
    channel layout as the clip it follows.

    Args:
        audio_files: Sequence of audio file paths, in playback order.
        output_filename: Path of the combined file to write.
        silence_duration: Length of the gap between clips, in seconds.

    Raises:
        ValueError: If ``audio_files`` is empty.
    """
    print("[Audio Combination] Starting combination of audio files...")
    try:
        if not audio_files:
            raise ValueError("audio_files must contain at least one file.")

        segments = []
        fs = None

        for idx, audio_file in enumerate(audio_files):
            print(f"[Audio Combination] Processing file {idx+1}/{len(audio_files)}: '{audio_file}'")
            audio_data, current_fs = sf.read(audio_file)
            print(f"[Audio Combination] Loaded audio file '{audio_file}', Sampling rate: {current_fs} Hz")

            if fs is None:
                fs = current_fs
            elif fs != current_fs:
                # librosa resamples along the last axis, hence the transposes;
                # the sample rates must be passed by keyword.
                audio_data = librosa.resample(audio_data.T, orig_sr=current_fs, target_sr=fs).T
                print(f"[Audio Combination] Resampled audio to {fs} Hz")

            segments.append(audio_data)

            if idx < len(audio_files) - 1:
                # Size the gap with the output rate rather than a fixed
                # 44100 Hz, so its duration is right for any rate.
                silence_shape = (int(silence_duration * fs),) + audio_data.shape[1:]
                segments.append(np.zeros(silence_shape))
                print(f"[Audio Combination] Added {silence_duration} seconds of silence after '{audio_file}'")

        # Concatenate all audio data
        combined_audio = np.concatenate(segments, axis=0)
        print("[Audio Combination] Concatenated all audio clips")

        # Save combined audio
        sf.write(output_filename, combined_audio, fs)
        print(f"[Audio Combination] Combined audio saved as '{output_filename}'")
    except Exception as e:
        print(f"[Audio Combination] Error during audio combination: {e}")
        raise


In [None]:
def visualize_spatial_audio(objects):
    """
    Visualize the spatial positions of objects in 3D space from the listener's perspective.
    Negative y-values (left) are displayed on the left side of the plot.

    Objects whose azimuth falls outside the front hemisphere (as reported by
    ``cartesian_to_spherical``) are left out. The listener is drawn at the
    origin and is always inside the plotted volume.

    Args:
        objects: Iterable of dicts with keys ``'x'``, ``'y'``, ``'z'``
            (position), ``'d'`` (distance used for marker colour) and
            ``'class_text'`` (Arabic label drawn next to the marker).
    """
    print("[Visualization] Creating comprehensive 3D visualization")
    try:
        import matplotlib.pyplot as plt
        from mpl_toolkits.mplot3d import Axes3D  # Needed for 3D projection
        import matplotlib.patches as mpatches
        import arabic_reshaper
        from bidi.algorithm import get_display

        # Collect the plottable (front-hemisphere) objects
        distances = []
        labels = []
        x_coords = []
        y_coords = []
        z_coords = []

        for obj in objects:
            x, y, z = obj['x'], obj['y'], obj['z']
            d = obj['d']
            class_text = obj['class_text']

            azimuth, _, _ = cartesian_to_spherical(x, y, z)
            if azimuth is None:
                continue

            distances.append(d)  # Use 'd' for distance in visualization
            labels.append(class_text)
            x_coords.append(x)
            y_coords.append(y)
            z_coords.append(z)

        if not labels:
            print("[Visualization] No objects to display.")
            return

        fig = plt.figure(figsize=(12, 8))
        ax = fig.add_subplot(111, projection='3d')

        # Plot listener at the origin (0, 0, 0)
        ax.scatter(0, 0, 0, c='red', s=100, marker='^', label='Listener')

        # Plot objects, coloured by their distance
        sc = ax.scatter(x_coords, y_coords, z_coords, c=distances, cmap='viridis', s=100, marker='o', label='Objects')

        # Annotate objects; Arabic text must be reshaped and reordered
        # right-to-left before matplotlib can draw it correctly.
        for label, x, y, z in zip(labels, x_coords, y_coords, z_coords):
            bidi_text = get_display(arabic_reshaper.reshape(label))
            ax.text(x, y, z, bidi_text, fontsize=10, ha='center', va='bottom')

        # Set labels and title
        ax.set_xlabel('X (Forward)', fontsize=12)
        ax.set_ylabel('Y (Right ⟷ Left)', fontsize=12)
        ax.set_zlabel('Z (Up)', fontsize=12)
        ax.set_title('3D Spatial Visualization of Objects Relative to Listener', fontsize=14)

        # Cubic bounding box (equal aspect ratio) around the objects AND the
        # listener, so the origin marker can never fall outside the axes.
        # Computed with min/max because ndarray.ptp() was removed in NumPy 2.0.
        points = np.array(
            [x_coords + [0.0], y_coords + [0.0], z_coords + [0.0]], dtype=float
        )
        lows = points.min(axis=1)
        highs = points.max(axis=1)
        max_range = (highs - lows).max() / 2.0
        if max_range == 0:
            max_range = 1.0  # every object sits on the listener; avoid a zero-size box
        mid_x, mid_y, mid_z = (highs + lows) / 2.0
        ax.set_xlim(mid_x - max_range, mid_x + max_range)
        ax.set_ylim(mid_y + max_range, mid_y - max_range)  # y-axis drawn inverted
        ax.set_zlim(mid_z - max_range, mid_z + max_range)

        cbar = plt.colorbar(sc, ax=ax, shrink=0.5, aspect=10)
        cbar.set_label('Distance (d)', fontsize=12)

        # Arrow along +x showing the direction the listener faces
        ax.quiver(
            0, 0, 0,
            1, 0, 0,
            length=max_range * 0.5, color='red', arrow_length_ratio=0.2, normalize=True, label='Facing Direction'
        )

        red_patch = mpatches.Patch(color='red', label='Listener')
        ax.legend(handles=[red_patch], loc='upper left')

        # Grid and background
        ax.grid(True)
        ax.set_facecolor('white')

        # View adjustments
        ax.view_init(elev=20, azim=-60)

        plt.tight_layout()
        plt.show()
    except Exception as e:
        print(f"[Visualization] Exception during plotting: {e}")
        raise


In [None]:
def main_pipeline(objects):
    """
    Run the full spatial-audio pipeline for a list of objects.

    Steps: plot the scene, load the HRTF data, then for every object in the
    front hemisphere synthesize its label, spatialize it with the HRTF and
    attenuate it by distance. The resulting clips are concatenated into
    'final_output.wav' and played in the notebook.

    A failure while processing one object is logged and that object is
    skipped; a failure during plotting or HRTF loading aborts the run.

    Args:
        objects: List of dicts with keys ``'class_text'`` (text to speak),
            ``'x'``, ``'y'``, ``'z'`` (position) and ``'d'`` (distance used
            for volume attenuation).
    """
    try:
        visualize_spatial_audio(objects)

        # Load HRTF data
        hrtf_data = load_hrtf_data()
    except Exception as e:
        print(f"[Main Pipeline] Failed to initialize: {e}")
        return

    processed_audio_files = []

    for idx, obj in enumerate(objects):
        class_text = obj['class_text']
        print(f"\n[Main Pipeline] Processing object {idx+1}/{len(objects)}: '{class_text}'")
        try:
            # Cartesian to Spherical first: objects behind the listener are
            # dropped before any speech is synthesized for them.
            x, y, z = obj['x'], obj['y'], obj['z']
            azimuth, elevation, _ = cartesian_to_spherical(x, y, z)

            if azimuth is None:
                print(f"[Main Pipeline] Skipping object '{class_text}' as it is behind the listener.")
                continue

            # Text-to-Speech
            tts_output = f"object_{idx}_tts.wav"
            text_to_speech_piper(class_text, tts_output)

            # Apply HRTF
            hrtf_output = apply_hrtf(tts_output, azimuth, elevation, hrtf_data)

            if hrtf_output is None:
                continue  # Skip if HRTF application failed

            # Volume Mapping using 'd' from object
            d = obj['d']
            volume_adjusted_output = adjust_volume(hrtf_output, d, min_distance=1.0, max_distance=100.0)

            processed_audio_files.append(volume_adjusted_output)
        except Exception as e:
            print(f"[Main Pipeline] Exception processing object '{class_text}': {e}")
            continue

    if processed_audio_files:
        try:
            # Combine Audio Files
            combine_audio_files(processed_audio_files, output_filename='final_output.wav')
            # Play the final audio
            print("[Main Pipeline] Final audio playback:")
            ipd.display(ipd.Audio('final_output.wav'))
        except Exception as e:
            print(f"[Main Pipeline] Exception during audio combination: {e}")
    else:
        print("[Main Pipeline] No audio files were processed. Exiting.")


In [None]:
# Demo scene passed to main_pipeline. Each entry describes one object:
#   class_text - Arabic label that is synthesized to speech and drawn on the plot
#   x, y, z    - position relative to the listener (x forward, y right, z up)
#   d          - distance used for volume attenuation and plot colouring
# NOTE(review): 'd' is supplied independently of (x, y, z) and does not always
# equal their Euclidean norm (e.g. the second entry) - confirm this is intended.
objects = [
    {'class_text': 'سَيَّارَة', 'x': 10.0, 'y': 0.0, 'z': 0.0, 'd': 10.0},
    {'class_text': 'حُفْرَةٌ', 'x': 0.0, 'y': 5.0, 'z': 0.0, 'd': 30.0},
    {'class_text': 'شَجَرَة', 'x': 10.0, 'y': 5.0, 'z': 0.0, 'd': 30.0},
    {'class_text': 'بِنَايَةٌ', 'x': 10.0, 'y': -5.0, 'z': 0.0, 'd':11.18},
    {'class_text': 'طَائِرَةٌ', 'x': 10.0, 'y': 0.0, 'z': 15.0, 'd': 20.0},
]

In [None]:
# Run the whole pipeline on the demo scene: plot it, synthesize and spatialize
# each label, then play the combined result.
main_pipeline(objects)
