# CV1 - Time to Speech

Jednoduchy program pro prevod casu (24hodinovy system) na hlasovou notifikaci.
Format Opus byl zvolen kvuli jeho kvalite a male velikosti souboru. Opus je ztratovy kodek, ktery se casto pouziva pri streamovani zvuku.


In [None]:
import os
import time

import numpy as np
import simpleaudio as sa
from datetime import datetime
import av
from pathlib import Path
from pydub.silence import detect_nonsilent
from io import BytesIO
import plotly.graph_objects as go
from pydub import AudioSegment
from pydub.effects import high_pass_filter, normalize, compress_dynamic_range
import librosa
import librosa.effects
from datasets import load_dataset
import shutil
from huggingface_hub import snapshot_download

In [None]:
# Hugging Face dataset that holds the recordings, and the local folder they are stored in
dataset_name = "night12/czech_time_shards_recordings"
RECORDINGS_DIR = Path("./recordings")

In [None]:
def download_recordings():
    """Download every file of the dataset repository into RECORDINGS_DIR."""
    # The target folder has to exist before the snapshot is written into it
    os.makedirs(RECORDINGS_DIR, exist_ok=True)

    # Pull a snapshot of the whole dataset repository
    local_copy = snapshot_download(
        repo_id=dataset_name,
        repo_type="dataset",
        local_dir=RECORDINGS_DIR,
    )

    print(f"Dataset downloaded to: {local_copy}")

In [None]:
# Download the dataset files into RECORDINGS_DIR
download_recordings()

In [None]:
def plot_waveform(audio_segment, title="Waveform"):
    """Visualize the waveform of an audio segment using Plotly.

    Args:
        audio_segment: Object exposing ``get_array_of_samples()``, ``frame_rate``
            and (optionally) ``channels`` - in this file a pydub ``AudioSegment``.
        title: Title shown above the figure.

    One trace is drawn per channel; the x axis is the start time of each
    frame in seconds.
    """
    samples = np.array(audio_segment.get_array_of_samples(), dtype=np.int16)

    # Multi-channel samples arrive interleaved (L, R, L, R, ...). Split them
    # into one column per channel, otherwise a stereo clip is drawn as a
    # single zig-zag line that looks twice as long as it really is.
    channels = getattr(audio_segment, "channels", 1) or 1
    usable = len(samples) - len(samples) % channels
    frames = samples[:usable].reshape(-1, channels)

    # Frame k starts at k / frame_rate seconds
    time_axis = np.arange(len(frames)) / audio_segment.frame_rate

    fig = go.Figure()
    for channel in range(channels):
        trace_name = "Waveform" if channels == 1 else f"Channel {channel + 1}"
        fig.add_trace(go.Scatter(x=time_axis, y=frames[:, channel], mode="lines", name=trace_name))
    fig.update_layout(title=title, xaxis_title="Time (seconds)", yaxis_title="Amplitude")
    fig.show()


In [None]:
def play_opus(file_path):
    """Decode an audio file with PyAV and play it as mono 16-bit PCM.

    Blocks until playback has finished. Problems are reported with ``print``
    instead of being raised, so a broken file does not stop the notebook.

    Args:
        file_path: Path of the file to play (anything ``av.open`` accepts).
    """
    try:
        audio_frames = []

        # The context manager closes the container even when decoding fails
        with av.open(file_path) as container:
            stream = next((s for s in container.streams if s.type == 'audio'), None)
            if stream is None:
                print(f"❌ Error: No audio stream in {file_path}")
                return

            sample_rate = stream.sample_rate
            num_channels = stream.channels or 1  # Default to mono if not specified

            for frame in container.decode(stream):
                frame_data = frame.to_ndarray()

                # Remember the source dtype now: averaging integer samples
                # below also produces floats, and those must not be rescaled.
                is_float = np.issubdtype(frame_data.dtype, np.floating)

                # Downmix to mono. Planar frames carry one row per channel;
                # packed frames carry a single row of interleaved samples.
                if frame_data.ndim > 1:
                    is_packed = (
                        frame_data.shape[0] == 1
                        and num_channels > 1
                        and frame_data.shape[1] % num_channels == 0
                    )
                    if is_packed:
                        frame_data = frame_data.reshape(-1, num_channels).mean(axis=1)
                    else:
                        frame_data = frame_data.mean(axis=0)

                # Float samples are nominally in [-1, 1]; scale them to the int16 range
                if is_float:
                    frame_data = frame_data * 32767

                # Clip before the cast so overshoot saturates instead of wrapping around
                audio_frames.append(np.clip(frame_data, -32768, 32767).astype(np.int16))

        if not audio_frames:
            print(f"❌ Error: No valid audio frames in {file_path}")
            return

        # Concatenate all frames into a single NumPy array
        audio_data = np.concatenate(audio_frames)

        # The data was downmixed above, so it is always played as one channel
        play_obj = sa.play_buffer(audio_data, num_channels=1, bytes_per_sample=2, sample_rate=sample_rate)
        play_obj.wait_done()  # Wait for playback to finish

    except Exception as e:
        print(f"❌ Error playing {file_path}: {e}")

