Simple SRT file to WAV

Settings

In [1]:
from dotenv import load_dotenv
import os
load_dotenv()

# Set Google Cloud credentials (ensure the path is correct).
# NOTE: this assignment unconditionally replaces any GOOGLE_APPLICATION_CREDENTIALS
# value that load_dotenv() above may have loaded from a .env file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "D:\\HIẾU\\plenary-agility-457810-t9-2bc80a805c4a.json"


input_folder = r"C:\Users\Hieu Pham\Downloads\1"  # input folder containing .srt files (batch processing).
                                                   # Every file in it whose name ends with '.srt' is processed,
                                                   # so keep only the required .srt files inside this folder.
input_language_srt = 'vi'  # language of the .srt file (passed through the call chain; srt_to_audio does not read it)

output_folder = input_folder
output_voice = 'vi'  # note: gTTS and Google Cloud use different codes; gTTS uses 'vi' while Google Cloud uses 'vi-VN'

volume = 15  # gain in dB
speed = 1.5  # initial speed multiplier; adjust per language, since natural speaking rate differs between languages
max_speed_limit = 2  # upper bound for the FFmpeg speed-up in the gTTS path, used when a sentence is too long for its slot

max_duration_seconds = 50000  # seconds (converted to milliseconds inside split_srt_into_chunks);
                              # maximum duration of one output file — longer inputs are split into several files.
                              # Used because free gTTS has limits.

start_index = 0  # default is 0. Subtitles whose SRT index is below this value are skipped.
                  # Use it to resume after an undesired cut when a long video was split according to max_duration_seconds.
                  # Do not rely on the index printed to the screen; it is not reliable.
                  # Carefully align the .srt file with the audio instead.

model = 'gtts'  # 'gtts' or 'google_cloud' (underscore — this is the exact string the TTS dispatch compares against)
voice_google_cloud = 'vi-VN-Neural2-A'  # Google Cloud only

MAX_WORKERS = 20  # number of threads in the pool that runs the per-subtitle TTS tasks

Text to speech

In [2]:
import os
import re
import subprocess
import io
from pydub import AudioSegment
from google.cloud import texttospeech
from gtts import gTTS
from dotenv import load_dotenv # Ensure you have python-dotenv installed: pip install python-dotenv
import traceback # For detailed error reporting
from concurrent.futures import ThreadPoolExecutor, as_completed

# --- Core helpers ---
def speed_up_audio_with_ffmpeg(input_audio_path, output_audio_path, speed_factor):
    """Change the tempo of an audio file with FFmpeg's ``atempo`` filter (pitch is not altered).

    Args:
        input_audio_path: Path of the audio file to read.
        output_audio_path: Path of the file to write; an existing file is
            overwritten because FFmpeg is invoked with ``-y``.
        speed_factor: Requested tempo multiplier. It is converted to float and
            clamped to the range [0.5, 100.0] before being handed to ``atempo``.

    Raises:
        subprocess.CalledProcessError: FFmpeg exited with a non-zero status.
            Its stderr is printed before the exception is re-raised.
    """
    clamped_speed_factor = max(0.5, min(float(speed_factor), 100.0))

    command = [
        "ffmpeg", "-y", "-i", input_audio_path,
        "-filter:a", f"atempo={clamped_speed_factor}",
        "-vn", output_audio_path
    ]
    try:
        subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
    except subprocess.CalledProcessError as e:
        # FFmpeg's stderr is not guaranteed to be valid UTF-8 (e.g. localized
        # paths on Windows); decode leniently so a decoding failure can never
        # replace the real FFmpeg error.
        stderr_text = (e.stderr or b"").decode("utf-8", errors="replace")
        print(f"FFmpeg error ({input_audio_path}@{clamped_speed_factor}x): {stderr_text}")
        raise

