### Media Content Localization Pipeline
This notebook runs the visual dubbing and lip-sync pipeline end to end.

Prerequisites:
- ffmpeg
- Completed 1 - Deploy SageMaker Endpoint.ipynb
- TTS Endpoints and Retalking Endpoints are in service

This notebook has the following sections:

1. Parameters
2. Upload file to S3 and Transcribe source file
3. Translate using Amazon Translate
4. Create voice samples for voice cloning
5. Perform Text-to-speech
6. Perform Video Retalking


In [None]:
import boto3
import json 
import os
import time
import requests
import json
import re
from datetime import date
from datetime import datetime
from time import sleep
import tempfile

from IPython.display import clear_output

from pydub import AudioSegment

### Parameters

Set the following parameters

In [None]:
## Required parameters: set these before running the rest of the notebook

# Region and S3 parameters
region_name = '<region>'                                      # AWS region passed to every boto3 client in this notebook

bucket = "<bucket name>"                                      # S3 bucket used for videos, voice samples, inference inputs and outputs

# Source
source_file = "./samples/aws-fr.mp4"                     # Local path of the source video to localize
media_format = "mp4"                                          # Media format passed to Amazon Transcribe

# Transcription
transcribe_source_language_code = "fr-CA"                     # Amazon Transcribe language code: en-US, es-US, ...

# Translation
translate_source_language_code = "fr-CA"                         # Amazon Translate source language code: en, es, ...
translate_target_language_code = "en"                         # Amazon Translate target language code: en, es, ...

# Reference voice samples creation
s3_reference_voice_folder = "aws-french"                     # Folder name (under prefix_voice_samples) for the voice samples in S3
voice_samples_dir = "./voice-samples/aws-french"             # Local directory for the reference voice clips after splitting

inference_id = "aws-french"                                  # Unique inference id; also the folder name used under the inputs and outputs prefixes

# Inference endpoints
endpoint_name = "tts-endpoint-async"                          # SageMaker async endpoint used for text-to-speech
retalking_endpoint_name = "retalking-endpoint-async"          # SageMaker async endpoint used for video retalking

## Optional parameters: the defaults below normally do not need to change 

# Bucket params
prefix_videos = "videos"                                      # Prefix to store the videos to be localized
prefix_inputs = "inputs"                                      # Prefix to store the inference inputs for async call
prefix_outputs = "outputs"                                    # Prefix to store the outputs for the inference
prefix_voice_samples = "voice-samples"                        # Prefix to store the voice samples

# Final output filenames (derived from inference_id)
final_output_audio_filename = inference_id + ".wav"           # Final audio output
final_output_video_filename = f"{inference_id}-dubbed.mp4"    # Final video output with retalking



In [None]:
# Helper functions

def get_bucket(s3_uri):
    """Return the bucket-name component of an ``s3://bucket/key`` style URI."""
    # "s3://bucket/key" splits into ["s3:", "", "bucket", "key..."]; the bucket is the third field.
    return s3_uri.split("/", 3)[2]
    
def get_key(s3_uri):
    """Return the object-key component of an ``s3://bucket/key`` style URI ("" when there is none)."""
    # Cap the split at three separators so everything after the bucket stays in one piece.
    remainder = s3_uri.split("/", 3)[3:]
    return remainder[0] if remainder else ""
    

### Upload file to S3 and Transcribe source file

The following uploads the video to Amazon S3 and uses Amazon Transcribe to retrieve the transcription of the video.

In [None]:
# Upload the source video to S3 under the videos prefix.
# os.path.basename is used (instead of splitting on "/") so this key is built the same way
# as the retalking payload's input video URI, which also uses os.path.basename(source_file).
key_video = prefix_videos + "/" + os.path.basename(source_file)

s3 = boto3.client('s3', region_name=region_name)
s3.upload_file(source_file, bucket, key_video)
print("Uploaded file to s3")

In [None]:
# Start an Amazon Transcribe job for the uploaded video
transcribe = boto3.client('transcribe', region_name=region_name)
job_uri = "s3://{}/{}".format(bucket, key_video)

# The job name is suffixed with a full timestamp so that re-running this cell produces a new name.
# datetime.now() is needed here: date.today() has no time of day (the %H-%M-%S fields would
# always render as 00-00-00), and the year directive is "%Y", not a literal "Y".
job_timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

