In [1]:
import importlib
import subprocess
import os
import sys
from pathlib import Path
from pydub import AudioSegment #type: ignore
from enum import Enum

#Custom Imports
import utils.mongodb_handler
from utils.mongodb_handler import MongoDBHandler
import utils.setup_helper
from utils.setup_helper import SetupHelper
import utils.piper_dialog_handler
from utils.piper_dialog_handler import PiperDialogHandler
import utils.logger_handler
from utils.logger_handler import Logger

#Constants
# Length of the silence that mergeAllWavFiles appends after every audio segment.
PAUSE_BETWEEN_SPOKEN_DIALOGUE = 1000 #Pause in milliseconds
# Upper bound on synthesized rows per conversation; only checked in the State.INT branch of startTTSObj.
CONST_TESTSIZE_INTSTATE = 5 #how many rows to synthesize when running with State.INT

#Load Config
# SetupHelper is a project helper; it receives the tool name and the current working directory.
# NOTE(review): presumably it locates the config file relative to that directory - confirm in utils.setup_helper.
piper_config = SetupHelper("piper", os.getcwd())
# Subscriptable config values; this notebook reads the keys 'collection_id', 'collection_text',
# 'piper_dir', 'piper_exe', 'audio_dir' and 'fullFile_dir' from it.
config_data = piper_config.getConfigValues()

class State(Enum):
      """Run state that controls how much audio startTTSObj synthesizes.

      Members:
          DEV:  nothing is synthesized, the rows are only walked through.
          PROD: every row of a conversation is synthesized and merged.
          INT:  only the first CONST_TESTSIZE_INTSTATE rows are synthesized, then merged.
      """
      # No trailing commas: a trailing comma would turn the value into a
      # one-element tuple such as ("dev",) instead of the plain string.
      DEV = "dev"
      PROD = "Prod"
      INT = "Int"
      
class RunningDB(Enum):
      """Selects how many MongoDB documents are processed in one run.

      Members:
          FULL:   all items returned by the MongoDB handler are processed.
          SINGLE: only one item, looked up by its ID, is processed.
      """
      # No trailing comma: it would turn the value into the tuple ("full",).
      FULL = "full"
      SINGLE = "single"

#Load additional Util Classes
# Project handler used below to fetch items (getAllItems / getSingleItemByID) and to disconnect at the end.
mongodb_handler = MongoDBHandler(config_data, "piper")
# Project handler used by startTTSObj to split a conversation into rows (initDialogue)
# and to pick the voice model file for a row (voiceModelSelector).
piper_dialog_handler = PiperDialogHandler()

# Setup Logger
# From here on every print() in this notebook goes to the Logger instance instead of the cell output.
# NOTE(review): Logger is assumed to be file-like (write/flush) since it replaces sys.stdout - confirm in utils.logger_handler.
logger = Logger()
original_stdout = sys.stdout  # Save the original stdout so the last cell can restore it
sys.stdout = logger  # Redirect stdout to logger

Next, we define the two main building blocks of this script: synthesizing the text to speech, and merging the resulting audio files once a conversation is complete.

In [None]:
def synthesizeText (textToSynthesize, exeFile, modelFile, outputFile):
      """Synthesize a text to an audio file by running the PiperTTS executable.

      The text is handed to the executable on standard input, encoded as UTF-8,
      instead of being pasted into a PowerShell command string. Quotes, ``$`` or
      backticks inside the text therefore can neither break nor alter the command.

      Args:
          textToSynthesize (str): Text to be synthesized.
          exeFile (str): Path to the PiperTTS executable.
          modelFile (str): Path to the voice model, passed to the executable via ``-m``.
          outputFile (str): Path of the audio file to write, passed via ``-f``.
      """
      # Argument list (no shell): every path is passed verbatim as one argument.
      result = subprocess.run(
            [exeFile, "-m", modelFile, "-f", outputFile],
            input=textToSynthesize,
            capture_output=True,
            text=True,
            encoding="utf-8",
      )
      # Output the result
      print(f"Ausgabeort: {result.stdout}")

      # Print any output messages if they exist (debug information included)
      if result.stderr:
            print(f"{result.stderr}")
    
def mergeAllWavFiles (full_dir, output_dir, id):
      """Merge all WAV files of one conversation folder into a single WAV file.

      Every ``.wav`` file in ``output_dir`` is appended in filename order, each
      followed by PAUSE_BETWEEN_SPOKEN_DIALOGUE milliseconds of silence. The
      result is written twice, as ``{id}_full.wav``: once into ``output_dir``
      and once into ``full_dir``.

      Args:
          full_dir (Any): Path to the directory that receives a copy of the merged file.
          output_dir (Any): Path to the directory whose WAV files are merged; the merged file is stored here too.
          id (Any): Prefix of the merged filename (``{id}_full.wav``) to identify it later.
      """
      silence = AudioSegment.silent(duration=PAUSE_BETWEEN_SPOKEN_DIALOGUE)
      mergedName = f'{id}_full.wav'
      mergedFile = os.path.join(output_dir, mergedName)
      fileInFullFolder = os.path.join(full_dir, mergedName)
      # Create an empty AudioSegment
      combined = AudioSegment.empty()

      # os.listdir returns entries in arbitrary order, so sort to keep the
      # segments in the order given by their filenames.
      for filename in sorted(os.listdir(output_dir)):
            if not filename.endswith('.wav'):
                  continue
            # A merged file from an earlier run lives in the same folder; merging
            # it again would duplicate the whole conversation.
            if filename == mergedName:
                  continue
            filePath = os.path.join(output_dir, filename)
            combined += AudioSegment.from_wav(filePath) + silence # Concatenate the audio segments
      # Export the combined audio as a new WAV file
      combined.export(mergedFile, format='wav')
      print(f'Merged WAV file created: {mergedFile}')
      combined.export(fileInFullFolder, format='wav')
      print(f'Merged WAV file created: {fileInFullFolder}')
      
            
def startTTSObj(data, run_state):
      print(f"Working through MongoDB Object with ID: {data[config_data['collection_id']]}")
      print("---------------------------------------------------------------") 
      currentFullDialog, numOfSpeaker, speakers = piper_dialog_handler.initDialogue(data[config_data['collection_id']], data[config_data['collection_text']])
      
      #iterate through conversation
      for index, rowDialog in enumerate(currentFullDialog):
            try:
                  print(f"Setting up Model File with ID: {rowDialog.voice}")
                  model_file = os.path.join(config_data["piper_dir"], piper_dialog_handler.voiceModelSelector(rowDialog.voice))
                  print(f"Setting up Output-Folderpath with ID: {rowDialog.id}")
                  output_dir = os.path.join(config_data["audio_dir"], f"{rowDialog.id}")
                  cur_id = rowDialog.id
                  if not Path(output_dir).exists():
                        print(f"Folder not found. Creating Folder {rowDialog.id}")
                        Path(output_dir).mkdir(parents=True, exist_ok=True)
                  #Max number of elements to split into = 10000
                  if rowDialog.element_id < 10:
                        output_file = os.path.join(output_dir, f"{rowDialog.id}_000{rowDialog.element_id}.wav")
                  elif rowDialog.element_id < 100:
                        output_file = os.path.join(output_dir, f"{rowDialog.id}_00{rowDialog.element_id}.wav")
                  elif rowDialog.element_id < 1000:
                        output_file = os.path.join(output_dir, f"{rowDialog.id}_0{rowDialog.element_id}.wav")
                  else:
                        output_file = os.path.join(output_dir, f"{rowDialog.id}_{rowDialog.element_id}.wav")
            except Exception as e:
                  print(f"Error while loading: {e} in element {rowDialog.element_id}")
                  print(f"Object Details:\n{rowDialog}") 
            match run_state:
                  case State.PROD: 
                        synthesizeText(rowDialog.text, config_data["piper_exe"], model_file, output_file)
                  case State.INT: 
                        if (index < CONST_TESTSIZE_INTSTATE): synthesizeText(rowDialog.text, config_data["piper_exe"], model_file, output_file)
                  case State.DEV:
                        print(f"Dev State active, not synthesizing any text.")
                  case default:
                        print("run_state is not a valid Running state")
      if run_state == State.PROD or run_state == State.INT:
            mergeAllWavFiles(config_data['fullFile_dir'], output_dir, cur_id)
      print(f"Rows processed in this conversation: {index}")      


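Now we run the pipeline: choose the run state and the database mode, make sure the output folders exist, and start the synthesis. Afterwards, to end this script, we need to close all open connections and running tasks.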

In [None]:
current_state = State.INT
current_run = RunningDB.SINGLE

#Prepare Working Environment, pre Check, if "Audio" folder exists
if not Path(config_data["audio_dir"]).exists():
      print(f"Audio-Folder not found. Creating Folder 'Audio' at {config_data["audio_dir"]} and 'full_conversations' in it.")
      Path(config_data["audio_dir"]).mkdir(parents=True, exist_ok=True)
      Path(config_data['fullFile_dir']).mkdir(parents=True, exist_ok=True)

match current_run:
      case RunningDB.FULL: all_data = mongodb_handler.getAllItems()
      case RunningDB.SINGLE: all_data = mongodb_handler.getSingleItemByID({f"{config_data['collection_id']}":"30394981"})

if current_run == RunningDB.SINGLE:
      startTTSObj(all_data, current_state)
else:
      for data in all_data:
            startTTSObj(data, current_state)


Finally, restore `sys.stdout` to its original value before closing the logger, and disconnect the MongoDB handler.

In [5]:
# Tear down: disconnect from MongoDB, then undo the stdout redirection.
try:
      mongodb_handler.disconnectMongoDB()
finally:
      # Runs even if the disconnect raises, so stdout never stays redirected
      # and the logger is always closed.
      # Reset stdout to default and close logger
      sys.stdout = original_stdout
      logger.close()