def parse_srt(srt_path):
    """Read an SRT file and return its subtitle entries.

    Args:
        srt_path: Path of the ``.srt`` file (UTF-8, with or without a BOM).

    Returns:
        A list of ``(index, start_time, end_time, text)`` tuples where
        ``index`` is an int, the two times are the raw timestamp strings found
        around ``-->``, and ``text`` is the subtitle lines joined with a single
        space. Sections with fewer than three lines are ignored; sections whose
        index or timing line cannot be parsed are reported and skipped.
    """
    # 'utf-8-sig' drops a leading byte-order mark. With plain 'utf-8' the BOM
    # stays glued to the first index line, int() fails on it and the first
    # subtitle is silently lost.
    with open(srt_path, 'r', encoding='utf-8-sig') as f:
        srt_text = f.read()

    parsed = []
    for section in re.split(r'\n\s*\n', srt_text.strip()):
        lines = section.strip().splitlines()
        if len(lines) < 3:
            continue
        try:
            index = int(lines[0])
            # Split on the bare arrow and strip, so "a --> b", "a-->b" and
            # "a  -->  b" are all accepted. A timing line without exactly one
            # arrow fails the unpacking with ValueError.
            start_time, end_time = (part.strip() for part in lines[1].split('-->'))
        except ValueError as e:
            print(f"Skipping malformed SRT section: {section}. Error: {e}")
            continue
        text = ' '.join(lines[2:]).strip()
        parsed.append((index, start_time, end_time, text))
    return parsed

def srt_time_to_milliseconds(srt_time):
    """Convert an SRT timestamp of the form ``HH:MM:SS,mmm`` to integer milliseconds.

    Raises ValueError when the string does not have exactly three
    colon-separated fields with a comma inside the last one, or when a field
    is not an integer.
    """
    hours_field, minutes_field, seconds_and_millis = srt_time.split(':')
    seconds_field, millis_field = seconds_and_millis.split(',')

    # Fold hours -> minutes -> seconds with base 60.
    whole_seconds = 0
    for field in (hours_field, minutes_field, seconds_field):
        whole_seconds = whole_seconds * 60 + int(field)

    return whole_seconds * 1000 + int(millis_field)

# Performs a single TTS call: synthesises one piece of text with the selected
# backend and returns the result as an AudioSegment.
def generate_audio_segment_with_google_tts_chunked(text, option="gtts", language='vi',
                                                 speed_factor_for_google_api=1.0,
                                                 speed_factor_for_openai_api=1.0,
                                                 volume_db=0, temp_file_id="temp",
                                                 google_voice_name=None):
    """Synthesise ``text`` with one TTS backend and return it as an AudioSegment.

    The backend writes an MP3 to ``temp_<temp_file_id>.mp3`` in the current
    working directory; the file is loaded and then always removed.

    Args:
        text: Text to speak. Blank text yields a zero-length segment without
            calling any backend.
        option: ``"google_cloud"``, ``"gtts"`` or ``"openai"``.
        language: Language code handed to the backend (``language_code`` for
            Google Cloud, ``lang`` for gTTS).
        speed_factor_for_google_api: Speaking rate for Google Cloud, clamped
            to [0.25, 4.0].
        speed_factor_for_openai_api: Speed for OpenAI, clamped to [0.25, 4.0].
        volume_db: Volume gain in dB; only applied by the Google Cloud backend.
        temp_file_id: Distinguishes the temp file of concurrent calls.
        google_voice_name: Google Cloud voice name (e.g. ``"vi-VN-Neural2-A"``).
            When omitted, a female voice for ``language`` is requested.

    Returns:
        The synthesised AudioSegment, or a zero-length silent segment when the
        backend call or the MP3 decoding fails (the error is printed).

    Raises:
        ValueError: ``option`` is not one of the supported backends.
    """
    base_temp_mp3 = f"temp_{temp_file_id}.mp3"
    if not text.strip():
        return AudioSegment.silent(duration=0)

    try:
        if option == "google_cloud":
            client = texttospeech.TextToSpeechClient()
            synthesis_input = texttospeech.SynthesisInput(text=text)
            # The voice is selected through the `name` field. Only fall back to
            # a gender preference when the caller did not choose a voice.
            voice_kwargs = {"language_code": language}
            if google_voice_name:
                voice_kwargs["name"] = google_voice_name
            else:
                voice_kwargs["ssml_gender"] = texttospeech.SsmlVoiceGender.FEMALE
            voice = texttospeech.VoiceSelectionParams(**voice_kwargs)
            # Same clamp as the OpenAI branch, so an out-of-range request is
            # adjusted instead of being rejected by the API.
            clamped_google_speed = max(0.25, min(float(speed_factor_for_google_api), 4.0))
            audio_config = texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.MP3,
                speaking_rate=clamped_google_speed,
                volume_gain_db=volume_db,
            )
            try:
                response = client.synthesize_speech(input=synthesis_input, voice=voice, audio_config=audio_config)
                with open(base_temp_mp3, 'wb') as out:
                    out.write(response.audio_content)
            except Exception as e:
                print(f"Google Cloud TTS error for '{text[:30]}...': {e}")
                return AudioSegment.silent(duration=0)

        elif option == "gtts":
            try:
                tts = gTTS(text=text, lang=language, slow=False)
                tts.save(base_temp_mp3)
            except Exception as e:
                print(f"gTTS error for '{text[:30]}...': {e}")
                return AudioSegment.silent(duration=0)

        elif option == "openai":
            # NOTE(review): `OpenAI` is not imported anywhere in the visible
            # notebook. Until `from openai import OpenAI` is added to the import
            # cell, this branch ends in the except clause below (NameError) and
            # returns silence.
            try:
                client = OpenAI()
                clamped_openai_speed = max(0.25, min(float(speed_factor_for_openai_api), 4.0))
                with client.audio.speech.with_streaming_response.create(model='tts-1', voice='alloy', input=text, speed=clamped_openai_speed) as response:
                    response.stream_to_file(base_temp_mp3)
            except Exception as e:
                print(f"OpenAI TTS error for '{text[:30]}...': {e}")
                return AudioSegment.silent(duration=0)

        else:
            raise ValueError("TTS Option must be 'google_cloud', 'gtts', or 'openai'.")

        segment = AudioSegment.silent(duration=0)
        try:
            if os.path.exists(base_temp_mp3) and os.path.getsize(base_temp_mp3) > 0:
                segment = AudioSegment.from_mp3(base_temp_mp3)
        except Exception as e:
            print(f"Error loading MP3 '{base_temp_mp3}' for '{text[:30]}...': {e}")
        return segment
    finally:
        # Runs on every exit path (success, backend failure, exception) so a
        # partially written temp file is never left behind.
        if os.path.exists(base_temp_mp3):
            os.remove(base_temp_mp3)

