In [13]:
from g2p import make_g2p

# Build the 'eng' -> 'eng-ipa' grapheme-to-phoneme transducer once at module level;
# rate_apply below reads it as a global instead of rebuilding it per call.
transducer = make_g2p('eng', 'eng-ipa')

def rate_apply(batch, rank=None, audio_column_name="audio", text_column_name="text"):
    """Annotate one example or a batch with its phoneme string and speaking rate.

    The text is converted with the module-level ``transducer`` and the speaking
    rate is ``len(phonemes) / duration``. ``len`` counts every character of the
    transducer output, so spaces and punctuation are included in the rate.

    Args:
        batch: Mapping that holds the text under ``text_column_name`` and either
            a ``"speech_duration"`` entry or an audio entry under
            ``audio_column_name`` with ``"array"`` and ``"sampling_rate"`` keys.
            ``"speech_duration"`` takes precedence when present (presumably in
            seconds, since it is used interchangeably with
            ``samples / sampling_rate`` -- confirm against the caller).
            When the text entry is a ``list`` the input is treated as a batch
            and the columns are iterated in parallel.
        rank: Unused by this function (presumably accepted so the function can be
            called with a process rank -- confirm against the caller).
        audio_column_name: Key of the audio entry.
        text_column_name: Key of the text entry.

    Returns:
        The same ``batch`` object with ``"speaking_rate"`` and ``"phonemes"`` set;
        both are lists for batched input and scalars for a single example.

    Note:
        The sample count is ``len()`` of the squeezed array, i.e. the size of its
        first remaining axis. NOTE(review): confirm the channel layout before
        passing multi-channel audio.
    """
    # Substituted for a zero duration so the rate division cannot raise.
    fallback_duration = 0.01

    def _non_zero(duration):
        return duration if duration != 0 else fallback_duration

    def _audio_duration(audio):
        # Previously unguarded: an empty array produced a ZeroDivisionError here,
        # while the "speech_duration" path already fell back to 0.01.
        return _non_zero(len(audio["array"].squeeze()) / audio["sampling_rate"])

    def _annotate(text, duration):
        phonemes = transducer(text).output_string
        return phonemes, len(phonemes) / duration

    # Pick the duration source once; both the batched and the single path share it.
    if "speech_duration" in batch:
        duration_column, to_duration = batch["speech_duration"], _non_zero
    else:
        duration_column, to_duration = batch[audio_column_name], _audio_duration

    if isinstance(batch[text_column_name], list):
        speaking_rates = []
        phonemes_list = []
        for text, raw_duration in zip(batch[text_column_name], duration_column):
            phonemes, speaking_rate = _annotate(text, to_duration(raw_duration))
            speaking_rates.append(speaking_rate)
            phonemes_list.append(phonemes)
        batch["speaking_rate"] = speaking_rates
        batch["phonemes"] = phonemes_list
    else:
        phonemes, speaking_rate = _annotate(
            batch[text_column_name], to_duration(duration_column)
        )
        batch["speaking_rate"] = speaking_rate
        batch["phonemes"] = phonemes

    return batch

In [24]:
from g2p import make_g2p
import soundfile as sf
import numpy as np
# or alternatively:
# import torchaudio

# Build the 'eng' -> 'eng-ipa' transducer that rate_apply reads as a global.
# NOTE(review): this is an English mapping, but the sample sentence in this cell is
# German; the recorded output below shows 'phonemes': ',   ɛs ?', i.e. most of the
# text is lost.
transducer = make_g2p('eng', 'eng-ipa')