# <video file stem><timestamp>-job
job_name = key_video.split("/")[-1].split(".")[0] + job_timestamp + "-job"

result = transcribe.start_transcription_job(
    TranscriptionJobName=job_name + "1",   # the polling cell looks the job up under this exact name
    Media={'MediaFileUri': job_uri},
    MediaFormat=media_format,
    LanguageCode=transcribe_source_language_code
)

In [None]:
# Poll the transcription job until it reaches a terminal state (COMPLETED or FAILED)
while True:
    response = transcribe.get_transcription_job(TranscriptionJobName=job_name + "1")
    job_status = response['TranscriptionJob']['TranscriptionJobStatus']
    print("Transcription job status is:", job_status)

    if job_status == "COMPLETED":
        # Download the transcript JSON from the URI returned by the job
        transcript_uri = response['TranscriptionJob']['Transcript']['TranscriptFileUri']
        transcript_response = requests.get(transcript_uri, timeout=60)
        # Fail loudly on an HTTP error instead of trying to parse an error page as JSON
        transcript_response.raise_for_status()
        result = json.loads(transcript_response.content)
        transcript = result['results']['transcripts'][0]['transcript']
        print("Transcript:")
        print(transcript)
        break

    if job_status == "FAILED":
        # A failed job never becomes COMPLETED; stop instead of polling forever
        failure_reason = response['TranscriptionJob'].get('FailureReason', 'no reason reported')
        raise RuntimeError(f"Transcription job {job_name + '1'} failed: {failure_reason}")

    time.sleep(5)

### Translate using Amazon Translate
Using Amazon Translate, the transcript output of Amazon Transcribe is translated into the target language code.

In [None]:
# Split text by sentence to translate and recombine
transcript_segments = transcript.split('.')

# Translate each sentence with Amazon Translate into translate_target_language_code
translate = boto3.client('translate', region_name=region_name)

translated_segments = []
for segment in transcript_segments:
    # Drop the whitespace left around each piece by the split; skip pieces that are empty
    # or whitespace-only so that a bare "." is never sent for translation.
    segment = segment.strip()
    if not segment:
        continue

    response = translate.translate_text(Text=segment + ".",
                                        SourceLanguageCode=translate_source_language_code,
                                        TargetLanguageCode=translate_target_language_code)
    translated_segments.append(response['TranslatedText'].strip())

# Join with a single space: the sentence splitter used for text-to-speech splits on whitespace
# that follows a period, so sentences must not be glued together as "one.two".
translated_text = ' '.join(translated_segments)
translated_text

### Creating Reference Voice Clips
The following section performs these steps:
- Extracts the audio track from the source clip
- Using the results from Amazon Transcribe, splits on sentences using the periods "."
- Keeps only voice samples that are longer than 2 seconds and at most 10 seconds, as needed for TortoiseTTS
- Uploads the voice samples


In [None]:
# Load the audio from the source file
# source_audio is reused later to cut the reference voice clips and to measure the source length.
source_audio = AudioSegment.from_file(source_file)

In [None]:
# Build sentence-level splits (text + timing) from the Transcribe result items
sentences = []

current_sentence = ""
sentence_start_time = None
sentence_end_time = None

for item in result['results']['items']:
    content = item['alternatives'][0]['content']

    if item['type'] != 'punctuation':
        # Punctuation items have no start/end times, so timing comes from non-punctuation items only.
        # The first word of a sentence fixes its start; every word pushes the end forward.
        if sentence_start_time is None:
            sentence_start_time = item['start_time']
        sentence_end_time = item['end_time']
        current_sentence = current_sentence + ' ' + content
        continue

    # Punctuation attaches directly to the preceding word
    current_sentence = current_sentence + content + ' '

    # Only a period closes a sentence; other punctuation stays inside it
    if content != '.':
        continue

    # Record the sentence only if it contained at least one timed word. A period with no
    # preceding word has no timing, and float(None) would raise a TypeError.
    if sentence_start_time is not None:
        start_seconds = float(sentence_start_time)
        end_seconds = float(sentence_end_time)
        sentences.append({
            "sentence": current_sentence.strip(),
            "sentence_start_time": start_seconds,
            "sentence_end_time": end_seconds,
            "sentence_duration": end_seconds - start_seconds
        })

    current_sentence = ""
    sentence_start_time = None
    sentence_end_time = None