# --- Worker: full processing of a single subtitle (TTS, speed fit, padding, volume) ---
def process_subtitle_to_final_audio(
    text, srt_idx, tts_model, language,
    initial_speed_factor,  # user's preferred initial speed
    target_srt_duration_ms,
    volume_db,
    speed_limit,  # upper bound for the FFmpeg speed-up used in the gTTS path
    temp_file_id_prefix,  # base for unique temp file names within this task
    google_voice_name=None  # Google Cloud voice name, forwarded to every TTS call
    ):
    """Produce the final audio for one subtitle so that it fills its SRT time slot.

    Steps: synthesise the text, speed it up if it does not fit the slot,
    truncate what still overflows, pad with silence up to the slot length and,
    for backends that do not apply gain themselves, adjust the volume.

    Args:
        text: Subtitle text.
        srt_idx: Subtitle index, used only in log messages.
        tts_model: ``"google_cloud"``, ``"openai"`` or ``"gtts"``.
        language: Language code forwarded to the TTS call.
        initial_speed_factor: Preferred speed. Sent to the API for
            ``google_cloud``/``openai``; applied with FFmpeg for ``gtts``.
        target_srt_duration_ms: Slot length in milliseconds. When it is <= 0
            the length of the generated audio is used instead.
        volume_db: Gain in dB. Passed to the API for ``google_cloud``, applied
            to the segment for ``gtts`` and ``openai``.
        speed_limit: Maximum FFmpeg tempo factor for the ``gtts`` path. The API
            path is capped at 2.0 regardless of this value.
        temp_file_id_prefix: Prefix that keeps temp files of parallel tasks apart.
        google_voice_name: Voice name for Google Cloud; ignored by other backends.

    Returns:
        The processed AudioSegment (zero-length when both the slot and the
        generated audio are empty).
    """
    # Step 1: initial TTS generation.
    gen_args_initial = {
        "text": text, "option": tts_model, "language": language,
        "temp_file_id": f"{temp_file_id_prefix}_initial"
    }
    if tts_model == "google_cloud":
        gen_args_initial["speed_factor_for_google_api"] = initial_speed_factor
        gen_args_initial["volume_db"] = volume_db  # Google API applies the gain itself
        gen_args_initial["google_voice_name"] = google_voice_name
    elif tts_model == "openai":
        gen_args_initial["speed_factor_for_openai_api"] = initial_speed_factor
    # For gTTS no speed is passed: the audio comes back at 1.0x.

    base_generated_segment = generate_audio_segment_with_google_tts_chunked(**gen_args_initial)
    actual_raw_duration_ms = len(base_generated_segment)

    # A slot of 0 ms (or less) cannot be fitted: use the audio's own length,
    # or give up when there is no audio either.
    if target_srt_duration_ms <= 0:
        if actual_raw_duration_ms <= 0:
            return AudioSegment.silent(duration=0)
        target_srt_duration_ms = actual_raw_duration_ms
    # From here on target_srt_duration_ms is always > 0.

    final_segment_for_processing = AudioSegment.silent(duration=0)

    # Step 2: speed adjustment (if needed).
    if actual_raw_duration_ms <= 0:
        # TTS failed or produced nothing: keep the slot as silence.
        final_segment_for_processing = AudioSegment.silent(duration=target_srt_duration_ms)

    elif tts_model == "google_cloud" or tts_model == "openai":
        final_segment_for_processing = base_generated_segment

        if actual_raw_duration_ms > target_srt_duration_ms:
            # Ask the API again at the speed that would make the audio fit,
            # within the range the APIs are called with here.
            required_api_speed = (actual_raw_duration_ms / target_srt_duration_ms) * initial_speed_factor
            api_speed_cap = 2.0
            api_speed_floor = 0.25
            final_api_speed_for_regen = max(api_speed_floor, min(required_api_speed, api_speed_cap))

            regen_args = {
                "text": text, "option": tts_model, "language": language,
                "temp_file_id": f"{temp_file_id_prefix}_regen"
            }
            if tts_model == "google_cloud":
                regen_args["speed_factor_for_google_api"] = final_api_speed_for_regen
                regen_args["volume_db"] = volume_db
                # Must go into regen_args, otherwise the regenerated audio
                # would not use the selected voice.
                regen_args["google_voice_name"] = google_voice_name
            elif tts_model == "openai":
                regen_args["speed_factor_for_openai_api"] = final_api_speed_for_regen

            regenerated_segment = generate_audio_segment_with_google_tts_chunked(**regen_args)
            # If the second call failed (empty result), keep the first take
            # rather than replacing real speech with silence.
            if len(regenerated_segment) > 0:
                final_segment_for_processing = regenerated_segment

        # Truncate whatever still overflows the slot (e.g. speed cap reached).
        final_segment_for_processing = final_segment_for_processing[:target_srt_duration_ms]

    elif tts_model == "gtts":
        # base_generated_segment is at 1.0x; FFmpeg applies the speed.
        if (actual_raw_duration_ms / initial_speed_factor) <= target_srt_duration_ms:
            # Fits at the preferred speed.
            ffmpeg_speed_to_apply = initial_speed_factor
        else:
            # Too long at the preferred speed: use the speed that makes it fit.
            ffmpeg_speed_to_apply = actual_raw_duration_ms / target_srt_duration_ms

        # Cap by speed_limit and keep the 0.5 lower bound used for atempo.
        ffmpeg_speed_to_apply = max(0.5, min(ffmpeg_speed_to_apply, speed_limit))

        if abs(ffmpeg_speed_to_apply - 1.0) < 0.01:  # effectively 1.0x, skip FFmpeg
            final_segment_for_processing = base_generated_segment
        else:
            temp_raw_path = f"temp_{temp_file_id_prefix}_gtts_raw.mp3"
            temp_sped_path = f"temp_{temp_file_id_prefix}_gtts_sped.mp3"
            try:
                base_generated_segment.export(temp_raw_path, format="mp3")
                speed_up_audio_with_ffmpeg(temp_raw_path, temp_sped_path, ffmpeg_speed_to_apply)
                final_segment_for_processing = AudioSegment.from_mp3(temp_sped_path)
            except Exception as e_ffmpeg:
                print(f"    Error in threaded gTTS FFmpeg for srt_idx {srt_idx}: {e_ffmpeg}. Using 1.0x audio.")
                final_segment_for_processing = base_generated_segment  # fallback
            finally:
                if os.path.exists(temp_raw_path): os.remove(temp_raw_path)
                if os.path.exists(temp_sped_path): os.remove(temp_sped_path)

        # Truncate if even the capped speed left the audio longer than the slot.
        final_segment_for_processing = final_segment_for_processing[:target_srt_duration_ms]

    # Step 3: pad with trailing silence up to the slot length.
    silence_needed_ms = target_srt_duration_ms - len(final_segment_for_processing)
    if silence_needed_ms > 0:
        final_segment_for_processing += AudioSegment.silent(duration=silence_needed_ms)

    # Step 4: volume, for the backends whose API call did not apply it.
    if tts_model in ("gtts", "openai") and volume_db != 0 and len(final_segment_for_processing) > 0:
        final_segment_for_processing = final_segment_for_processing + volume_db

    return final_segment_for_processing