In [None]:
# Quick check: play a single recording
play_opus("recordings/minut.opus")

In [None]:
# Plain playback of the audio files one after another
for clip_path in (
    "recordings/je_prave.opus",
    "recordings/20.opus",
    "recordings/2.opus",
    "recordings/hodin.opus",
    "recordings/30.opus",
    "recordings/4.opus",
    "recordings/minut.opus",
):
    play_opus(clip_path)

In [None]:
def load_opus_with_pyav(file_path):
    """Decode an audio file with PyAV into a mono 16-bit pydub AudioSegment.

    Args:
        file_path: Path of the file to decode (anything ``av.open`` accepts).

    Returns:
        AudioSegment: Mono, 16-bit PCM audio at the stream's sample rate.

    Raises:
        RuntimeError: If the file cannot be opened or decoded, or contains no
            audio. The original exception is attached as ``__cause__``.
    """
    try:
        audio_frames = []

        # The context manager closes the container even when decoding fails
        with av.open(file_path) as container:
            stream = next((s for s in container.streams if s.type == 'audio'), None)
            if stream is None:
                raise ValueError(f"❌ No audio stream in {file_path}")

            sample_rate = stream.sample_rate
            num_channels = stream.channels or 1  # Default to mono if not specified

            for frame in container.decode(stream):
                frame_data = frame.to_ndarray()

                # Remember the source dtype now: averaging integer samples
                # below also produces floats, and those must not be rescaled.
                is_float = np.issubdtype(frame_data.dtype, np.floating)

                # Downmix to mono. Planar frames carry one row per channel;
                # packed frames carry a single row of interleaved samples.
                if frame_data.ndim > 1:
                    is_packed = (
                        frame_data.shape[0] == 1
                        and num_channels > 1
                        and frame_data.shape[1] % num_channels == 0
                    )
                    if is_packed:
                        frame_data = frame_data.reshape(-1, num_channels).mean(axis=1)
                    else:
                        frame_data = frame_data.mean(axis=0)

                # Float samples are nominally in [-1, 1]; scale them to the int16 range
                if is_float:
                    frame_data = frame_data * 32767

                # Clip before the cast so overshoot saturates instead of wrapping around
                audio_frames.append(np.clip(frame_data, -32768, 32767).astype(np.int16))

        if not audio_frames:
            raise ValueError(f"❌ No valid audio frames in {file_path}")

        # Concatenate all frames and convert them to a raw PCM byte string
        pcm_bytes = np.concatenate(audio_frames).tobytes()

        # The data was downmixed above, so the segment is declared as mono;
        # declaring the stream's channel count here would mislabel the data.
        return AudioSegment(
            data=pcm_bytes,
            sample_width=2,  # 16-bit PCM
            frame_rate=sample_rate,
            channels=1
        )

    except Exception as e:
        # Chain the cause so the original traceback stays visible
        raise RuntimeError(f"❌ Error decoding {file_path}: {e}") from e


In [None]:
# Flatten arbitrarily nested lists into one flat list
def flatten_list(nested_list):
    """Return the leaves of ``nested_list`` as one flat list, in order.

    Only ``list`` instances are expanded; any other element (tuple, string,
    path, ...) is kept as a single item.
    """
    def walk(items):
        for element in items:
            if isinstance(element, list):
                yield from walk(element)
            else:
                yield element

    return list(walk(nested_list))