# Select segments that are >2 and <= 10 seconds in length
selected_sentences = [
    sentence for sentence in sentences
    if 2 < sentence['sentence_duration'] <= 10
]
selected_sentences

In [None]:
# Create the voice_samples_dir if it doesn't exist
# (exist_ok=True replaces the separate exists() check and its check-then-create race)
os.makedirs(voice_samples_dir, exist_ok=True)


# Split and save audio: one numbered WAV file per selected sentence
for i, sentence in enumerate(selected_sentences):
    # Sentence times are stored in seconds; the *1000 converts them to the millisecond
    # positions used to slice the audio.
    start_time_ms = sentence['sentence_start_time'] * 1000
    end_time_ms = sentence['sentence_end_time'] * 1000

    clip = source_audio[start_time_ms:end_time_ms]
    clip_path = f"{voice_samples_dir}/{i}.wav"
    print("Exporting segment", i, "to", clip_path)
    clip.export(clip_path, format="wav")

In [None]:
# Push every file found under voice_samples_dir to the voice-samples folder in Amazon S3.
# Only the file name is used in the key, so files from nested directories land in the same folder.
for dirpath, _, filenames in os.walk(voice_samples_dir):
    for filename in filenames:
        local_path = os.path.join(dirpath, filename)
        key = f"{prefix_voice_samples}/{s3_reference_voice_folder}/{filename}"
        print(f"Uploading {local_path} to s3://{bucket}/{key}")
        s3.upload_file(local_path, bucket, key)


### Perform text-to-speech using SageMaker async endpoint
The following section will do the following:
- Given the translated transcript, split the text into sentences
- Create multiple input requests JSON and uploads it to the input folder in the S3 bucket
- Invokes the SageMaker endpoint asynchronously for each input request JSON


__Taking advantage of parallel instances:__ The text is split into sentences before making multiple async invocations, which parallelizes the inference and significantly reduces the time it takes to generate the audio.


In [None]:
# SageMaker runtime client, used later to invoke the async inference endpoints
sagemaker = boto3.client('sagemaker-runtime', region_name=region_name)

#### Splitting Text

Splitting the text is needed because there are limits on how long a single generation can be with the given models.

In [None]:
# Split text on periods "."
def split_with_period(text):
    """Split text into sentences at each period, keeping the period with its sentence.

    A split happens wherever a period is followed by whitespace. Whole runs of
    whitespace are consumed, and leading/trailing whitespace is ignored, so no
    sentence starts with a blank and no empty string is returned.

    Args:
        text: The text string to split.

    Returns:
        A list of non-empty substrings, each including its trailing period when
        it had one. Empty or whitespace-only input gives an empty list.
    """
    # The lookbehind keeps the "." attached to the sentence on its left;
    # \s+ (rather than a single \s) swallows double spaces and newlines.
    pieces = re.split(r"(?<=\.)\s+", text.strip())
    # Drop empty pieces so an empty sentence is never returned to the caller.
    return [piece for piece in pieces if piece]

# One entry per translated sentence; each entry becomes its own TTS request payload below
translated_sentences = split_with_period(translated_text)
translated_sentences


In [None]:
# Prepare payloads
#
# The TTS SageMaker Endpoint accepts the following parameters:
#     id (int)                    The payload ID used for resequencing the files after generation
#     text (str)                  The sentence text sent to the endpoint
#     voice_samples_s3_uri (str)  The S3 URI for the voice samples folder
#     input_s3_uri (str)          The S3 URI for the payload
#     destination_s3_uri (str)    The S3 URI for where the generated audio is uploaded to
#     model_id (str)              Not currently used, reserved for future use
#     inference_params (dict)     Not currently used, reserved for future use

# One payload per translated sentence, numbered by its position in the text
payloads = [
    {
        "id": index,
        "text": sentence_text,
        "voice_samples_s3_uri": f"s3://{bucket}/{prefix_voice_samples}/{s3_reference_voice_folder}",
        "input_s3_uri": f"s3://{bucket}/{prefix_inputs}/{inference_id}/{inference_id}-part-{index}.json",
        "destination_s3_uri": f"s3://{bucket}/{prefix_outputs}/{inference_id}/{index}.wav",
        "model_id": '',             # Not currently used, Reserved for future use
        "inference_params": {},     # Not currently used, Reserved for future use
    }
    for index, sentence_text in enumerate(translated_sentences)
]
payloads

