# AI Video Summarization

Publishers and broadcasters can leverage short-form video across social media platforms such as Facebook, Instagram, and TikTok to attract new audiences and create additional revenue opportunities.

However, producing video summaries by hand is time-consuming: editors must understand complex content, keep the summary coherent, and handle diverse video types, and the process does not scale to large volumes of video. Automating it with artificial intelligence (AI) and machine learning (ML) makes the process more scalable through automatic content analysis, real-time processing, contextual adaptation, customization, and continuous improvement of the AI/ML system.

### High-level workflow

![video summarization diagram](static/images/video-summarization-diagram.jpg)

In this notebook, we'll break down each step and show you in detail how video summarization can be achieved using AWS native services such as [Amazon Transcribe](https://aws.amazon.com/pm/transcribe), [Amazon Bedrock](https://aws.amazon.com/bedrock), [Amazon Polly](https://aws.amazon.com/polly/) and [AWS Elemental MediaConvert](https://aws.amazon.com/mediaconvert/).

## Prerequisites

To run this notebook, you need to have run the previous notebook: [01-video-time-segmentation](01-video-time-segmentation.ipynb), where you segmented the video using audio, visual and semantic information.

### Import Python packages

In [None]:
from pathlib import Path
import os
import json
import time
import boto3
import json_repair
from termcolor import colored
from IPython.display import JSON
from IPython.display import Video
from IPython.display import Pretty
from IPython.display import Image as DisplayImage
from lib.frames import VideoFrames
from lib.shots import Shots
from lib.scenes import Scenes
from lib.transcript import Transcript

### Retrieve saved values from previous notebooks

This notebook also depends on `00_prerequisites.ipynb`, where you installed package dependencies and gathered some information from the SageMaker environment. The next cell restores the variables saved by the earlier notebooks.

In [None]:
%store -r

## Summarize video content from transcript

We use a **large language model (LLM)** with [Amazon Bedrock](https://aws.amazon.com/bedrock/) to summarize the content of the video.

In [None]:
bedrock_client = boto3.client(service_name="bedrock-runtime")
accept = "application/json"
content_type = "application/json"

In [None]:
with open(video['transcript'].transcript_file, 'r') as file:
    transcript_file = json.load(file)
transcript = transcript_file['results']['transcripts'][0]['transcript']

model_id = "anthropic.claude-3-sonnet-20240229-v1:0"

prompt = f"""Summarize the key points from the following video content in chronological order:

{transcript} 

\n\nThe summary should only contain information present in the video content. Do not include any new or unrelated information.

Important: Start the summary immediately without any introductory phrases. Begin directly with the first key point."""

body = json.dumps(
    {
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                ],
            }
        ],
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1024,
        "temperature": 0.25,
        "top_p": 0.9,

    }
)
response = bedrock_client.invoke_model(
    body=body, modelId=model_id, accept=accept, contentType=content_type
)
response_body = json.loads(response["body"].read())
summarized_text = response_body["content"][0]["text"]

You can invoke the model with different parameters in the payload to influence the summary. Two important parameters are `top_p` and `temperature`. `top_p` restricts sampling to the smallest set of most likely tokens whose cumulative probability reaches the threshold, while `temperature` controls the level of randomness in the output.

No single combination of `top_p` and `temperature` suits every use case. The previous example uses a high `top_p` and a low `temperature`, which yields summaries that focus on key information and stay close to the original text while still allowing some variation in wording.
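
To make the effect of these two parameters concrete, here is a minimal sketch of temperature scaling followed by top-p (nucleus) filtering on a toy distribution. It is a generic illustration, not Amazon Bedrock's internal implementation; the function name `sampling_distribution` and the logits are hypothetical.

```python
import numpy as np

def sampling_distribution(logits, temperature=1.0, top_p=1.0):
    """Return the token probabilities left after temperature scaling and top-p filtering."""
    logits = np.asarray(logits, dtype=float)
    # Temperature rescales the logits: values below 1 sharpen the distribution.
    scaled = logits / temperature
    probs = np.exp(scaled - scaled.max())
    probs /= probs.sum()
    # Top-p keeps the smallest set of most likely tokens whose
    # cumulative probability reaches top_p, then renormalizes.
    order = np.argsort(probs)[::-1]
    cumulative = np.cumsum(probs[order])
    cutoff = int(np.searchsorted(cumulative, top_p)) + 1
    keep = order[:cutoff]
    filtered = np.zeros_like(probs)
    filtered[keep] = probs[keep]
    return filtered / filtered.sum()

toy_logits = [2.0, 1.0, 0.5, -1.0]  # hypothetical scores for four candidate tokens
print(sampling_distribution(toy_logits, temperature=1.0, top_p=1.0))
print(sampling_distribution(toy_logits, temperature=0.25, top_p=0.9))
```

With the default settings all four tokens remain candidates. With `temperature=0.25` and `top_p=0.9`, the most likely token alone already exceeds the 0.9 threshold, so it is the only one left to sample from.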

Let's check the summarized video content:

In [None]:
summarized_text

## Generate metadata for voice narration

The next step uses [Amazon Polly](https://aws.amazon.com/polly/) to generate speech from the summarized text. Polly is called twice: first with `OutputFormat="json"` to return sentence-level speech marks, and later with [Speech Synthesis Markup Language (SSML)](https://docs.aws.amazon.com/polly/latest/dg/ssml.html) input to produce the MP3 narration. The speech marks (stored in the `polly_ssml` variable below) are JSON records, one per line, that give the time offset at which a specific Polly voice starts each sentence; the difference between consecutive offsets is the duration of a sentence. We use these durations to set the length of the video segments, with a direct 1:1 correspondence between sentence and segment.

In [None]:
polly_client = boto3.client("polly")
voice_id = "Matthew"

In [None]:
response = polly_client.synthesize_speech(
    Engine="neural",
    OutputFormat="json",
    Text=summarized_text + " This video is generated by Video Summarization Hub.",
    TextType="text",
    SpeechMarkTypes=["sentence"],
    VoiceId=voice_id,
)

stream_data = response['AudioStream'].read()
polly_ssml = stream_data.decode('utf-8')

The following is the Amazon Polly speech marks output, one JSON record per sentence:

In [None]:
polly_ssml.split("\n")
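
As a minimal sketch of how sentence durations can be derived from this output, the snippet below parses a few hand-written records shaped like Polly's sentence-level speech marks. The sample text, the timings, and the helper name `sentence_durations` are hypothetical.

```python
import json

# Hypothetical speech marks, shaped like Polly's sentence-level JSON lines.
sample_marks = "\n".join([
    '{"time": 0, "type": "sentence", "start": 0, "end": 21, "value": "The launch went well."}',
    '{"time": 1850, "type": "sentence", "start": 22, "end": 47, "value": "Sales doubled in a month."}',
    '{"time": 4100, "type": "sentence", "start": 48, "end": 63, "value": "Closing credit."}',
    "",
])

def sentence_durations(speech_marks):
    """Pair each sentence with the time in milliseconds until the next sentence starts."""
    marks = [json.loads(line) for line in speech_marks.splitlines() if line.strip()]
    # Each duration is the gap to the next mark, so the final mark has no duration.
    return [
        (cur["value"], nxt["time"] - cur["time"])
        for cur, nxt in zip(marks, marks[1:])
    ]

print(sentence_durations(sample_marks))
```

Because the last record has no successor, it gets no duration. This is one reason to append a closing sentence to the text before requesting speech marks, as the notebook does.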

## Select most relevant video shots/scenes

We need to select the most relevant video frame sequence to match with every sentence in the summarized content. Thus, we use text embedding to perform the sentence similarity task, which determines how similar two texts are.

Sentence similarity models transform input texts into vectors (embeddings) that capture semantic information and calculate the proximity or similarity between them.

In this step, we use a **text embedding model** with [Amazon Bedrock](https://aws.amazon.com/bedrock/) to create the embeddings for every sentence in the original subtitle and in the video summary.

First, we load the original subtitle file and process it to break it down into sentences with start times and end times.

In [None]:
with open(video['transcript'].vtt_file, 'r', encoding='utf-8') as file:
    subtitle = file.read()

if subtitle.startswith("WEBVTT"):
    subtitle = subtitle[len("WEBVTT"):].lstrip()

print(subtitle)

In [None]:
import re
def srt_to_arrays(s):
    """
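    Converts the given SRT-style transcription (numbered cues) to a list of sentences and the corresponding timecodes.
    Args:
       s - transcription in SRT-style format, such as the VTT content above with its header removed.
    Returns:
       a tuple of three lists: the sentences, their start times and their end times in milliseconds.
    """
    # Capture the first text line of each numbered cue.
    sentences = [line.strip() for line in re.findall(r"\d+\n.*?\n(.*?)\n", s)]

    def get_time(s):
        # Accept both "." (VTT) and "," (SRT) as the millisecond separator.
        return re.findall(r"\d{2}:\d{2}:\d{2}[.,]\d{3}", s)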

    startTimes = get_time(s)[::2]
    endTimes = get_time(s)[1::2]
    startTimes_ms = [time_to_ms(time) for time in startTimes]
    endTimes_ms = [time_to_ms(time) for time in endTimes]

    filtered_sentences = []
    filtered_startTimes_ms = []
    filtered_endTimes_ms = []

    startTime_ms = -1
    endTime_ms = -1
    sentence = ""
    for i in range(len(sentences)):
        if startTime_ms == -1:
            startTime_ms = startTimes_ms[i]
        sentence += " " + sentences[i]
        # Close the sentence at terminal punctuation or at the last cue.
        if sentences[i].endswith((".", "?", "!")) or i == len(sentences) - 1:
            endTime_ms = endTimes_ms[i]
            filtered_sentences.append(sentence)
            filtered_startTimes_ms.append(startTime_ms)
            filtered_endTimes_ms.append(endTime_ms)
            startTime_ms = -1
            endTime_ms = -1
            sentence = ""

    return filtered_sentences, filtered_startTimes_ms, filtered_endTimes_ms

def time_to_ms(time_str):
    match = re.match(r"(\d+):(\d+):(\d+)[.,](\d+)", time_str)
    h, m, s, ms = match.groups()
    return int(h) * 3600000 + int(m) * 60000 + int(s) * 1000 + int(ms)

In [None]:
original_sentences, startTimes, endTimes = srt_to_arrays(subtitle)

Let's look at some of the sentences:

In [None]:
original_sentences[:10]
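
To illustrate the merging logic on a small scale, the following sketch joins a few hypothetical subtitle cues into full sentences, keeping the start time of the first cue and the end time of the last cue of each sentence. The cue data and the helper name `to_ms` are made up for illustration; the notebook's own `srt_to_arrays` works on the raw subtitle text instead.

```python
import re

def to_ms(timestamp):
    """Convert HH:MM:SS.mmm (or HH:MM:SS,mmm) to milliseconds."""
    h, m, s, ms = re.match(r"(\d+):(\d+):(\d+)[.,](\d+)", timestamp).groups()
    return int(h) * 3600000 + int(m) * 60000 + int(s) * 1000 + int(ms)

# Hypothetical cues: (start, end, text).
cues = [
    ("00:00:01.000", "00:00:03.500", "Welcome to the show,"),
    ("00:00:03.500", "00:00:06.000", "where we talk about cloud."),
    ("00:00:06.200", "00:00:08.000", "Let's begin!"),
]

sentences, current, start_ms, last_end = [], [], None, None
for start, end, text in cues:
    if start_ms is None:
        start_ms = to_ms(start)  # the sentence starts with its first cue
    current.append(text)
    last_end = to_ms(end)
    if text.endswith((".", "?", "!")):
        sentences.append((" ".join(current), start_ms, last_end))
        current, start_ms = [], None

# Flush a trailing cue that has no terminal punctuation.
if current:
    sentences.append((" ".join(current), start_ms, last_end))

print(sentences)
```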

Next, we create the text embeddings for every sentence in the original subtitle and in the video summary. The following code shows how text embedding works with the Amazon Bedrock API.

In [None]:
import numpy as np

def text_embedding(original_sentences, polly_ssml):
    """
    Calculates the similarity between the given original sentences and the summarized sentences from the Polly speech marks.
    Args:
       original_sentences - sentences extracted from the original video.
       polly_ssml - Polly speech marks for the summarized sentences, one JSON record per line.
    Return:
       a tuple of summarized_sentences, the corresponding durations in milliseconds,
       best_matching_indices and similarity_matrix.
    
    """
    summarized_sentences = []
    durations = []
    polly_ssml = polly_ssml.split("\n")
    for i in range(len(polly_ssml) - 1):
        curr = polly_ssml[i]
        nxt = polly_ssml[i + 1]  # named nxt to avoid shadowing the built-in next()
        if curr.strip() == "" or nxt.strip() == "":
            continue
        curr = json.loads(curr)
        nxt = json.loads(nxt)
        summarized_sentences.append(curr["value"])
        # A sentence lasts until the next speech mark starts.
        durations.append(int(nxt["time"]) - int(curr["time"]))

    model_id = "amazon.titan-embed-image-v1"
    accept = "application/json"
    content_type = "application/json"
    original_embeddings = []
    for sentence in original_sentences:  # avoid shadowing the built-in str
        body = json.dumps({"inputText": sentence})
        response = bedrock_client.invoke_model(
            body=body, modelId=model_id, accept=accept, contentType=content_type
        )
        response_body = json.loads(response["body"].read())
        original_embeddings.append(response_body.get("embedding"))
    original_embeddings = np.array(original_embeddings)

    summarized_embeddings = []
    for sentence in summarized_sentences:  # avoid shadowing the built-in str
        body = json.dumps({"inputText": sentence})
        response = bedrock_client.invoke_model(
            body=body, modelId=model_id, accept=accept, contentType=content_type
        )
        response_body = json.loads(response["body"].read())
        summarized_embeddings.append(response_body.get("embedding"))
    summarized_embeddings = np.array(summarized_embeddings)

    similarity_matrix = np_cosine_similarity(summarized_embeddings, original_embeddings)
    best_matching_indices = []
    len_summarized_sentences = len(summarized_sentences)
    len_original_sentences = len(original_sentences)

    # Find the best matching sentences.
    dp = np.zeros([len_summarized_sentences, len_original_sentences], dtype=float)
    for i in range(0, len_summarized_sentences):
        for j in range(0, len_original_sentences):
            if i == 0:
                dp[i][j] = similarity_matrix[i][j]
            else:
                max_score = -1
                for k in range(0, j):
                    if similarity_matrix[i][j] > 0 and dp[i - 1][k] > 0:
                        max_score = max(
                            max_score, similarity_matrix[i][j] + dp[i - 1][k]
                        )
                dp[i][j] = max_score

    j = len_original_sentences

    for i in range(min(len_original_sentences, len_summarized_sentences) - 1, -1, -1):
        arr = dp[i][:j]
        idx = np.argmax(arr)
        best_matching_indices.append(idx)
        j = idx
    best_matching_indices.reverse()

    return summarized_sentences, durations, best_matching_indices, similarity_matrix

We use cosine similarity to measure how similar two embedding vectors are.

In [None]:
def np_cosine_similarity(summarized_embeddings, original_embeddings):
    dot_products = np.dot(summarized_embeddings, original_embeddings.T)
    summarized_norms = np.linalg.norm(summarized_embeddings, axis=1)
    original_norms = np.linalg.norm(original_embeddings, axis=1)

    similarity_matrix = (
        dot_products / summarized_norms[:, None] / original_norms[None, :]
    )

    return similarity_matrix

In [None]:
summarized_sentences, durations, best_matching_indices, similarity_matrix = text_embedding(original_sentences, polly_ssml)

The call returns the similarity matrix along with the summarized sentences, their durations, and the best-matching indices.

You can interpret the similarity matrix as follows: each row corresponds to one sentence in the summarized content, and each column holds its similarity score against one sentence in the original text. Cosine similarity ranges from -1 to 1: 1 means the vectors point in the same direction (very similar), 0 means the vectors are orthogonal (not correlated), and -1 means the vectors point in opposite directions (very dissimilar).
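
The three reference values can be checked on toy two-dimensional vectors. This is only an illustration with made-up vectors; real embeddings have many more dimensions and rarely reach the extremes.

```python
import numpy as np

def cosine(a, b):
    """Cosine similarity between two 1-D vectors."""
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

v = [1.0, 2.0]
print(cosine(v, [2.0, 4.0]))    # same direction: close to 1
print(cosine(v, [-2.0, 1.0]))   # orthogonal: 0
print(cosine(v, [-1.0, -2.0]))  # opposite direction: close to -1
```

Note that scaling a vector does not change the result: `[2.0, 4.0]` is twice `v` and still scores about 1, because cosine similarity compares direction rather than magnitude.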

From the similarity matrix, we select the best-matching original sentence for each sentence in the summarized content, using dynamic programming so that the matches stay in chronological order. Each sentence in the original text also has its corresponding timestamps (that is, `startTimes` and `endTimes`) taken from the original subtitle file.
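
The sketch below shows one way to perform such an order-preserving selection on a toy matrix. It is a simplified variant for illustration, not the exact code in `text_embedding`: it stores back-pointers instead of re-scanning the score table, and the function name `monotonic_alignment` and the scores are hypothetical.

```python
import numpy as np

def monotonic_alignment(similarity):
    """Pick one column per row, with strictly increasing columns, maximizing total similarity."""
    n_rows, n_cols = similarity.shape
    if n_rows > n_cols:
        raise ValueError("need at least as many original sentences as summarized ones")
    score = np.full((n_rows, n_cols), -np.inf)
    back = np.zeros((n_rows, n_cols), dtype=int)
    score[0] = similarity[0]
    for i in range(1, n_rows):
        for j in range(i, n_cols):
            # Best predecessor among columns strictly before j.
            k = int(np.argmax(score[i - 1, :j]))
            score[i, j] = similarity[i, j] + score[i - 1, k]
            back[i, j] = k
    # Trace back from the best final column.
    j = int(np.argmax(score[-1]))
    path = [j]
    for i in range(n_rows - 1, 0, -1):
        j = int(back[i, j])
        path.append(j)
    return path[::-1]

# Rows: summarized sentences; columns: original sentences (made-up scores).
toy = np.array([
    [0.9, 0.2, 0.1, 0.3],
    [0.8, 0.3, 0.7, 0.2],
    [0.1, 0.6, 0.2, 0.5],
])
print(monotonic_alignment(toy))  # [0, 2, 3]
```

Picking the highest score in each row independently would give columns 0, 0, and 1, which repeats a sentence. The ordered selection trades a little similarity for a sequence that moves forward in time.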

In [None]:
def get_timecodes(best_matching_indices, idx, endTimes, duration, timecodes):
    """
    Calculates the start and end time of the clip for one summarized sentence, aligned with the timecodes of the original sentences.
    Args:
      best_matching_indices - for each summarized sentence, the index of the most similar original sentence.
      idx - index of the summarized sentence to process.
      endTimes - end times of the original sentences, in milliseconds.
      duration - speech duration of the synthesized summarized sentence, in milliseconds.
      timecodes - clips selected so far as [start, end] pairs, used to avoid overlapping the previous clip.
    Returns:
      a (startTime, endTime) tuple in milliseconds.
    """
    best_matching_idx = best_matching_indices[idx]
    # The clip ends where the matched original sentence ends and lasts as long as the narration.
    startTime = int(endTimes[best_matching_idx]) - duration
    # Shift the clip forward if it would start before the previous clip ends.
    carry = max(0, timecodes[-1][1] - startTime)
    startTime += carry
    endTime = int(endTimes[best_matching_idx]) + carry
    return startTime, endTime

def milliseconds_to_time(ms, frame_rate=24):
    return "{:02d}:{:02d}:{:02d}:{:02d}".format(
        int((ms // 3600000) % 24),  # hours
        int((ms // 60000) % 60),  # minutes
        int((ms // 1000) % 60),  # seconds
        # frame number within the current second (frame_rate defaults to 24 fps)
        int((ms % 1000) * frame_rate / 1000),
    )

In [None]:
intro_time = float(transcript_file["results"]["items"][0]["start_time"]) * 1000  # ms

timecodes = [[0, intro_time]]
for i in range(min(len(original_sentences), len(summarized_sentences))):
    startTime, endTime = get_timecodes(
        best_matching_indices,
        i,
        endTimes,
        durations[i],
        timecodes,
    )
    timecodes.append([startTime, endTime])
creditTime = endTime + 3500
timecodes.append([endTime, creditTime])
timecodes_text = ""
for timecode in timecodes:
    timecodes_text += (
        milliseconds_to_time(timecode[0])
        + ","
        + milliseconds_to_time(timecode[1])
        + "\n"
    )


An example of the timestamp output in text format is:

In [None]:
print(timecodes_text)
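
As a worked example of the `HH:MM:SS:FF` conversion, the sketch below converts 83,500 ms, which is 1 minute, 23 seconds, and half a second. At 24 frames per second, half a second is frame 12. The helper name `to_timecode` is hypothetical, and unlike `milliseconds_to_time` it does not wrap hours at 24.

```python
def to_timecode(ms, frame_rate=24):
    """Format milliseconds as HH:MM:SS:FF for the given frame rate."""
    hours, rem = divmod(int(ms), 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    seconds, millis = divmod(rem, 1_000)
    frames = millis * frame_rate // 1000  # frame index within the current second
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}:{frames:02d}"

print(to_timecode(83_500))                 # 00:01:23:12
print(to_timecode(83_500, frame_rate=30))  # 00:01:23:15
```

The frame field depends on the frame rate, so if the source video is not 24 fps, pass its actual frame rate rather than relying on the default.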

To make the timecodes work with the video transcoding step in AWS Elemental MediaConvert, we reformat them as follows:

In [None]:
to_json = lambda s: [
    {"StartTimecode": t1, "EndTimecode": t2}
    for t1, t2 in (line.split(",") for line in s.split("\n") if line.strip())
]
timecodes = to_json(timecodes_text)
print(timecodes)

By incorporating both the duration of Polly audio for each summarized sentence and the timestamps from the original subtitle file, we can then select the timestamp sequence for the most relevant frames corresponding to each summarized sentence. The length of each selected video segment for a summarized sentence will be aligned with the length of its narration audio.

In [None]:
escaped_summarized_text = (
        summarized_text.replace("&", "&amp;")
        .replace('"', "&quot;")
        .replace("'", "&apos;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
    )
ssml = "<speak>\n"
# Use whole milliseconds in the break tags; intro_time is a float.
break_time = int(intro_time)

# A single break is capped at 10s in Polly, so emit 10s breaks until the remainder fits.
while break_time > 10000:
    ssml += '<break time="10000ms"/>'
    break_time -= 10000
ssml += '<break time="' + str(break_time) + 'ms"/>'
ssml += escaped_summarized_text
ssml += "</speak>"

response = polly_client.synthesize_speech(
    Engine="neural",
    OutputFormat="mp3",
    Text=ssml,
    TextType="ssml",
    VoiceId=voice_id,
)

if "AudioStream" in response:
    with response["AudioStream"] as stream:
        audio_narration = stream.read()

In [None]:
print(ssml)
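
The leading pause can be isolated into a small helper. The sketch below splits a long pause into consecutive break tags, using the 10-second cap noted in the code above; the helper name `pause_tags` is hypothetical.

```python
def pause_tags(total_ms, max_break_ms=10_000):
    """Split a pause into consecutive <break> tags of at most max_break_ms each."""
    remaining = int(total_ms)
    tags = []
    while remaining > 0:
        chunk = min(remaining, max_break_ms)
        tags.append(f'<break time="{chunk}ms"/>')
        remaining -= chunk
    return "".join(tags)

print(pause_tags(23_500))
# <break time="10000ms"/><break time="10000ms"/><break time="3500ms"/>
```

The sum of the emitted breaks equals the requested pause, so the narration still starts at the same offset as the first spoken word in the original video.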

Next, we upload the audio narration to the Amazon S3 bucket, ready for the video transcoding step with AWS Elemental MediaConvert.

In [None]:
s3_client = boto3.client("s3")
s3_bucket = session["bucket"]
audio_narration_filename = os.path.splitext(os.path.basename(video['mp4_file']))[0] + ".mp3"
s3_client.put_object(
    Body=audio_narration, Bucket=s3_bucket, Key=audio_narration_filename, ContentType="audio/mpeg"
)

## Create MediaConvert assembly workflows

We pass the sequence of timecodes as parameters to an AWS Elemental MediaConvert assembly workflow, which performs basic input clipping.

Combining the clipped video with the MP3 narration from Amazon Polly, and optionally with background music of your choice, produces the final video summary.

Let's start the assembly workflow from the original video input. An assembly workflow is a MediaConvert job that performs basic input clipping and stitching to assemble output assets from one or different sources without requiring separate editing software.
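
Before submitting the job, it can help to sanity-check the clip list. The sketch below is a hypothetical helper, not part of the MediaConvert API: it flags clips that are empty or that start before the previous clip ends, which usually points to an alignment problem upstream. It assumes clips shaped like the `StartTimecode`/`EndTimecode` dictionaries built earlier and a frame rate of 24 fps.

```python
def timecode_to_frames(timecode, frame_rate=24):
    """Convert HH:MM:SS:FF to an absolute frame count."""
    hours, minutes, seconds, frames = (int(part) for part in timecode.split(":"))
    return ((hours * 60 + minutes) * 60 + seconds) * frame_rate + frames

def check_clippings(clippings, frame_rate=24):
    """Return a list of problems; an empty list means the clips look consistent."""
    problems = []
    previous_end = 0
    for index, clip in enumerate(clippings):
        start = timecode_to_frames(clip["StartTimecode"], frame_rate)
        end = timecode_to_frames(clip["EndTimecode"], frame_rate)
        if end <= start:
            problems.append(f"clip {index}: end is not after start")
        if start < previous_end:
            problems.append(f"clip {index}: overlaps the previous clip")
        previous_end = max(previous_end, end)
    return problems

# Hypothetical clip list in the same shape as the notebook's timecodes.
sample_clips = [
    {"StartTimecode": "00:00:00:00", "EndTimecode": "00:00:02:12"},
    {"StartTimecode": "00:00:05:00", "EndTimecode": "00:00:08:00"},
]
print(check_clippings(sample_clips))  # []
```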

In [None]:
iam_role = session["MediaConvertRole"]
input_video_path = video["url"]
output_video_path = f"s3://{s3_bucket}/"

In [None]:
media_convert = boto3.client("mediaconvert")
response = media_convert.create_job(
    Queue="Default",
    UserMetadata={},
    Role=iam_role,
    Settings={
        "TimecodeConfig": {"Source": "ZEROBASED"},
        "OutputGroups": [
            {
                "Name": "File Group",
                "Outputs": [
                    {
                        "ContainerSettings": {
                            "Container": "MP4",
                            "Mp4Settings": {},
                        },
                        "VideoDescription": {
                            "CodecSettings": {
                                "Codec": "H_264",
                                "H264Settings": {
                                    "MaxBitrate": 40000000,
                                    "RateControlMode": "QVBR",
                                    "SceneChangeDetect": "TRANSITION_DETECTION",
                                },
                            }
                        }
                    }
                ],
                "OutputGroupSettings": {
                    "Type": "FILE_GROUP_SETTINGS",
                    "FileGroupSettings": {"Destination": output_video_path},
                },
            }
        ],
        "Inputs": [
            {
                "VideoSelector": {},
                "TimecodeSource": "ZEROBASED",
                "FileInput": input_video_path,
                "InputClippings": timecodes,
            }
        ],
    },
    AccelerationSettings={"Mode": "DISABLED"},
    StatusUpdateInterval="SECONDS_60",
    Priority=0,
)

job_complete = False

while not job_complete:
    job_response = media_convert.get_job(Id=response["Job"]["Id"])
    
    job_status = job_response['Job']['Status']
    print(f"MediaConvert job status: {job_status}")
    
    if job_status == 'COMPLETE':
        print("Job is complete!")
        job_complete = True
    elif job_status in ('ERROR', 'CANCELED'):
        # Stop polling on any terminal status, not only on errors.
        print("Job has failed or was canceled.")
        job_complete = True
    else:
        time.sleep(10)

Next, you create audio tracks in the output and associate a single audio selector with each output track. You can also add subtitles to the final video output. The following code generates a subtitle for the video summary:

In [None]:
video_summary_subtitle = ""
start = intro_time

def split_long_lines(text, max_line_length):
    words = text.split()
    lines = []
    current_line = []
    current_length = 0

    for word in words:
        if current_length + len(word) + len(current_line) > max_line_length:
            lines.append(" ".join(current_line))
            current_line = []
            current_length = 0
        current_line.append(word)
        current_length += len(word) + 1

    if current_line:
        lines.append(" ".join(current_line))

    return lines

def milliseconds_to_subtitleTimeFormat(ms):
    return "{:02d}:{:02d}:{:02d},{:03d}".format(
        int((ms // 3600000) % 24),  # hours
        int((ms // 60000) % 60),  # minutes
        int((ms // 1000) % 60),  # seconds
        int(ms % 1000),  # milliseconds
    )

for i in range(min(len(original_sentences), len(summarized_sentences))):
    end = start + durations[i]
    video_summary_subtitle += f"{i+1}\n"
    video_summary_subtitle += f"{milliseconds_to_subtitleTimeFormat(start)} --> {milliseconds_to_subtitleTimeFormat(end)}\n"
    sentence_lines = split_long_lines(summarized_sentences[i], 90)
    for line in sentence_lines:
        video_summary_subtitle += f"{line}\n"
    video_summary_subtitle += "\n"
    start = end

In [None]:
print(video_summary_subtitle)
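
For reference, each SRT cue consists of an index, a `start --> end` line with comma-separated milliseconds, and one or more text lines. The sketch below builds a single cue using the standard library's `textwrap.wrap` as an alternative to a hand-written line splitter; the helper name `srt_cue` and the sample text are hypothetical.

```python
import textwrap

def srt_cue(index, start_ms, end_ms, text, width=90):
    """Format one SRT cue, wrapping the text to the given line width."""
    def stamp(ms):
        hours, rem = divmod(int(ms), 3_600_000)
        minutes, rem = divmod(rem, 60_000)
        seconds, millis = divmod(rem, 1_000)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"

    lines = textwrap.wrap(text, width=width)
    return f"{index}\n{stamp(start_ms)} --> {stamp(end_ms)}\n" + "\n".join(lines) + "\n"

# A narrow width is used here only to show the wrapping.
print(srt_cue(1, 1_500, 4_250, "Sales doubled in a month.", width=20))
```

With `width=20`, the text is split into `Sales doubled in a` and `month.`, each on its own line under the timing line.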

In [None]:
subtitle_filename = os.path.splitext(os.path.basename(video['mp4_file']))[0] + ".srt"
s3_client.put_object(
    Body=video_summary_subtitle, Bucket=s3_bucket, Key=subtitle_filename
)

Finally, you create a MediaConvert job for the final video output.

In [None]:
input_video_path = f"s3://{s3_bucket}/{video['mp4_file']}"
audio_file_path = f"s3://{s3_bucket}/{audio_narration_filename}"
subtitle_file_path = f"s3://{s3_bucket}/{subtitle_filename}"
output_video_path = f"s3://{s3_bucket}/"

In the following step, we use an [AWS Elemental MediaConvert](https://aws.amazon.com/mediaconvert/) job to apply the narrated voice and the subtitles to the clipped video. The output is written to the S3 bucket for downstream consumption.

In [None]:
response = media_convert.create_job(
    Queue="Default",
    UserMetadata={},
    Role=iam_role,
    Settings={
        "TimecodeConfig": {"Source": "ZEROBASED"},
        "OutputGroups": [
            {
                "Name": "File Group",
                "Outputs": [
                    {
                        "ContainerSettings": {
                            "Container": "MP4",
                            "Mp4Settings": {},
                        },
                        "VideoDescription": {
                            "CodecSettings": {
                                "Codec": "H_264",
                                "H264Settings": {
                                    "MaxBitrate": 40000000,
                                    "RateControlMode": "QVBR",
                                    "SceneChangeDetect": "TRANSITION_DETECTION",
                                },
                            }
                        },
                        "NameModifier": "_summary",
                        "AudioDescriptions": [
                            {
                                "AudioSourceName": "Audio Selector Group 1",
                                "CodecSettings": {
                                    "Codec": "AAC",
                                    "AacSettings": {
                                        "Bitrate": 96000,
                                        "CodingMode": "CODING_MODE_2_0",
                                        "SampleRate": 48000,
                                    },
                                },
                            }
                        ],
                        "CaptionDescriptions": [
                            {
                                "CaptionSelectorName": "Captions Selector 1",
                                "DestinationSettings": {
                                    "DestinationType": "BURN_IN",
                                    "BurninDestinationSettings": {
                                        "BackgroundOpacity": 100,
                                        "FontSize": 18,
                                        "FontColor": "WHITE",
                                        "ApplyFontColor": "ALL_TEXT",
                                        "BackgroundColor": "BLACK",
                                    },
                                },
                            }
                        ],
                    }
                ],
                "OutputGroupSettings": {
                    "Type": "FILE_GROUP_SETTINGS",
                    "FileGroupSettings": {"Destination": output_video_path},
                },
            }
        ],
        "Inputs": [
            {
                "AudioSelectors": {
                    "Audio Selector 1": {
                        "DefaultSelection": "NOT_DEFAULT",
                        "ExternalAudioFileInput": audio_file_path,
                    },
                },
                "AudioSelectorGroups": {
                    "Audio Selector Group 1": {
                        "AudioSelectorNames": ["Audio Selector 1"]
                    }
                },
                "VideoSelector": {},
                "TimecodeSource": "ZEROBASED",
                "CaptionSelectors": {
                    "Captions Selector 1": {
                        "SourceSettings": {
                            "SourceType": "SRT",
                            "FileSourceSettings": {"SourceFile": subtitle_file_path},
                        }
                    }
                },
                "FileInput": input_video_path,
            }
        ],
    },
    AccelerationSettings={"Mode": "DISABLED"},
    StatusUpdateInterval="SECONDS_60",
    Priority=0,
)

job_complete = False

while not job_complete:
    job_response = media_convert.get_job(Id=response["Job"]["Id"])
    
    job_status = job_response['Job']['Status']
    print(f"MediaConvert job status: {job_status}")
    
    if job_status == 'COMPLETE':
        print("Job is complete!")
        job_complete = True
    elif job_status in ('ERROR', 'CANCELED'):
        # Stop polling on any terminal status, not only on errors.
        print("Job has failed or was canceled.")
        job_complete = True
    else:
        time.sleep(10)
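
Since the same polling loop appears twice, it could be factored into a reusable helper with a timeout. The sketch below is one possible shape, under the assumption that the terminal statuses are `COMPLETE`, `ERROR`, and `CANCELED`; the helper name `wait_for_job` and its parameters are hypothetical. The status source is passed in as a callable so the logic can be exercised without calling AWS.

```python
import time

def wait_for_job(get_status, poll_seconds=10, timeout_seconds=1800, sleep=time.sleep):
    """Poll get_status() until it returns a terminal status or the timeout elapses."""
    terminal = {"COMPLETE", "ERROR", "CANCELED"}
    waited = 0
    while True:
        status = get_status()
        if status in terminal:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"job still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds

# Simulated status sequence so the example runs without AWS access.
statuses = iter(["SUBMITTED", "PROGRESSING", "COMPLETE"])
print(wait_for_job(lambda: next(statuses), sleep=lambda seconds: None))
```

In the notebook, the callable could wrap the existing call, for example `lambda: media_convert.get_job(Id=response["Job"]["Id"])["Job"]["Status"]`.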

## Short-form video output

In [None]:
video_summary = os.path.splitext(os.path.basename(video['mp4_file']))[0] + "_summary.mp4"
s3_client.download_file(s3_bucket, video_summary, video_summary)
Video(url=video_summary, width=640, height=360, html_attributes="controls muted autoplay")

## Clean up

In [None]:
# s3_client.delete_object(Bucket=s3_bucket, Key=audio_narration_filename)
# s3_client.delete_object(Bucket=s3_bucket, Key=video['mp4_file'])
# s3_client.delete_object(Bucket=s3_bucket, Key=subtitle_filename)
# s3_client.delete_object(Bucket=s3_bucket, Key=video_summary)