# Groups parsed subtitles into consecutive chunks of bounded duration.
def split_srt_into_chunks(parsed_srt, max_duration_seconds, start_index_filter=0):
    """Group subtitles into chunks whose SRT time span stays within a limit.

    Args:
        parsed_srt: Iterable of ``(index, start_time, end_time, text)`` with
            the times as SRT timestamp strings.
        max_duration_seconds: Maximum span of one chunk, in seconds, measured
            from the start of the chunk's first subtitle to the end of its
            last one.
        start_index_filter: Subtitles whose index is below this value are
            ignored.

    Returns:
        A list of chunks; each chunk is a list of
        ``(index, start_ms, end_ms, text)`` tuples with times in milliseconds.
        A single subtitle longer than the limit forms a chunk of its own.
        Returns ``[]`` when no subtitle passes the index filter.
    """
    max_duration_ms = max_duration_seconds * 1000
    chunks = []
    current_chunk_items = []
    chunk_actual_start_time_ms = None

    for srt_item_index, start_time_str, end_time_str, text_content in parsed_srt:
        if srt_item_index < start_index_filter:
            continue

        start_ms = srt_time_to_milliseconds(start_time_str)
        end_ms = srt_time_to_milliseconds(end_time_str)

        # An empty chunk is always anchored at the subtitle that opens it.
        # (A sentinel start value after an over-long subtitle would make the
        # next subtitle look over-long too and push it into a chunk of its own.)
        if not current_chunk_items:
            chunk_actual_start_time_ms = start_ms

        current_item_srt_span_ms = end_ms - chunk_actual_start_time_ms
        if current_item_srt_span_ms < 0:
            print(f"Skipping subtitle index {srt_item_index} due to inconsistent time.")
            continue

        # Adding this subtitle would exceed the limit: close the current chunk
        # and start a new one at this subtitle.
        if current_item_srt_span_ms > max_duration_ms and current_chunk_items:
            chunks.append(current_chunk_items)
            current_chunk_items = []
            chunk_actual_start_time_ms = start_ms

        current_chunk_items.append((srt_item_index, start_ms, end_ms, text_content))

        # A subtitle that alone exceeds the limit is emitted as its own chunk.
        if (end_ms - start_ms) > max_duration_ms and len(current_chunk_items) == 1:
            chunks.append(current_chunk_items)
            current_chunk_items = []

    if current_chunk_items:
        chunks.append(current_chunk_items)
    return chunks