In [None]:
def pick_correct_recordings_by_time(time_input, recordings_folder):
    """Choose the recording files that announce a time of day.

    Args:
        time_input: Object with ``hour`` and ``minute`` attributes; seconds
            are ignored.
        recordings_folder: ``Path`` of the folder holding the ``.opus`` files.

    Returns:
        Tuple ``(announcer, hour_val, hour_desc_file, minute_val,
        minute_desc_file)``. ``hour_val`` and ``minute_val`` are lists of
        paths (``minute_val`` is empty for zero minutes); the other three
        items are single paths.
    """
    def number_files(value):
        # "dve.opus" replaces "2.opus" when the value is exactly two
        if value == 2:
            return [recordings_folder / "dve.opus"]
        # Values below twenty have a recording of their own
        if value < 20:
            return [recordings_folder / f"{value}.opus"]
        # Otherwise: the tens recording, then the units recording if non-zero
        units = value % 10
        files = [recordings_folder / f"{value - units}.opus"]
        if units != 0:
            files.append(recordings_folder / f"{units}.opus")
        return files

    hours = int(time_input.hour)
    minutes = int(time_input.minute)

    # Announcer: 2-4 hours use "jsou_prave", everything else "je_prave"
    if 1 < hours < 5:
        announcer = recordings_folder / "jsou_prave.opus"
    else:
        announcer = recordings_folder / "je_prave.opus"

    # Noun following the hour number
    if hours == 1:
        hour_desc_file = recordings_folder / "hodina.opus"
    elif 1 < hours < 5:
        hour_desc_file = recordings_folder / "hodiny.opus"
    else:
        hour_desc_file = recordings_folder / "hodin.opus"

    hour_val = number_files(hours)

    # Noun following the minute number; zero minutes are replaced by silence
    if minutes == 0:
        minute_desc_file = recordings_folder / "silence.opus"
    elif minutes == 1:
        minute_desc_file = recordings_folder / "minuta.opus"
    elif 1 < minutes < 5:
        minute_desc_file = recordings_folder / "minuty.opus"
    else:
        minute_desc_file = recordings_folder / "minut.opus"

    minute_val = [] if minutes == 0 else number_files(minutes)

    return announcer, hour_val, hour_desc_file, minute_val, minute_desc_file

In [None]:
# Show which recording files would be used for the current time
pick_correct_recordings_by_time(datetime.now(), RECORDINGS_DIR)

In [None]:
def play_audio(audio_segment):
    """Play an AudioSegment straight from memory and block until it finishes."""
    pcm = np.array(audio_segment.get_array_of_samples(), dtype=np.int16)
    channel_count = audio_segment.channels

    # Two-channel data is reshaped to one row per frame before playback
    if channel_count == 2:
        pcm = pcm.reshape((-1, 2))

    # Hand the buffer to simpleaudio and wait for the playback to end
    playback = sa.play_buffer(
        pcm,
        num_channels=channel_count,
        bytes_per_sample=2,
        sample_rate=audio_segment.frame_rate,
    )
    playback.wait_done()

In [None]:
# Decode one recording into an AudioSegment and play it from memory
audio = load_opus_with_pyav("recordings/je_prave.opus")
play_audio(audio)


In [None]:
def assemble_speech_from_time(time_input, recordings_folder):
    """Plot and play, one by one, the unprocessed recordings that announce ``time_input``.

    Args:
        time_input: Object with ``hour`` and ``minute`` attributes.
        recordings_folder: ``Path`` of the folder holding the recordings.
    """
    selection = pick_correct_recordings_by_time(time_input, recordings_folder)
    clip_paths = flatten_list(selection)
    print(clip_paths)

    # Each file is decoded, plotted and played as-is; nothing is trimmed or merged here
    for clip_path in clip_paths:
        segment = load_opus_with_pyav(clip_path)
        plot_waveform(segment, title=f"Waveform of {clip_path}")
        play_audio(segment)




In [None]:
# Plot (and play) the recordings without any processing
assemble_speech_from_time(datetime.now(), RECORDINGS_DIR)

In [None]:
# List of test times covering zero minutes, the "dve" case, the 2-4 hour
# forms, a value below twenty and round tens
test_times = [
    datetime(2024, 1, 1, 0, 0),  # 0:00
    datetime(2024, 1, 1, 2, 0),  # 2:00
    datetime(2024, 1, 1, 2, 2),  # 2:02
    datetime(2024, 1, 1, 3, 10), # 3:10
    datetime(2024, 1, 1, 4, 15), # 4:15
    datetime(2024, 1, 1, 20, 19), # 20:19
    datetime(2024, 1, 1, 20, 20) # 20:20
]