In [None]:
# Upload payloads to Amazon S3 and invoke the TTS endpoint once per payload

# Keep `s3` bound to an S3 *client*. Rebinding it to boto3.resource('s3') here would leave
# `s3` without upload_file(), so re-running an earlier upload cell afterwards would fail.
s3 = boto3.client('s3', region_name=region_name)
sagemaker = boto3.client('sagemaker-runtime', region_name=region_name)

for payload in payloads:

    # Upload the request json to the location named in the payload itself
    print(f"Uploading {payload['input_s3_uri']}")
    key = get_key(payload['input_s3_uri'])
    s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(payload).encode('utf-8'))

    # Invoke SageMaker async endpoint; the call returns without waiting for the audio,
    # which is why a separate polling cell follows.
    print(f"Invoking {endpoint_name} with {payload['input_s3_uri']}")
    response = sagemaker.invoke_endpoint_async(
        EndpointName=endpoint_name,
        ContentType='application/json',
        InputLocation=payload['input_s3_uri'],
        InvocationTimeoutSeconds=3600
    )


In [None]:
# Poll for completion: a payload is done once its destination object exists in S3
s3 = boto3.client('s3', region_name=region_name)

all_completed = False  # Flag to track completion

while not all_completed:
    print("=================")
    print(f"Checking progress - {datetime.now()}")
    completed_count = 0  # Count completed payloads

    for payload in payloads:
        key = get_key(payload['destination_s3_uri'])
        print(f"{payload['id']}...", end="")
        try:
            s3.head_object(Bucket=bucket, Key=key)
        except s3.exceptions.ClientError as err:
            # Only S3 API errors are treated as "not there yet". A bare except would also
            # swallow KeyboardInterrupt and unrelated errors such as typos or network setup.
            error_code = err.response.get('Error', {}).get('Code')
            if error_code in ("404", "NoSuchKey", "NotFound"):
                print(" In Progress.")
            else:
                # Keep polling, but surface the unexpected code so the cause is visible
                print(f" Not readable yet (S3 error code {error_code}).")
        else:
            print(" Completed.")
            completed_count += 1

    # Check if all payloads are completed
    all_completed = completed_count == len(payloads)

    if all_completed:
        print("All payloads completed!")
    else:
        sleep(10)
        clear_output(wait=True)

In [None]:
s3 = boto3.client('s3', region_name=region_name)

# Download every generated part into a temporary directory and concatenate them in payload order
with tempfile.TemporaryDirectory() as tmpdir:

    final_output_audio = AudioSegment.empty()

    for payload in payloads:
        print(f"Downloading {payload['destination_s3_uri']}")
        # Use a local name for the bucket parsed from the URI, so the notebook-wide
        # `bucket` parameter is not overwritten as a side effect of this loop.
        part_bucket = get_bucket(payload['destination_s3_uri'])
        key = get_key(payload['destination_s3_uri'])

        local_filepath = os.path.join(tmpdir, key.split("/")[-1])
        s3.download_file(part_bucket, key, local_filepath)

        # concatenate files
        final_output_audio += AudioSegment.from_wav(local_filepath)

    # Export before leaving the with-block, while the temporary directory still exists
    print(f"Creating final audio {final_output_audio_filename}")
    final_output_audio.export(final_output_audio_filename, format="wav")
    


In [None]:
# Compare the source audio length with the dubbed audio length and adjust the dubbed
# audio's tempo so that it matches the source length
import subprocess


final_output_audio_atempo_filename = f"{inference_id}-atempo.wav"

# Retrieve lengths
source_length = len(source_audio)
dubbed_audio = AudioSegment.from_file(final_output_audio_filename)
dubbed_length = len(dubbed_audio)

if source_length == 0 or dubbed_length == 0:
    raise ValueError(
        f"Cannot compute tempo: source length is {source_length}, dubbed length is {dubbed_length}"
    )

# Calculate atempo adjustment: a ratio above 1 speeds the dubbed audio up, below 1 slows it down
atempo = dubbed_length/source_length