# --- Main processing: one SRT file -> one WAV per chunk, subtitles synthesised in parallel ---
def srt_to_audio(srt_path, output_path_prefix, input_lang_srt, output_lang_voice,
                 initial_speed_factor_config, volume_adjustment_db_config,
                 max_chunk_duration_sec, speed_limit_config,
                 processing_start_index, tts_model_config, google_voice_name_config):
    """Convert one SRT file into WAV files, one per chunk of subtitles.

    The subtitles are parsed, grouped into chunks, and every non-empty
    subtitle of a chunk is synthesised in a thread-pool task. Each finished
    segment is overlaid on a silent track at its offset inside the chunk, and
    the track is exported as
    ``<output_path_prefix>_chunk<N>_endIdx<last index>.wav``.

    ``input_lang_srt`` is accepted but not read by this function.
    """
    subtitles = parse_srt(srt_path)
    if not subtitles:
        print(f"No subtitles parsed from {srt_path}. Skipping.")
        return

    chunk_list = split_srt_into_chunks(subtitles, max_chunk_duration_sec, processing_start_index)
    srt_basename = os.path.basename(srt_path)
    temp_stem = os.path.splitext(srt_basename)[0]
    output_chunk_file_idx = 1

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for chunk_num, chunk_items in enumerate(chunk_list):
            if not chunk_items:
                continue

            # The chunk spans from the start of its first subtitle to the end of its last.
            chunk_origin_ms = chunk_items[0][1]
            chunk_length_ms = chunk_items[-1][2] - chunk_origin_ms
            if chunk_length_ms <= 0:
                print(f"Skipping chunk {chunk_num+1} (to {output_chunk_file_idx}) due to zero/negative SRT duration ({chunk_length_ms}ms).")
                continue

            chunk_audio = AudioSegment.silent(duration=chunk_length_ms)
            last_srt_idx = chunk_items[-1][0]

            # future -> bookkeeping needed to place its result on the track
            pending = {}
            print(f"\nSubmitting tasks for audio chunk {output_chunk_file_idx} (SRTs up to {last_srt_idx}) for '{srt_basename}'...")

            for srt_idx, start_ms, end_ms, text in chunk_items:
                offset_ms = start_ms - chunk_origin_ms
                slot_ms = end_ms - start_ms

                if not text.strip():
                    # Subtitle without text: place silence of the slot length directly.
                    if slot_ms > 0:
                        chunk_audio = chunk_audio.overlay(
                            AudioSegment.silent(duration=slot_ms),
                            position=offset_ms
                        )
                    continue

                task = executor.submit(
                    process_subtitle_to_final_audio,
                    text=text,
                    srt_idx=srt_idx,
                    tts_model=tts_model_config,
                    language=output_lang_voice,
                    initial_speed_factor=initial_speed_factor_config,
                    target_srt_duration_ms=slot_ms,
                    volume_db=volume_adjustment_db_config,
                    speed_limit=speed_limit_config,
                    temp_file_id_prefix=f"{temp_stem}_c{chunk_num}_s{srt_idx}",
                    google_voice_name=google_voice_name_config
                )
                pending[task] = {
                    "srt_idx": srt_idx,
                    "overlay_position_ms": offset_ms,
                    "text_preview": text[:20]  # shown in error messages
                }

            print(f"  All {len(pending)} subtitle tasks for chunk {output_chunk_file_idx} submitted. Waiting for completion...")
            for done_task in as_completed(pending):
                info = pending[done_task]
                try:
                    subtitle_audio = done_task.result()
                    if len(subtitle_audio) > 0:
                        chunk_audio = chunk_audio.overlay(
                            subtitle_audio,
                            position=info["overlay_position_ms"]
                        )
                except Exception as e_task_complete:
                    print(f"  Error in completed task result for subtitle index {info['srt_idx']} ('{info['text_preview']}...'): {e_task_complete}")
                    print(traceback.format_exc())

            output_audio_path = f"{output_path_prefix}_chunk{output_chunk_file_idx}_endIdx{last_srt_idx}.wav"
            try:
                if len(chunk_audio) > 0:
                    chunk_audio.export(output_audio_path, format="wav")
                    print(f"SUCCESS: Audio chunk '{os.path.basename(output_audio_path)}' created (Duration: {len(chunk_audio)/1000:.2f}s).")
                else:
                    print(f"SKIPPED: Audio chunk '{os.path.basename(output_audio_path)}' would be empty.")
            except Exception as e_export:
                print(f"Error exporting audio chunk '{output_audio_path}': {e_export}")
            output_chunk_file_idx += 1

