<div class="alert alert-success" style = "border-radius:10px;border-width:3px;border-color:white;font-family:Verdana,sans-serif;font-size:16px;">
<h2>Audio files transcriber</h2>
This Jupyter notebook prepares audio files for the whisper-large-v2 model and transcribes them. First, each MP3 file is split into 30-second segments, which are resampled to 16 kHz and saved as WAV files in a folder named after the original file.

For each folder, we process the audio segments and compile the transcriptions into a single .txt file. The name of this output file corresponds to the 'ID' of the original audio file, making it easy to track and reference.

Lastly, the notebook includes a modified query that sets the model's task to translation, so the processed audio is transcribed directly into English, the language used for our analysis.
</div>

In [3]:
#!conda install -c anaconda ffmpeg -y
#!pip install pydub

In [30]:
import json
import boto3
import os
from sagemaker.jumpstart import utils
from sagemaker.serializers import JSONSerializer
from pydub import AudioSegment

Let's adapt the files to the format the model expects: each MP3 file is split into 30-second WAV segments sampled at 16 kHz.

In [32]:
def convert_mp3_to_wav(input_mp3_file):
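    # Name the output directory after the file, without its extension
    output_directory = os.path.splitext(input_mp3_file)[0] + "/"

    # Create the output directory if it doesn't exist. It must live under
    # segments_1/, the same location the segments are exported to below.
    os.makedirs("segments_1/" + output_directory, exist_ok=True)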

    try:
        # Load the downloaded MP3 file using pydub
        audio = AudioSegment.from_mp3("../all_samples/"+input_mp3_file)

        # Calculate the duration of the audio in milliseconds
        audio_duration_ms = len(audio)

        # Duration of each segment in milliseconds (30 seconds)
        segment_duration_ms = 30 * 1000

        # Iterate over the audio, creating 30-second segments
        for i in range(0, audio_duration_ms, segment_duration_ms):
            # Extract a 30-second segment
            segment = audio[i:i + segment_duration_ms]

            # Generate an output WAV file name for the segment
            segment_output_wav_file = "segments_1/"+f"{output_directory}segment_{i // 1000}.wav"

            # Resample the segment to 16 kHz before exporting it
            segment = segment.set_frame_rate(16000)

            # Export the segment as a WAV file
            segment.export(segment_output_wav_file, format="wav")
            
    except Exception as e:
        print(f"Error processing {input_mp3_file}: {e}")
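
To make the segmentation arithmetic concrete, the following sketch reproduces only the loop bounds and file names used above, without touching any audio. The helper name `segment_bounds` and the 75-second duration are hypothetical and chosen purely for illustration.

```python
def segment_bounds(duration_ms, segment_ms=30_000):
    """Return (start_ms, end_ms, file_name) for each chunk of the audio."""
    bounds = []
    for start in range(0, duration_ms, segment_ms):
        # The last chunk may be shorter than 30 seconds
        end = min(start + segment_ms, duration_ms)
        bounds.append((start, end, f"segment_{start // 1000}.wav"))
    return bounds


# A hypothetical 75-second recording yields three segments
for start, end, name in segment_bounds(75_000):
    print(name, start, end)
```

The number in each file name is the segment's start offset in seconds, which is what later allows the segments to be put back in order.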

Connect to the model endpoint and run the query directly to obtain the transcription. The model automatically detects the language of the input audio and transcribes it in that language. The query returns the transcription text.

In [31]:
endpoint_name = 'jumpstart-dft-hf-asr-whisper-large-v2'

# Query the model endpoint. The output is the transcript text
def query_endpoint(body, content_type):
    client = boto3.client('runtime.sagemaker')
    response = client.invoke_endpoint(EndpointName=endpoint_name, ContentType=content_type, Body=body)
    model_predictions = json.loads(response['Body'].read())
    text = model_predictions['text']
    
    # Return the model predictions instead of printing them
    return text
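
The shape of the `text` field is not pinned down in this notebook: the description calls it a string, while later code handles it like a list. As a minimal sketch under that uncertainty, a hypothetical helper `extract_text` could normalize both cases. The sample response bodies below are made up for illustration and are not real endpoint output.

```python
import json


def extract_text(raw_body):
    """Parse a JSON response body and return the transcription as one string."""
    model_predictions = json.loads(raw_body)
    text = model_predictions["text"]
    if isinstance(text, list):
        # Assumed case: one string per decoded chunk
        return " ".join(part.strip() for part in text)
    return text.strip()


print(extract_text(b'{"text": [" hola mundo"]}'))
print(extract_text(b'{"text": "hola mundo"}'))
```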

In [15]:
# Directory containing the MP3 files
directory_path = '../all_samples'

# Iterate over all files in the specified directory
for filename in os.listdir(directory_path):
    # Check if the file is an MP3 file
    if filename.lower().endswith('.mp3'):
        # Call the convert_mp3_to_wav function for each MP3 file
        convert_mp3_to_wav(filename)

Process all the segments in a directory. Each directory holds the segments of one original audio file, and their transcriptions are appended, in order, to a single .txt file.

In [52]:
def process_directory(directory_path):
    # Name the output file after the segment directory (the ID of the original audio)
    os.makedirs("../output", exist_ok=True)
    file_txt = os.path.join("../output", f"{os.path.basename(directory_path.rstrip('/'))}.txt")

    # Create the text file, or empty it if it already exists
    open(file_txt, "w").close()

    # List all files in the directory and sort them by the numeric part of their names
    file_list = os.listdir(directory_path)
    file_list.sort(key=lambda x: int(x.split("_")[1].split(".")[0]))

    # Holds the last transcription; stays empty if the directory has no files
    predictions = ""

    # Loop through each file in the directory
    for file_name in file_list:
        # Construct the full path to the file
        file_path = os.path.join(directory_path, file_name)

        # Check if the item in the directory is a file
        if os.path.isfile(file_path):
            # Open and read the binary data from the file
            with open(file_path, "rb") as file:
                wav_file_read = file.read()

            # Send the binary data to the query endpoint
            predictions = query_endpoint(wav_file_read, "audio/wav")

            # The original code indexed the result, so take the first item if it is a list
            if isinstance(predictions, list):
                predictions = predictions[0]
            predictions = delete_repeated_words(predictions)

            # Append the transcription, followed by a newline to separate the segments
            with open(file_txt, "a") as output_file:
                output_file.write(predictions + "\n")

    return predictions
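
The numeric sort key matters because a plain alphabetical sort would place `segment_120.wav` before `segment_30.wav` and scramble the transcript. A small sketch with made-up file names shows the difference; the helper name `segment_offset` is hypothetical.

```python
names = ["segment_30.wav", "segment_120.wav", "segment_0.wav", "segment_60.wav"]


def segment_offset(name):
    """Extract the start offset in seconds from a name like 'segment_30.wav'."""
    return int(name.split("_")[1].split(".")[0])


# Alphabetical order compares character by character, so "120" sorts before "30"
print(sorted(names))

# Numeric order restores the chronological sequence of the segments
print(sorted(names, key=segment_offset))
```

Note that the key assumes every entry follows the `segment_<number>.wav` pattern; any other file in the directory would raise an error during sorting.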


Some transcriptions contain words that are repeated for no reason. The filter below addresses this by removing any word that already appears among the five words immediately preceding it.

In [51]:
def delete_repeated_words(texto):
    palabras = texto.split()
    texto_limpio = []

    for i in range(len(palabras)):
        # Look back at up to five words preceding the current one
        palabras_anteriores = palabras[max(0, i - 5):i]

        # Keep the word only if it does not appear in that window
        if palabras[i] not in palabras_anteriores:
            texto_limpio.append(palabras[i])

    return ' '.join(texto_limpio)
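
To illustrate the behaviour of this filter, here is a self-contained sketch of the same sliding-window logic. The function name `drop_recent_repeats`, the `window` parameter, and the sample sentences are hypothetical and not part of the notebook.

```python
def drop_recent_repeats(text, window=5):
    """Drop every word that also occurs among the `window` words before it."""
    words = text.split()
    kept = []
    for i, word in enumerate(words):
        # The look-back window is taken from the original text, not the cleaned one
        recent = words[max(0, i - window):i]
        if word not in recent:
            kept.append(word)
    return " ".join(kept)


# Stuttered output is collapsed
print(drop_recent_repeats("gracias gracias gracias por por venir a la la reunión"))

# Legitimate repetitions inside the window are removed as well
print(drop_recent_repeats("the cat and the dog"))
```

The second call shows the trade-off of this approach: common words such as articles are also dropped when they recur within five words, so the cleaned text can lose some grammatical words.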


Run the transcription over all the segmented audio files. Because the model transcribes in the language of the input, the output here is in Spanish.

In [None]:
main_directory = "segments_1"

# List all subdirectories in the main directory
subdirectories = [d for d in os.listdir(main_directory) if os.path.isdir(os.path.join(main_directory, d))]

# Iterate through each subdirectory
for subdirectory in subdirectories:
    subdirectory_path = os.path.join(main_directory, subdirectory)
    prediction_vector = process_directory(subdirectory_path)

Modify the query to the model endpoint so that it translates the audio into English instead of transcribing it in the original language.

In [None]:
def query_endpoint_with_translation(input_audio_file_name):

    # Read the audio file
    with open(input_audio_file_name, "rb") as file:
        wav_file_read = file.read()

    # Prepare the payload with audio input, language, and translation task
    payload = {
        "audio_input": wav_file_read.hex(),
        "language": "spanish", 
        "task": "translate"
    }

    # Create a SageMaker runtime client
    client = boto3.client('runtime.sagemaker')

    # Set the serializer for the predictor
    serializer = JSONSerializer()
    serializer.content_type = "application/json"

    # Invoke the endpoint
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=serializer.serialize(payload)
    )

    # Parse the response and extract the translated text
    model_predictions = json.loads(response['Body'].read())
    translated_text = model_predictions['text']

    return translated_text
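
The payload above carries the audio as a hexadecimal string so that it can travel inside JSON. The sketch below, which uses a few stand-in bytes rather than a real WAV file and does not call the endpoint, shows that the encoding is reversible and that it doubles the size of the data.

```python
import json

# Stand-in bytes for illustration only; a real call would read a WAV segment
audio_bytes = b"RIFF\x24\x00\x00\x00WAVE"

payload = {
    "audio_input": audio_bytes.hex(),  # two hex characters per byte
    "language": "spanish",
    "task": "translate",
}
body = json.dumps(payload)

# Decoding the hex string recovers the original bytes exactly
decoded = bytes.fromhex(json.loads(body)["audio_input"])

print(len(audio_bytes), len(payload["audio_input"]))
print(decoded == audio_bytes)
```

Since the hex string is twice as long as the raw audio, keeping the segments short also keeps each request body small.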


In [69]:
def process_directory_english(directory_path):
    # Name the output file after the segment directory (the ID of the original audio)
    os.makedirs("../output_eng", exist_ok=True)
    file_txt = os.path.join("../output_eng", f"{os.path.basename(directory_path.rstrip('/'))}.txt")

    # Create the text file, or empty it if it already exists
    open(file_txt, "w").close()

    # List all files in the directory and sort them by the numeric part of their names
    file_list = os.listdir(directory_path)
    file_list.sort(key=lambda x: int(x.split("_")[1].split(".")[0]))

    # Holds the last translation; stays empty if the directory has no files
    predictions = ""

    # Loop through each file in the directory
    for file_name in file_list:
        # Construct the full path to the file
        file_path = os.path.join(directory_path, file_name)

        # Check if the item in the directory is a file (not a subdirectory)
        if os.path.isfile(file_path):
            # Send the segment to the endpoint and get its English translation
            predictions = query_endpoint_with_translation(file_path)

            # Join the result into one string (it may be a list) and append it,
            # followed by a newline to separate the segments
            with open(file_txt, "a") as output_file:
                output_file.write("".join(predictions) + "\n")

    return predictions

In [71]:
main_directory = "segments_1"

# List all subdirectories in the main directory
subdirectories = [d for d in os.listdir(main_directory) if os.path.isdir(os.path.join(main_directory, d))]

# Iterate through each subdirectory
for subdirectory in subdirectories:
    subdirectory_path = os.path.join(main_directory, subdirectory)
    prediction_vector = process_directory_english(subdirectory_path)