# ffmpeg's atempo filter only accepts values in [0.5, 100.0]; report that here instead of
# letting ffmpeg reject the filter argument.
if not 0.5 <= atempo <= 100.0:
    raise ValueError(
        f"atempo={atempo} is outside the range supported by ffmpeg's atempo filter (0.5 to 100.0)"
    )

# Adjust final audio. check=True makes a failed ffmpeg run raise here instead of
# surfacing later as a missing or stale output file.
subprocess.run([
    'ffmpeg', '-i', final_output_audio_filename, '-filter:a', f'atempo={atempo}', '-y', final_output_audio_atempo_filename
], check=True)

In [None]:
# Store the tempo-adjusted audio under the outputs prefix in S3
key = "/".join((prefix_outputs, final_output_audio_atempo_filename))
s3.upload_file(final_output_audio_atempo_filename, bucket, key)
print(f"Uploaded final tempo adjusted file - s3://{bucket}/{key}")

### Perform video retalking using SageMaker async endpoint 
```
The SageMaker Retalking Endpoint accepts the following parameters:
    input_s3_uri (str): The S3 URI of the payload file
    input_video_s3_uri (str): The S3 URI of the input video
    input_audio_s3_uri (str): The S3 URI of the input audio to lip sync with
    output_video_s3_uri (str): The S3 URI of where the new video will be outputted to
```

In [None]:
# Build the single request payload for the retalking endpoint
sagemaker = boto3.client('sagemaker-runtime', region_name=region_name)
s3 = boto3.resource('s3', region_name=region_name)   # resource API: the upload cell calls s3.Object(...)

payload = dict(
    # Where this request JSON itself is stored
    input_s3_uri=f"s3://{bucket}/{prefix_inputs}/{inference_id}.json",
    # Source video uploaded at the start of the notebook
    input_video_s3_uri=f"s3://{bucket}/{prefix_videos}/{os.path.basename(source_file)}",
    # Tempo-adjusted dubbed audio to lip sync with
    input_audio_s3_uri=f"s3://{bucket}/{prefix_outputs}/{final_output_audio_atempo_filename}",
    # Destination of the retalked video
    output_video_s3_uri=f"s3://{bucket}/{prefix_outputs}/{final_output_video_filename}",
    inference_params={},
)

payload

In [None]:
# Write the request JSON to the S3 location named inside the payload
request_uri = payload['input_s3_uri']
print(f"Uploading {request_uri}")
key = get_key(request_uri)
s3.Object(bucket, key).put(Body=json.dumps(payload).encode('utf-8'))

# Start the asynchronous retalking inference for that request
print(f"Invoking {retalking_endpoint_name} with {request_uri}")
response = sagemaker.invoke_endpoint_async(
    EndpointName=retalking_endpoint_name,
    ContentType='application/json',
    InputLocation=request_uri,
    InvocationTimeoutSeconds=3600
)

In [None]:
# Display the response returned by the async invocation in the previous cell
response

In [None]:
# Poll for completion: retalking is done once the output video object exists in S3
s3 = boto3.client('s3', region_name=region_name)

# Key of the output video. It does not change between polls, so compute it once;
# the download cell that follows reuses this `key`.
key = get_key(payload['output_video_s3_uri'])

all_completed = False  # Flag to track completion

while not all_completed:
    print("=================")
    print(f"Checking progress - {datetime.now()}")

    try:
        s3.head_object(Bucket=bucket, Key=key)
    except s3.exceptions.ClientError as err:
        # Only S3 API errors are treated as "not there yet". A bare except would also
        # swallow KeyboardInterrupt and unrelated errors.
        error_code = err.response.get('Error', {}).get('Code')
        if error_code in ("404", "NoSuchKey", "NotFound"):
            print(" In Progress.")
        else:
            # Keep polling, but surface the unexpected code so the cause is visible
            print(f" Not readable yet (S3 error code {error_code}).")
    else:
        print(" Completed.")
        all_completed = True

    if all_completed:
        print("Retalking completed!")
    else:
        sleep(10)
        clear_output(wait=True)

In [None]:
# Download the completed file
# Relies on the polling cell above: `s3` is the S3 client and `key` is the output video key set there.

s3.download_file(bucket, key, final_output_video_filename)
print(f"Downloaded to {final_output_video_filename}")