def process_srt_folder(root_input_folder, root_output_folder, lang_srt, lang_voice,
                       base_speed, vol_db, chunk_duration_s, max_spd_limit, start_idx_filter, tts_engine, google_voice_name):
    """Convert every SRT file found directly inside a folder.

    Files are processed in sorted name order. A file is selected when its name
    ends with ``.srt`` in any letter case; directories are ignored. The output
    file prefix is the file name without extension, reduced to word
    characters, ``-`` and ``_``, inside ``root_output_folder`` (created when
    missing).

    Args:
        root_input_folder: Folder that contains the ``.srt`` files.
        root_output_folder: Folder that receives the generated audio.
        lang_srt, lang_voice, base_speed, vol_db, chunk_duration_s,
        max_spd_limit, start_idx_filter, tts_engine, google_voice_name:
            Forwarded unchanged to ``srt_to_audio``.
    """
    if not os.path.exists(root_input_folder):
        print(f"Error: Input folder '{root_input_folder}' does not exist.")
        return
    if not os.path.exists(root_output_folder):
        os.makedirs(root_output_folder, exist_ok=True)
        print(f"Created output folder: '{root_output_folder}'")

    # Sorted so runs are reproducible: os.listdir returns names in arbitrary order.
    for filename in sorted(os.listdir(root_input_folder)):
        # Case-insensitive so "MOVIE.SRT" is not silently skipped.
        if not filename.lower().endswith('.srt'):
            continue
        srt_file_path = os.path.join(root_input_folder, filename)
        # A directory that happens to be named "*.srt" cannot be parsed.
        if not os.path.isfile(srt_file_path):
            continue

        sanitized_filename_base = re.sub(r'[^\w\-_]', '', os.path.splitext(filename)[0])
        output_filename_prefix = os.path.join(root_output_folder, sanitized_filename_base)

        print(f"\n================ PROCESSING SRT: {filename} ================")
        srt_to_audio(
            srt_path=srt_file_path, output_path_prefix=output_filename_prefix,
            input_lang_srt=lang_srt, output_lang_voice=lang_voice,
            initial_speed_factor_config=base_speed, volume_adjustment_db_config=vol_db,
            max_chunk_duration_sec=chunk_duration_s, speed_limit_config=max_spd_limit,
            processing_start_index=start_idx_filter, tts_model_config=tts_engine,
            google_voice_name_config=google_voice_name
        )