def rate_apply(batch, rank=None, audio_column_name="audio", text_column_name="text"):
    """Add ``"phonemes"`` and ``"speaking_rate"`` to a single example or a batch.

    Phonemes come from the module-level ``transducer``; the speaking rate is the
    length of that phoneme string (all characters, including spaces and
    punctuation) divided by the clip duration.

    Args:
        batch: Mapping with the text under ``text_column_name`` plus either a
            ``"speech_duration"`` entry, which is preferred when present, or an
            audio entry under ``audio_column_name`` exposing ``"array"`` and
            ``"sampling_rate"``. A ``list`` under the text key marks the input
            as batched. ``"speech_duration"`` is presumably in seconds, matching
            ``samples / sampling_rate`` -- confirm against the caller.
        rank: Not used here (presumably kept for callers that pass a process
            rank -- confirm).
        audio_column_name: Key under which the audio entry is stored.
        text_column_name: Key under which the text is stored.

    Returns:
        ``batch`` itself, updated in place with ``"speaking_rate"`` and
        ``"phonemes"`` (lists when batched, scalars otherwise).

    Note:
        Duration from audio uses ``len()`` of the squeezed array, i.e. its first
        remaining axis. NOTE(review): verify the channel layout for
        multi-channel input.
    """
    # Stand-in for a zero duration so that the division below is always defined.
    fallback_duration = 0.01

    def _non_zero(duration):
        return duration if duration != 0 else fallback_duration

    def _audio_duration(audio):
        # An empty array used to raise ZeroDivisionError on this path; it now gets
        # the same 0.01 fallback the "speech_duration" path has always applied.
        return _non_zero(len(audio["array"].squeeze()) / audio["sampling_rate"])

    def _annotate(text, duration):
        phonemes = transducer(text).output_string
        return phonemes, len(phonemes) / duration

    # Resolve where durations come from once, for both input shapes.
    if "speech_duration" in batch:
        duration_column, to_duration = batch["speech_duration"], _non_zero
    else:
        duration_column, to_duration = batch[audio_column_name], _audio_duration

    if isinstance(batch[text_column_name], list):
        speaking_rates = []
        phonemes_list = []
        for text, raw_duration in zip(batch[text_column_name], duration_column):
            phonemes, speaking_rate = _annotate(text, to_duration(raw_duration))
            speaking_rates.append(speaking_rate)
            phonemes_list.append(phonemes)
        batch["speaking_rate"] = speaking_rates
        batch["phonemes"] = phonemes_list
    else:
        phonemes, speaking_rate = _annotate(
            batch[text_column_name], to_duration(duration_column)
        )
        batch["speaking_rate"] = speaking_rate
        batch["phonemes"] = phonemes

    return batch

# Read the recording from disk; sf.read yields the samples and the sampling rate.
audio_data, sample_rate = sf.read("audio.wav")
text = "Hallo, wie geht es dir?"

# Wrap the recording and its transcript in the single-example layout rate_apply reads.
data = dict(
    audio=dict(array=audio_data, sampling_rate=sample_rate),
    text=text,
)

# Annotate the example with phonemes and speaking rate, then show the result.
data = rate_apply(data)
print(data)


{'audio': {'array': array([-0.027771  , -0.01174927,  0.0140686 , ..., -0.150177  ,
       -0.17578125, -0.13760376]), 'sampling_rate': 16000}, 'text': 'Hallo, wie geht es dir?', 'speaking_rate': 0.4, 'phonemes': ',   ɛs ?'}


In [None]:
# Expected German transcription, kept for reference: 'phonemes': 'halo, viː ɡeːt eːs diːɐ̯?'}

In [15]:
# Using torchaudio instead
import torchaudio

# Load the recording through torchaudio and convert the tensor to a NumPy array.
# `text` is not defined in this cell; it must already exist from an earlier cell.
waveform, sample_rate = torchaudio.load("audio.wav")
data = dict(
    audio=dict(array=waveform.numpy(), sampling_rate=sample_rate),
    text=text,
)


In [18]:
from dp.phonemizer import Phonemizer
import soundfile as sf

# Initialize the DeepPhonemizer with the Latin IPA model that supports German.
# The checkpoint path is relative, so the file has to be reachable from the working
# directory; rate_apply below reads this object as a global.
phonemizer = Phonemizer.from_checkpoint('latin_ipa_forward.pt')