# Run the function with different datetime values
# NOTE: assemble_speech_from_time has no return statement, so output_audio is always None
for test_time in test_times:
    print(f"\nTesting assemble_speech_from_time with time: {test_time.strftime('%H:%M')}")
    output_audio = assemble_speech_from_time(test_time, RECORDINGS_DIR)


# Cast s rozsirenym resenim, obsahuje orez ticha, normalizaci hlasitosti, zmenu pitchu a tempa, zmenu hlasitosti

In [None]:
def boost_audio(audio_segment, factor=3):
    """Return the audio with its volume multiplied by a linear factor.

    Args:
        audio_segment: Object exposing ``apply_gain(db)`` (a pydub AudioSegment here).
        factor: Linear amplitude multiplier.

    Returns:
        Whatever ``audio_segment.apply_gain`` returns for the computed gain.
    """
    # An amplitude ratio corresponds to 20 * log10(ratio) decibels
    return audio_segment.apply_gain(20 * np.log10(factor))


In [None]:
def trim_silence(audio_segment, silence_thresh=-1, min_silence_len=50, keep_padding=50):
    """Trim leading and trailing silence where volume is lower than `silence_thresh`.

    Args:
        audio_segment (AudioSegment): The input audio.
        silence_thresh: Threshold passed to pydub's ``detect_nonsilent``.
            NOTE(review): the callers in this file pass -60; the default of
            -1 looks very strict - confirm it is intended.
        min_silence_len: Minimum silence length passed to ``detect_nonsilent``.
        keep_padding: Amount kept before the first and after the last
            non-silent range, in the same unit as the detected ranges.

    Returns:
        AudioSegment: The trimmed audio, or 500 ms of silence at the input's
        frame rate when no non-silent range is found.
    """
    # Detect nonsilent regions (start and end of actual sound)
    nonsilent_ranges = detect_nonsilent(audio_segment, min_silence_len=min_silence_len, silence_thresh=silence_thresh)

    if not nonsilent_ranges:
        print("⚠️ No speech detected, returning empty audio.")
        # Keep the input's frame rate so the placeholder matches the other
        # clips; without it the silence is created at pydub's default rate.
        return AudioSegment.silent(duration=500, frame_rate=audio_segment.frame_rate)

    # Outer bounds of the detected sound, widened by the padding and clamped to the clip
    first_start = nonsilent_ranges[0][0]
    last_end = nonsilent_ranges[-1][1]
    start_trim = max(0, first_start - keep_padding)
    end_trim = min(len(audio_segment), last_end + keep_padding)

    return audio_segment[start_trim:end_trim]


In [None]:
def normalize_audio(audio_segment, target_dBFS=-20.0):
    """
    Normalize the entire audio to a target loudness level.

    Args:
        audio_segment (AudioSegment): The input audio.
        target_dBFS (float): Target loudness level in decibels (default: -20dBFS).

    Returns:
        AudioSegment: Normalized audio. Digitally silent input (loudness of
        minus infinity) is returned unchanged, because no finite gain can
        bring it to the target level.
    """

    # Calculate current loudness
    current_dBFS = audio_segment.dBFS

    # Silence reports -inf; the gain would then be +inf, which is meaningless
    if current_dBFS == float("-inf"):
        return audio_segment

    # Apply the gain that moves the current loudness onto the target
    return audio_segment.apply_gain(target_dBFS - current_dBFS)

In [None]:
def adjust_pitch(audio_segment, semitones=2):
    """
    Adjust the pitch of an audio file without changing tempo.

    Args:
        audio_segment (AudioSegment): The input audio.
        semitones (int): The number of semitones to shift.
                         Positive = higher pitch, Negative = lower pitch.

    Returns:
        AudioSegment: The pitch-adjusted audio as mono 16-bit PCM.
    """

    # Convert AudioSegment to NumPy array
    samples = np.array(audio_segment.get_array_of_samples(), dtype=np.float32)
    sample_rate = audio_segment.frame_rate

    # Downmix to mono: de-interleave the channels and average them
    if audio_segment.channels > 1:
        samples = samples.reshape((-1, audio_segment.channels)).mean(axis=1)

    # Apply pitch shifting
    pitch_shifted_samples = librosa.effects.pitch_shift(samples, sr=sample_rate, n_steps=semitones)

    # The shifted signal can overshoot the 16-bit range. Clip before the
    # cast, otherwise the overshoot wraps around and is heard as loud pops.
    pcm = np.clip(pitch_shifted_samples, -32768, 32767).astype(np.int16)

    # Convert back to AudioSegment
    processed_audio = AudioSegment(
        pcm.tobytes(),
        frame_rate=sample_rate,
        sample_width=2,
        channels=1
    )

    return processed_audio