# --- Main Execution ---
if __name__ == "__main__":
    # Make sure the input folder exists; when it is empty, seed it with a small
    # Vietnamese sample so the pipeline has something to process.
    if not os.path.exists(input_folder):
        os.makedirs(input_folder)
    if not os.listdir(input_folder):
        dummy_srt_path = os.path.join(input_folder, "dummy_test.srt")
        dummy_srt_text = (
            "1\n00:00:01,000 --> 00:00:03,500\nĐây là một câu ví dụ đầu tiên.\n\n"
            "2\n00:00:04,000 --> 00:00:06,000\nCâu này là câu thứ hai để kiểm tra.\n\n"
            "3\n00:00:06,500 --> 00:00:10,000\nVà đây là một câu dài hơn một chút để thử nghiệm việc điều chỉnh tốc độ giọng nói cho phù hợp.\n\n"
            "4\n00:00:10,500 --> 00:00:11,500\nNgắn.\n"
        )
        with open(dummy_srt_path, "w", encoding="utf-8") as f:
            f.write(dummy_srt_text)
        print(f"Created dummy SRT: {dummy_srt_path}")

    # Echo the effective configuration before starting.
    banner_lines = (
        "--- SRT to Audio Conversion ---",
        f"Input: {input_folder}, Output: {output_folder}, TTS: {model}",
        f"Voice Lang: {output_voice}, Initial Speed: {speed}x, Max Speed: {max_speed_limit}x",
        f"Volume: {volume}dB, Max Chunk Dur: {max_duration_seconds}s, Start Idx: {start_index}",
        f"Max Threads: {MAX_WORKERS}",
        "---------------------------------",
    )
    for banner_line in banner_lines:
        print(banner_line)

    process_srt_folder(
        root_input_folder=input_folder,
        root_output_folder=output_folder,
        lang_srt=input_language_srt,
        lang_voice=output_voice,
        base_speed=speed,
        vol_db=volume,
        chunk_duration_s=max_duration_seconds,
        max_spd_limit=max_speed_limit,
        start_idx_filter=start_index,
        tts_engine=model,
        google_voice_name=voice_google_cloud,
    )
    print("\nAll SRT files processed.")

--- SRT to Audio Conversion ---
Input: C:\Users\Hieu Pham\Downloads\1, Output: C:\Users\Hieu Pham\Downloads\1, TTS: gtts
Voice Lang: vi, Initial Speed: 1.5x, Max Speed: 2x
Volume: 15dB, Max Chunk Dur: 50000s, Start Idx: 0
Max Threads: 20
---------------------------------


Submitting tasks for audio chunk 1 (SRTs up to 151) for '[Vietnamese] A Simple Way to Break a Bad Habit _ Judson Brewer _ TED [DownSub.com].srt'...
  All 151 subtitle tasks for chunk 1 submitted. Waiting for completion...
SUCCESS: Audio chunk 'VietnameseASimpleWaytoBreakaBadHabit_JudsonBrewer_TEDDownSubcom_chunk1_endIdx151.wav' created (Duration: 561.04s).

All SRT files processed.