def rate_apply(batch, rank=None, audio_column_name="audio", text_column_name="text"):
    """Annotate German text with DeepPhonemizer phonemes and a speaking rate.

    Each text is transcribed with the module-level ``phonemizer`` using
    ``lang='de'``. The speaking rate is ``len(phonemes) / duration``, where
    ``len`` counts every character of the transcription (spaces and punctuation
    included).

    Args:
        batch: Mapping with the text under ``text_column_name`` and either a
            ``"speech_duration"`` entry (used when present) or an audio entry
            under ``audio_column_name`` with ``"array"`` and ``"sampling_rate"``
            keys. If the text entry is a ``list`` the input is handled as a
            batch. ``"speech_duration"`` is presumably in seconds, like
            ``samples / sampling_rate`` -- confirm against the caller.
        rank: Unused (presumably present for callers that supply a process
            rank -- confirm).
        audio_column_name: Key of the audio entry.
        text_column_name: Key of the text entry.

    Returns:
        The input ``batch``, mutated to include ``"speaking_rate"`` and
        ``"phonemes"`` (lists for a batch, scalars for one example).

    Note:
        The sample count is ``len()`` of the squeezed array, i.e. its first
        remaining axis. NOTE(review): check the channel layout for multi-channel
        audio.
    """
    # Used in place of a zero duration so the rate division cannot fail.
    fallback_duration = 0.01

    def _non_zero(duration):
        return duration if duration != 0 else fallback_duration

    def _audio_duration(audio):
        # Empty arrays used to trigger ZeroDivisionError here; apply the same
        # fallback that the "speech_duration" path already had.
        return _non_zero(len(audio["array"].squeeze()) / audio["sampling_rate"])

    def _annotate(text, duration):
        # Use DeepPhonemizer for German
        phonemes = phonemizer(text, lang='de')
        return phonemes, len(phonemes) / duration

    # Decide once which column supplies durations and how to normalise it.
    if "speech_duration" in batch:
        duration_column, to_duration = batch["speech_duration"], _non_zero
    else:
        duration_column, to_duration = batch[audio_column_name], _audio_duration

    if isinstance(batch[text_column_name], list):
        speaking_rates = []
        phonemes_list = []
        for text, raw_duration in zip(batch[text_column_name], duration_column):
            phonemes, speaking_rate = _annotate(text, to_duration(raw_duration))
            speaking_rates.append(speaking_rate)
            phonemes_list.append(phonemes)
        batch["speaking_rate"] = speaking_rates
        batch["phonemes"] = phonemes_list
    else:
        phonemes, speaking_rate = _annotate(
            batch[text_column_name], to_duration(duration_column)
        )
        batch["speaking_rate"] = speaking_rate
        batch["phonemes"] = phonemes

    return batch

# Read the recording from disk; sf.read yields the samples and the sampling rate.
audio_data, sample_rate = sf.read("audio.wav")
text = "Hallo, wie geht es dir?"  # German text example

# Bundle the waveform and transcript into the single-example layout rate_apply reads.
data = dict(
    audio=dict(array=audio_data, sampling_rate=sample_rate),
    text=text,
)

# Compute phonemes and speaking rate for the example and print the annotated record.
data = rate_apply(data)
print(data)


{'audio': {'array': array([-0.027771  , -0.01174927,  0.0140686 , ..., -0.150177  ,
       -0.17578125, -0.13760376]), 'sampling_rate': 16000}, 'text': 'Hallo, wie geht es dir?', 'speaking_rate': 1.25, 'phonemes': 'halo, viː ɡeːt eːs diːɐ̯?'}




In [None]:
from dp.phonemizer import Phonemizer

# Load the phonemizer from a checkpoint given as a relative path.
phonemizer = Phonemizer.from_checkpoint('latin_ipa_forward.pt')

text = "Guten Tag, wie geht es dir?"
# Transcribe the sentence with the 'de' language setting.
phonemes = phonemizer(text, lang='de')

# Show the input and its transcription on consecutive lines.
print(f"Original text: {text}", f"Phonemes: {phonemes}", sep="\n")