## Nasledujici metody byly pouzity pri pokusu o odstraneni plozivnich zvuku
Nizka uspesnost: explozivni "t" je stale slyset.

In [None]:
# Also changes the timbre of the voice, yet the plosives stay almost as audible as before
def remove_plosives(audio_segment, highpass_freq=22000, target_freq=5500, width=800, compression_threshold=-25):
    """
    Reduce harsh plosive sounds using:
    1. A High-Pass Filter to remove low-end pops.
    2. A band attenuation (FFT) around the "T" frequency (5kHz-6kHz).
    3. Dynamic range compression to suppress explosive transients.

    Args:
        audio_segment (AudioSegment): The input audio; the FFT step treats
            the samples as a single channel.
        highpass_freq (int): High-Pass filter cutoff (default: 22000 Hz).
            NOTE(review): this docstring previously advertised 2000 Hz;
            confirm which value is intended.
        target_freq (int): Center frequency to attenuate (default: 5500 Hz).
        width (int): Width of the attenuated band (default: 800 Hz).
        compression_threshold (int): Compression threshold in dB.

    Returns:
        AudioSegment: Processed audio with reduced plosives (mono, 16-bit).
    """

    # Step 1: Apply High-Pass Filter to Remove Low-End Plosives
    filtered_audio = high_pass_filter(audio_segment, highpass_freq)

    # Step 2: Halve the band of `width` Hz centered on `freq` (reduces "T" harshness)
    def notch_filter(audio, freq, width):
        samples = np.array(audio.get_array_of_samples(), dtype=np.float32)
        spectrum = np.fft.rfft(samples)
        freqs = np.fft.rfftfreq(len(samples), d=1/audio.frame_rate)

        # Find & reduce the target frequency band
        mask = (freqs > freq - width / 2) & (freqs < freq + width / 2)
        spectrum[mask] *= 0.5  # Reduce "T" frequencies by 50%

        # Inverse FFT back to the time domain. Passing n keeps odd-length
        # inputs at their original length (the default drops one sample).
        restored = np.fft.irfft(spectrum, n=len(samples))

        # Clip before the cast so overshoot saturates instead of wrapping
        filtered_samples = np.clip(restored, -32768, 32767).astype(np.int16)
        return AudioSegment(filtered_samples.tobytes(), frame_rate=audio.frame_rate, sample_width=2, channels=1)

    reduced_t_audio = notch_filter(filtered_audio, target_freq, width)

    # Step 3: Compress the dynamic range to catch sudden plosives
    compressed_audio = compress_dynamic_range(reduced_t_audio, threshold=compression_threshold)

    return compressed_audio


In [None]:
# Method that gave poor results when reducing plosive sounds
def detect_and_reduce_plosive(audio_segment, spike_threshold=1000, reduction_factor=0.5):
    """
    Detects and reduces plosive spikes in an audio file by:
    1. Identifying samples whose absolute amplitude exceeds the threshold.
    2. Scaling exactly those samples by ``reduction_factor``.

    Args:
        audio_segment (AudioSegment): The input audio (16-bit samples).
        spike_threshold (int): The amplitude level to consider a spike.
        reduction_factor (float): How much to reduce the spike (0.5 = 50% volume reduction).

    Returns:
        AudioSegment: The processed audio with the input's frame rate and
        channel count.
    """

    samples = np.array(audio_segment.get_array_of_samples(), dtype=np.int16)

    # Compare in int32: abs(-32768) does not fit into int16 and would wrap,
    # so the loudest possible negative peak would never be detected.
    spike_mask = np.abs(samples.astype(np.int32)) > spike_threshold
    spike_count = int(np.count_nonzero(spike_mask))

    if spike_count > 0:
        print(f"⚠️ Found {spike_count} plosive spikes, reducing...")

        # Scale all spikes at once; the cast truncates toward zero and the
        # clip keeps factors above 1 from wrapping around.
        reduced = samples[spike_mask] * reduction_factor
        samples[spike_mask] = np.clip(reduced, -32768, 32767).astype(np.int16)

    # Convert back to AudioSegment. Samples are processed one by one, so the
    # interleaved channel layout of the input is still valid.
    processed_audio = AudioSegment(
        samples.tobytes(),
        frame_rate=audio_segment.frame_rate,
        sample_width=2,
        channels=audio_segment.channels
    )

    return processed_audio

In [None]:
# A somewhat better method
def adaptive_plosive_reduction(audio_segment, spike_threshold=22000, attack_window=10, release_window=30, reduction_factor=0.6):
    """
    Reduce plosives by attenuating a short window around every loud sample:
    1. Finds samples whose absolute amplitude exceeds ``spike_threshold``.
    2. Multiplies the window around each of them by a linear ramp that goes
       from 1 down to ``reduction_factor``.

    Windows of neighbouring spikes overlap, so samples inside several
    windows are attenuated several times.
    NOTE(review): confirm this compounding is intended.

    Args:
        audio_segment (AudioSegment): The input audio (16-bit samples).
        spike_threshold (int): The level above which a plosive is detected.
        attack_window (int): Samples included before each spike.
        release_window (int): Samples included from each spike onwards.
        reduction_factor (float): Gain reached at the end of each window (0.6 = 40% volume reduction).

    Returns:
        AudioSegment: Processed audio with the input's frame rate and
        channel count.
    """

    samples = np.array(audio_segment.get_array_of_samples(), dtype=np.int16)
    sample_rate = audio_segment.frame_rate

    # Compare in int32: abs(-32768) does not fit into int16 and would wrap,
    # so the loudest possible negative peak would never be detected.
    spike_indices = np.where(np.abs(samples.astype(np.int32)) > spike_threshold)[0]

    if len(spike_indices) > 0:
        print(f"⚠️ Detected {len(spike_indices)} plosive spikes, selectively reducing...")

        # Process each detected plosive
        for idx in spike_indices:
            start = max(0, idx - attack_window)
            end = min(len(samples), idx + release_window)

            # Apply a gradual reduction instead of a hard cut
            fade_curve = np.linspace(1, reduction_factor, num=end-start)
            faded = samples[start:end] * fade_curve

            # Clip so a factor above 1 saturates instead of wrapping around
            samples[start:end] = np.clip(faded, -32768, 32767).astype(np.int16)

    # Convert back to AudioSegment, keeping the input's channel layout
    processed_audio = AudioSegment(
        samples.tobytes(),
        frame_rate=sample_rate,
        sample_width=2,
        channels=audio_segment.channels
    )

    return processed_audio

In [None]:
def enhanced_assemble_speech_from_time(time_input, recordings_folder):
    """Play the recordings that announce ``time_input`` after cleaning each one up.

    Every clip is trimmed, normalized, run through the plosive reduction and
    boosted before it is played.

    Args:
        time_input: Object with ``hour`` and ``minute`` attributes.
        recordings_folder: ``Path`` of the folder holding the recordings.
    """
    clip_paths = flatten_list(pick_correct_recordings_by_time(time_input, recordings_folder))
    print(clip_paths)

    for clip_path in clip_paths:
        raw = load_opus_with_pyav(clip_path)
        trimmed = trim_silence(raw, silence_thresh=-60, min_silence_len=10, keep_padding=50)
        leveled = normalize_audio(trimmed)
        # Not 100 % successful; this is the most reasonable of the plosive methods
        softened = adaptive_plosive_reduction(leveled)
        # Raising the pitch by 5 semitones via adjust_pitch was tried here and
        # dropped: it sounded odd and was full of pops.
        play_audio(boost_audio(softened, factor=5))




In [None]:
# Announce the current time using the processed recordings
enhanced_assemble_speech_from_time(datetime.now(), RECORDINGS_DIR)

In [None]:
# Debugging the explosive "T" sounds: plot the waveform after every processing step
# NOTE: time_recording_files is not used below; the only line reading it is commented out
time_recording_files = pick_correct_recordings_by_time(datetime.now(), RECORDINGS_DIR)

#audio = load_opus_with_pyav(time_recording_files[-1])
audio = load_opus_with_pyav("recordings/minut.opus")
audio = trim_silence(audio, silence_thresh=-60, min_silence_len=10, keep_padding=50)
plot_waveform(audio)

# After loudness normalization
audio = normalize_audio(audio)
plot_waveform(audio)

# After plosive reduction
audio = adaptive_plosive_reduction(audio, spike_threshold=22000, reduction_factor=0.5)
plot_waveform(audio)
# factor=1 corresponds to a gain of 0 dB
audio = boost_audio(audio, factor=1)

plot_waveform(audio)
play_audio(audio)