# AI 비디오 요약

퍼블리셔와 방송사는 Facebook, Instagram, TikTok과 같은 소셜 미디어 플랫폼에서 짧은 형식의 비디오를 활용하여 새로운 시청자를 유치하고 추가 수익 기회를 창출할 수 있습니다.

그러나 복잡한 콘텐츠 이해, 일관성 유지, 다양한 비디오 유형, 그리고 대량의 비디오를 다룰 때의 확장성 부족과 같은 과제들로 인해 비디오 요약 생성은 수동적이고 시간이 많이 소요되는 프로세스입니다. artificial intelligence (AI)와 machine learning (ML)을 활용한 자동화를 도입하면 자동 콘텐츠 분석, 실시간 처리, 맥락적 적응, 사용자 정의, 그리고 지속적인 AI/ML 시스템 개선을 통해 이 프로세스를 더 실행 가능하고 확장 가능하게 만들 수 있습니다.

### 상위 수준 워크플로우

![video summarization diagram](static/images/video-summarization-diagram.png)

이 실습에서는 각 단계를 분석하고 [Amazon Transcribe](https://aws.amazon.com/pm/transcribe), [Amazon Bedrock](https://aws.amazon.com/bedrock), [Amazon Polly](https://aws.amazon.com/polly/) 및 [AWS Elemental MediaConvert](https://aws.amazon.com/mediaconvert/)와 같은 AWS 네이티브 서비스를 사용하여 비디오 요약을 어떻게 달성할 수 있는지 자세히 보여드릴 것입니다.

## Prerequisites


이 노트북을 실행하려면 노트북 환경을 설정하고 오디오, 시각적, 의미론적 정보를 사용하여 비디오를 세그먼트화한 이전의 모든 기초 노트북을 실행했어야 합니다:
1. [00-prerequisites.ipynb](00-prerequisites.ipyn)
2. [01A-visual-segments-frames-shots-scenes.ipynb](01A-visual-segments-frames-shots-scenes.ipynb) 
3. [01B-audio-segments.ipynb](01B-audio-segments.ipynb) 

<div class="alert alert-block alert-info">
⏳ 이 노트북에는 실행하는 데 5분 이상 걸리는 단계가 있습니다. 이 노트북을 실행하기 위해 위의 노트북 메뉴에서 ⏩ 모든 셀 실행 옵션을 선택하는 것을 권장합니다. "03-video-summarization.ipynb의 커널을 다시 시작하시겠습니까? 모든 변수가 손실됩니다."라는 팝업이 표시됩니다. "다시 시작" 버튼을 클릭하여 계속 진행하세요. 노트북의 나머지 부분이 실행되는 동안 결과를 읽을 수 있습니다.
</div>

### Retrieve saved values from previous notebooks




In [None]:
store -r

### Import python packages

In [None]:
from pathlib import Path
import os
import json
import time
import boto3
from botocore.exceptions import ClientError
import json_repair
from termcolor import colored
from IPython.display import JSON
from IPython.display import Video
from IPython.display import Pretty
from IPython.display import Image as DisplayImage
from lib.frames import VideoFrames
from lib.shots import Shots
from lib.scenes import Scenes
from lib.transcript import Transcript
import base64
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import subprocess
import numpy as np
import re

## transcript에서 비디오 콘텐츠 요약

[Amazon Bedrock](https://aws.amazon.com/bedrock/)와 함께 **Large Language Model (LLM)**을 사용하여 비디오의 콘텐츠를 요약합니다.


In [None]:
bedrock_client = boto3.client(service_name="bedrock-runtime")
accept = "application/json"
content_type = "application/json"

In [None]:
with open(video['transcript'].transcript_file, 'r') as file:
    transcript_file = json.load(file)
transcript = transcript_file['results']['transcripts'][0]['transcript']

# model_id = "anthropic.claude-3-5-sonnet-20240620-v1:0"
# model_id = "anthropic.claude-3-haiku-20240307-v1:0"
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"

prompt = f"""Summarize the key points from the following video content in chronological order:

{transcript} 

\n\nThe summary should only contain information present in the video content. Do not include any new or unrelated information.

Important: Start the summary immediately without any introductory phrases. Begin directly with the first key point."""

body = json.dumps(
    {
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                ],
            }
        ],
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1024,
        "temperature": 0.25,
        "top_p": 0.9,

    }
)
response = bedrock_client.invoke_model(
    body=body, modelId=model_id, accept=accept, contentType=content_type
)
response_body = json.loads(response["body"].read())
summarized_text = response_body["content"][0]["text"]

페이로드에 정의된 다른 매개변수로 엔드포인트를 호출하여 텍스트 요약에 영향을 줄 수 있습니다. 두 가지 중요한 매개변수는 `top_p`와 `temperature`입니다. `top_p`는 누적 확률을 기반으로 모델이 고려하는 토큰의 범위를 제어하는 데 사용되는 반면, `temperature`는 출력의 무작위성 수준을 제어합니다.

모든 사용 사례에 대한 `top_p`와 `temperature`의 만능 조합은 없지만, 이전 예시에서는 높은 `top_p`와 낮은 `temperature`를 가진 샘플 값을 보여주었습니다. 이는 주요 정보에 초점을 맞춘 요약을 생성하고 원본 텍스트에서 벗어나는 것을 피하면서도 출력을 흥미롭게 유지하기 위해 약간의 창의적 변형을 도입합니다.

Let's check the summarized video content:

In [None]:
summarized_text

## 음성 내레이션을 위한 메타데이터 생성

다음 단계는 요약된 텍스트에서 음성을 생성하기 위해 [Amazon Polly](https://aws.amazon.com/polly/)로 시작합니다. Polly 작업의 출력은 MP3 파일과 [Speech Synthesis Markup Language (SSML)](https://docs.aws.amazon.com/polly/latest/dg/ssml.html)로 마크업된 문서입니다. 이 SSML 파일 내에는 특정 Polly 음성이 발성한 개별 문장의 지속 시간을 설명하는 필수 메타데이터가 캡슐화되어 있습니다. 이 오디오 지속 시간 정보를 통해 비디오 세그먼트의 길이를 정의할 수 있습니다. 이 경우에는 1:1 직접 대응이 사용됩니다.


In [None]:
polly_client = boto3.client("polly")
voice_id = "Matthew"

In [None]:
response = polly_client.synthesize_speech(
    Engine="neural",
    OutputFormat="json",
    Text=summarized_text + " This video is generated by Video Summarization Hub.", 
    TextType="text",
    SpeechMarkTypes=["sentence"],
    VoiceId=voice_id,
)

stream_data = response['AudioStream'].read()
polly_ssml = stream_data.decode('utf-8')

다음은 SSML 형식의 Amazon Polly 합성 음성 출력입니다:

In [None]:
polly_ssml = polly_ssml.split("\n")
polly_ssml


SSML 파일은 비디오 요약 문장과 Amazon Polly가 각 문장을 발성하는 데 걸리는 시간을 나타내는 음성 지속 시간을 모두 제공합니다. 다음 몇 단계에서 이러한 값을 추출하여 합성된 음성을 비디오 타임라인과 정렬할 것입니다.


In [None]:
summarized_sentences = []
speech_durations = []

for i in range(len(polly_ssml) - 1):
    curr = polly_ssml[i]
    next = polly_ssml[i + 1]
    if curr.strip() == "" or next.strip() == "":
        continue
    curr = json.loads(curr)
    next = json.loads(next)
    summarized_sentences.append(curr["value"])
    speech_durations.append(int(next["time"]) - int(curr["time"]))

## 가장 관련성 있는 비디오 shots/scenes 선택

요약된 콘텐츠의 모든 문장과 일치하는 가장 관련성 있는 비디오 프레임 시퀀스를 선택해야 합니다. 따라서 우리는 두 텍스트가 얼마나 유사한지 결정하는 문장 유사도 작업을 수행하기 위해 텍스트 임베딩을 사용합니다.

문장 유사도 모델은 입력 텍스트를 의미론적 정보를 캡처하는 벡터(임베딩)로 변환하고 그들 사이의 근접성이나 유사성을 계산합니다.

이 단계에서는 [Amazon Bedrock](https://aws.amazon.com/bedrock/)와 함께 **Text Embedding Model**을 사용하여 원본 자막과 비디오 요약의 모든 문장에 대한 임베딩을 생성합니다.

먼저, 원본 자막 파일을 가져와서 시작 시간과 종료 시간이 있는 문장으로 분해하기 위한 처리를 수행합니다.

In [None]:
with open(video['transcript'].vtt_file, 'r', encoding='utf-8') as file:
    subtitle = file.read()

if subtitle.startswith("WEBVTT"):
    subtitle = subtitle[len("WEBVTT"):].lstrip()

print(subtitle)

In [None]:
def srt_to_array(s):
    """
    Converts the given transcription in SRT/WEBVTT format to list of sentences and their corresponding timecodes.
    Args:
       s - transcription in SRT/WEBVTT format.
    Returns:
       A list of dictionaries, where each dictionary represents a sentence and its corresponding start time and end time.
    """
    sentences = [line.strip() for line in re.findall(r"\d+\n.*?\n(.*?)\n", s)]

    def get_time(s):
        return re.findall(r"\d{2}:\d{2}:\d{2}.\d{3}", s)

    def time_to_ms(time_str):
        match = re.match(r"(\d+):(\d+):(\d+)[.,](\d+)", time_str)
        h, m, s, ms = match.groups()
        return int(h) * 3600000 + int(m) * 60000 + int(s) * 1000 + int(ms)

    startTimes = get_time(s)[::2]
    endTimes = get_time(s)[1::2]
    startTimes_ms = [time_to_ms(time) for time in startTimes]
    endTimes_ms = [time_to_ms(time) for time in endTimes]

    complete_sentences = []
    complete_startTimes_ms = []
    complete_endTimes_ms = []

    startTime_ms = -1
    endTime_ms = -1
    sentence = ""
    for i in range(len(sentences)):
        if startTime_ms == -1:
            startTime_ms = startTimes_ms[i]
        sentence += " " + sentences[i]
        if (
            sentences[i].endswith(".")
            or sentences[i].endswith("?")
            or sentences[i].endswith("!")
            or i == len(sentences) - 1
        ):
            endTime_ms = endTimes_ms[i]
            complete_sentences.append(sentence)
            complete_startTimes_ms.append(startTime_ms)
            complete_endTimes_ms.append(endTime_ms)
            startTime_ms = -1
            endTime_ms = -1
            sentence = ""
    processed_transcript = []
    for i in range(len(complete_sentences)):
        processed_transcript.append(
            {
                "sentence_startTime": complete_startTimes_ms[i],
                "sentence_endTime": complete_endTimes_ms[i],
                "sentence": complete_sentences[i],
            }
        )
    return processed_transcript

In [None]:
processed_transcript = srt_to_array(subtitle)


비디오의 원본 transcript에서 일부 문장을 시각화해 보겠습니다:

In [None]:
original_sentences = [item['sentence'] for item in processed_transcript]
original_sentences[:10]

다음으로, 원본 자막과 비디오 요약의 모든 문장에 대한 텍스트 임베딩을 생성합니다. 다음 코드는 Amazon Bedrock API를 사용한 텍스트 임베딩이 어떻게 작동하는지 보여주는 예시입니다.


In [None]:
def find_matching_sentences(original_sentences, summarized_sentences):
    """
    Calculates the similarity between the given original sentences and the summarized sentences.
    Args:
       original_sentences - sentences extacted from the original video
       summarized_sentences - sentences extacted from the video summary
    Return:
       best_matching_indices - list of indices indicating which original sentence best matches each summarized sentence
       similarity_matrix - sentences similarity matrix
    
    """
    def np_cosine_similarity(original_embeddings, summarized_embeddings):
        """
        We use `Cosine similarity` to measure similarities between two vectors.
        """
        dot_products = np.dot(summarized_embeddings, original_embeddings.T)
        summarized_norms = np.linalg.norm(summarized_embeddings, axis=1)
        original_norms = np.linalg.norm(original_embeddings, axis=1)
        similarity_matrix = dot_products / summarized_norms[:, None] / original_norms[None, :]
        return similarity_matrix
        
    model_id = "amazon.titan-embed-image-v1"
    accept = "application/json"
    content_type = "application/json"
    original_embeddings = []
    for str in original_sentences:
        body = json.dumps({"inputText": str})
        response = bedrock_client.invoke_model(
            body=body, modelId=model_id, accept=accept, contentType=content_type
        )
        response_body = json.loads(response["body"].read())
        original_embeddings.append(response_body.get("embedding"))
    original_embeddings = np.array(original_embeddings)

    summarized_embeddings = []
    for str in summarized_sentences:
        body = json.dumps({"inputText": str})
        response = bedrock_client.invoke_model(
            body=body, modelId=model_id, accept=accept, contentType=content_type
        )
        response_body = json.loads(response["body"].read())
        summarized_embeddings.append(response_body.get("embedding"))
    summarized_embeddings = np.array(summarized_embeddings)

    similarity_matrix = np_cosine_similarity(original_embeddings, summarized_embeddings)
    best_matching_indices = []
    len_summarized_sentences = len(summarized_sentences)
    len_original_sentences = len(original_sentences)

    # Find the best matching sentences.
    dp = np.zeros([len_summarized_sentences, len_original_sentences], dtype=float)
    for i in range(0, len_summarized_sentences):
        for j in range(0, len_original_sentences):
            if i == 0:
                dp[i][j] = similarity_matrix[i][j]
            else:
                max_score = -1
                for k in range(0, j):
                    if similarity_matrix[i][j] > 0 and dp[i - 1][k] > 0:
                        max_score = max(
                            max_score, similarity_matrix[i][j] + dp[i - 1][k]
                        )
                dp[i][j] = max_score

    j = len_original_sentences

    for i in range(len_summarized_sentences - 1, -1, -1):
        arr = dp[i][:j]
        idx = np.argmax(arr)
        best_matching_indices.append(idx)
        j = idx
    best_matching_indices.reverse()

    return best_matching_indices, similarity_matrix

In [None]:
best_matching_indices, similarity_matrix = find_matching_sentences(original_sentences, summarized_sentences)

이는 다음과 같은 유사도 행렬 결과를 반환할 것입니다:

In [None]:
similarity_matrix

이전 결과를 다음과 같이 해석할 수 있습니다: 행렬의 첫 번째 행은 요약된 콘텐츠의 첫 번째 문장에 해당하며 모든 열은 원본 텍스트의 문장들과의 유사도 점수를 보여줍니다. 유사도 값은 일반적으로 -1과 1 사이이며, 1은 벡터가 동일하거나 매우 유사함을 나타내고, 0은 벡터가 직교(상관관계 없음)하며 유사성이 없음을 나타내고, -1은 벡터가 정반대이거나 매우 다름을 나타냅니다.

유사도 행렬에서, 우리는 요약된 콘텐츠의 각 문장에 대해 상위 k개의 가장 높은 유사도 점수를 식별하여 원본 텍스트의 가장 유사한 문장들과 정렬합니다. 원본 텍스트의 각 문장에는 원본 자막에 저장된 해당 타임스탬프(즉, startTime, endTime)도 있습니다.

각 요약된 문장에 대한 Polly 오디오의 지속 시간과 원본 자막 파일의 타임스탬프를 모두 통합함으로써, 각 요약된 문장에 해당하는 가장 관련성 있는 프레임의 타임스탬프 시퀀스를 선택할 수 있습니다. 요약된 문장에 대해 선택된 각 비디오 세그먼트의 길이는 해당 내레이션 오디오의 길이와 정렬될 것입니다.

In [None]:
def get_timecodes(best_matching_indices, idx, endTimes, duration, timecodes):
    """
    Calculate the best start and end time for each summarized sentence aligned with the timecode from the original sentences
    Args:
      best_matching_indices - the indices from the original sentence that is most similar with the summarized sentences.
      idx - index from the summarized sentences to process
      endTimes - the endtime from the original sentences
      duration - speech duration for the synthesized sentences from the summarized text
      timecodes - timecode used for calculating the best placement for the summarized text within the video.
    Return:
      
    """
    best_matching_idx = best_matching_indices[idx]
    startTime = int(endTimes[best_matching_idx]) - duration
    carry = max(0, timecodes[len(timecodes) - 1][1] - startTime)
    startTime += carry
    endTime = int(endTimes[best_matching_idx]) + carry
    return startTime, endTime

def ms_to_timecode(ms, drop_frame=False):
    """
    Convert milliseconds to SMPTE timecode
    Args:
        ms: milliseconds
        drop_frame: Boolean, True for drop frame, False for non-drop frame
    Return:
        string in HH:MM:SS:FF or HH:MM:SS;FF format
    """
    total_frames = int(ms * (29.97 if drop_frame else 30) / 1000)
    frames = total_frames % 30
    
    total_seconds = total_frames // 30
    seconds = total_seconds % 60
    
    total_minutes = total_seconds // 60
    minutes = total_minutes % 60
    
    hours = total_minutes // 60
    separator = ';' if drop_frame else ':'    
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}{separator}{frames:02d}"

In [None]:
intro_time = float(transcript_file["results"]["items"][0]["start_time"]) * 1000

timecodes = [[0, intro_time]]
for i in range(len(summarized_sentences)):
    startTime, endTime = get_timecodes(
        best_matching_indices,
        i,
        [item['sentence_endTime'] for item in processed_transcript],
        speech_durations[i],
        timecodes,
    )
    timecodes.append([startTime, endTime])
creditTime = endTime + 3500
timecodes.append([endTime, creditTime])
timecodes_text = ""
for timecode in timecodes:
    timecodes_text += (
        ms_to_timecode(timecode[0], True)
        + ","
        + ms_to_timecode(timecode[1], True)
        + "\n"
    )
to_json = lambda s: [
    {"StartTimecode": t1, "EndTimecode": t2}
    for t1, t2 in (line.split(",") for line in s.split("\n") if line.strip())
]
timecodes = to_json(timecodes_text)

다음은 AWS Elemental MediaConvert 입력 클리핑에 사용될 생성된 타임코드입니다:


In [None]:
timecodes

이제 Amazon Polly를 사용하여 비디오 요약에서 MP3 형식의 오디오 내레이션을 생성할 수 있습니다. SSML 호환성을 위해 요약된 텍스트의 특수 문자를 이스케이프하고 인트로 타이밍을 위한 적절한 breaks로 SSML 마크업을 생성하는 것을 잊지 마세요.


In [None]:
escaped_summarized_text = (
        summarized_text.replace("&", "&amp;")
        .replace('"', "&quot;")
        .replace("'", "&apos;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
    )
ssml = "<speak>\n"
break_time = intro_time

while break_time > 10000:  # maximum break time in Polly is 10s
    ssml += '<break time = "' + str(break_time) + 'ms"/>'
    break_time -= 10000
ssml += '<break time = "' + str(break_time) + 'ms"/>'
ssml += escaped_summarized_text
ssml += "</speak>"

response = polly_client.synthesize_speech(
    Engine="neural",
    OutputFormat="mp3",
    Text=ssml,
    TextType="ssml",
    VoiceId=voice_id,
)

if "AudioStream" in response:
    with response["AudioStream"] as stream:
        audio_narration = stream.read()

In [None]:
ssml

AWS Elemental MediaConvert로 비디오 트랜스코딩 단계를 위해 오디오 내레이션을 Amazon S3 버킷에 업로드합니다.

In [None]:
s3_client = boto3.client("s3")
s3_bucket = session["bucket"]
audio_narration_filename = os.path.splitext(os.path.basename(video['path']))[0] + ".mp3"
s3_client.put_object(
    Body=audio_narration, Bucket=s3_bucket, Key=audio_narration_filename, ContentType="audio/mpeg"
)

## MediaConvert 어셈블리 워크플로우 생성

기본 입력 클리핑을 수행하기 위해 타임코드의 시퀀스를 AWS Elemental MediaConvert 어셈블리 워크플로우를 생성하는 매개변수로 사용합니다.

Amazon Polly의 MP3 오디오와 결합하고 선호하는 배경 음악을 통합할 수 있는 가능성과 함께, 최종적으로 최종 비디오 요약 출력을 달성할 수 있습니다.



원본 비디오 입력에서 어셈블리 워크플로우를 시작해 보겠습니다. 어셈블리 워크플로우는 별도의 편집 소프트웨어 없이 하나 또는 다른 소스에서 출력 자산을 조립하기 위해 기본 입력 클리핑과 스티칭을 수행하는 MediaConvert 작업입니다.

<div class="alert alert-block alert-info">
⏳ 다음 셀들은 AWS Elemental MediaConvert 작업을 시작할 것이며, 완료하는 데 몇 분이 걸릴 수 있습니다. 작업이 완료될 때까지 충분한 시간을 허용해 주세요.
</div>

In [None]:
iam_role = session["MediaConvertRole"]
input_video_path = video["url"]
output_video_path = f"s3://{s3_bucket}/"

In [None]:
video["url"]

In [None]:
t0 = time.time()

media_convert = boto3.client("mediaconvert")
response = media_convert.create_job(
    Queue="Default",
    UserMetadata={},
    Role=iam_role,
    Settings={
        "TimecodeConfig": {"Source": "ZEROBASED"},
        "OutputGroups": [
            {
                "Name": "File Group",
                "Outputs": [
                    {
                        "ContainerSettings": {
                            "Container": "MP4",
                            "Mp4Settings": {},
                        },
                        "VideoDescription": {
                            "CodecSettings": {
                                "Codec": "H_264",
                                "H264Settings": {
                                    "MaxBitrate": 40000000,
                                    "RateControlMode": "QVBR",
                                    "SceneChangeDetect": "TRANSITION_DETECTION",
                                },
                            }
                        }
                    }
                ],
                "OutputGroupSettings": {
                    "Type": "FILE_GROUP_SETTINGS",
                    "FileGroupSettings": {"Destination": output_video_path},
                },
            }
        ],
        "Inputs": [
            {
                "VideoSelector": {},
                "TimecodeSource": "ZEROBASED",
                "FileInput": input_video_path,
                "InputClippings": timecodes,
            }
        ],
    },
    AccelerationSettings={"Mode": "DISABLED"},
    StatusUpdateInterval="SECONDS_60",
    Priority=0,
)

job_complete = False

while not job_complete:
    job_response = media_convert.get_job(Id=response["Job"]["Id"])
    
    job_status = job_response['Job']['Status']
    print(f"MediaConvert job status: {job_status}")
    
    if job_status == 'COMPLETE':
        print("Job is complete!")
        job_complete = True
    elif job_status == 'ERROR':
        print("Job has failed.")
        job_complete = True
    else:
        time.sleep(10)

t1 = time.time()
print(f"\nElapsed time: {round(t1 - t0, 2)}s")

마지막으로, 출력에 오디오 트랙을 생성하고 각 출력 트랙과 단일 오디오 선택기를 연결합니다. 또한 최종 비디오 출력에 자막을 추가할 수도 있습니다. 다음과 같이 비디오 요약에 대한 자막을 생성할 수 있습니다:

In [None]:
video_summary_subtitle = ""
start = intro_time

def split_long_lines(text, max_line_length):
    words = text.split()
    lines = []
    current_line = []
    current_length = 0

    for word in words:
        if current_length + len(word) + len(current_line) > max_line_length:
            lines.append(" ".join(current_line))
            current_line = []
            current_length = 0
        current_line.append(word)
        current_length += len(word) + 1

    if current_line:
        lines.append(" ".join(current_line))

    return lines

def milliseconds_to_subtitleTimeFormat(ms):
    return "{:02d}:{:02d}:{:02d},{:03d}".format(
        int((ms // 3600000) % 24),  # hours
        int((ms // 60000) % 60),  # minutes
        int((ms // 1000) % 60),  # seconds
        int(ms % 1000),  # milliseconds
    )

for i in range(len(summarized_sentences)):
    end = start + speech_durations[i]
    video_summary_subtitle += f"{i+1}\n"
    video_summary_subtitle += f"{milliseconds_to_subtitleTimeFormat(start)} --> {milliseconds_to_subtitleTimeFormat(end)}\n"
    sentence_lines = split_long_lines(summarized_sentences[i], 90)
    for line in sentence_lines:
        video_summary_subtitle += f"{line}\n"
    video_summary_subtitle += "\n"
    start = end

In [None]:
video_summary_subtitle

In [None]:
subtitle_filename = os.path.splitext(os.path.basename(video['path']))[0] + ".srt"
s3_client.put_object(
    Body=video_summary_subtitle, Bucket=s3_bucket, Key=subtitle_filename
)

마지막으로, 최종 비디오 출력을 위한 MediaConvert 작업을 생성합니다.

In [None]:
input_video_path = f"s3://{s3_bucket}/{video['path']}"
audio_file_path = f"s3://{s3_bucket}/{audio_narration_filename}"
subtitle_file_path = f"s3://{s3_bucket}/{subtitle_filename}"
output_video_path = f"s3://{s3_bucket}/"

다음 단계에서는 [AWS Elemental MediaConvert](https://aws.amazon.com/mediaconvert/) 작업을 사용하여 내레이션된 음성과 자막을 원본 비디오에 적용합니다. 출력은 다운스트림 소비를 위해 S3 버킷에 작성됩니다.


In [None]:
t0 = time.time()

response = media_convert.create_job(
    Queue="Default",
    UserMetadata={},
    Role=iam_role,
    Settings={
        "TimecodeConfig": {"Source": "ZEROBASED"},
        "OutputGroups": [
            {
                "Name": "File Group",
                "Outputs": [
                    {
                        "ContainerSettings": {
                            "Container": "MP4",
                            "Mp4Settings": {},
                        },
                        "VideoDescription": {
                            "CodecSettings": {
                                "Codec": "H_264",
                                "H264Settings": {
                                    "MaxBitrate": 40000000,
                                    "RateControlMode": "QVBR",
                                    "SceneChangeDetect": "TRANSITION_DETECTION",
                                },
                            }
                        },
                        "NameModifier": "_summary",
                        "AudioDescriptions": [
                            {
                                "AudioSourceName": "Audio Selector Group 1",
                                "CodecSettings": {
                                    "Codec": "AAC",
                                    "AacSettings": {
                                        "Bitrate": 96000,
                                        "CodingMode": "CODING_MODE_2_0",
                                        "SampleRate": 48000,
                                    },
                                },
                            }
                        ],
                        "CaptionDescriptions": [
                            {
                                "CaptionSelectorName": "Captions Selector 1",
                                "DestinationSettings": {
                                    "DestinationType": "BURN_IN",
                                    "BurninDestinationSettings": {
                                        "BackgroundOpacity": 100,
                                        "FontSize": 18,
                                        "FontColor": "WHITE",
                                        "ApplyFontColor": "ALL_TEXT",
                                        "BackgroundColor": "BLACK",
                                    },
                                },
                            }
                        ],
                    }
                ],
                "OutputGroupSettings": {
                    "Type": "FILE_GROUP_SETTINGS",
                    "FileGroupSettings": {"Destination": output_video_path},
                },
            }
        ],
        "Inputs": [
            {
                "AudioSelectors": {
                    "Audio Selector 1": {
                        "DefaultSelection": "NOT_DEFAULT",
                        "ExternalAudioFileInput": audio_file_path,
                    },
                },
                "AudioSelectorGroups": {
                    "Audio Selector Group 1": {
                        "AudioSelectorNames": ["Audio Selector 1"]
                    }
                },
                "VideoSelector": {},
                "TimecodeSource": "ZEROBASED",
                "CaptionSelectors": {
                    "Captions Selector 1": {
                        "SourceSettings": {
                            "SourceType": "SRT",
                            "FileSourceSettings": {"SourceFile": subtitle_file_path},
                        }
                    }
                },
                "FileInput": input_video_path,
            }
        ],
    },
    AccelerationSettings={"Mode": "DISABLED"},
    StatusUpdateInterval="SECONDS_60",
    Priority=0,
)

job_complete = False

while not job_complete:
    job_response = media_convert.get_job(Id=response["Job"]["Id"])
    
    job_status = job_response['Job']['Status']
    print(f"MediaConvert job status: {job_status}")
    
    if job_status == 'COMPLETE':
        print("Job is complete!")
        job_complete = True
    elif job_status == 'ERROR':
        print("Job has failed.")
        job_complete = True
    else:
        time.sleep(10)

t1 = time.time()
print(f"\nElapsed time: {round(t1 - t0, 2)}s")

## Short-form video output


다음은 우리의 요약 프로세스에서 생성된 최종 비디오 출력입니다. 생성된 내레이션을 시작하기 전에 원본 비디오의 인트로를 보존했습니다.

In [None]:
video_summary = os.path.splitext(os.path.basename(video['path']))[0] + "_summary.mp4"
s3_client.download_file(s3_bucket, video_summary, video_summary)
display(Video(url=video_summary, width=640, height=360, html_attributes="controls muted autoplay"))

## 시각적 및 오디오 이해를 통한 비디오 요약

이전 섹션에서는 다음과 같이 비디오의 transcription만을 기반으로 요약된 비디오를 생성했습니다:
1. 원본 비디오 transcript 추출 및 요약
2. 요약과 원본 transcript를 의미론적으로 비교하여 가장 잘 매칭되는 비디오 세그먼트 찾기
3. 최종 비디오 출력 생성

이제 비디오 시각적 이해와 transcript 분석을 모두 결합하여 더 포괄적인 요약 비디오를 만드는 접근 방식을 향상시켜 보겠습니다.

In [None]:
video_shots = video["shots"].shots

비디오에서 감지된 각 shot을 분석해 보겠습니다. 각 shot에 대해, Amazon Bedrock의 Large Language Model을 사용하여 시각적 콘텐츠의 설명을 생성할 것입니다. 또한 이 shot 세그먼트 동안 말해진 내용의 해당 transcript를 매칭할 것입니다.


In [None]:
MAX_RETRIES = 50
INITIAL_BACKOFF = 5
bedrock_client = boto3.client(service_name="bedrock-runtime")

def invoke_model_with_retry(body, modelId, accept, contentType):
    retries = 0
    backoff = INITIAL_BACKOFF

    while retries < MAX_RETRIES:
        try:
            response = bedrock_client.invoke_model(
                body=body, modelId=modelId, accept=accept, contentType=contentType
            )
            return response
        except ClientError as e:
            error_code = e.response['Error']['Code']
            print(f"Error: {error_code}. Retrying in {backoff} seconds...")
            time.sleep(backoff)
            retries += 1
            backoff += 1
    
    raise Exception("Max retries reached. Unable to invoke model.")

In [None]:
def generate_shot_description(shot):
    """
    Generate a natural language description of a video shot using LLM in Amazon Bedrock
    Args:
        shot - Dictionary containing shot information including:
                - id: unique identifier for the shot
                - start_ms: start time of the shot in milliseconds
                - end_ms: end time of the shot in milliseconds
                - composite_images: visual representation that combine multiple frames from a single shot into one image
              
    Returns:
        response_body - String containing the generated description of the visual content in the shot based on the analyzed frames
    """

    model_id = "anthropic.claude-3-5-sonnet-20240620-v1:0"
    
    
    prompt = f"""Provide a concise description of a video shot based on the given frame images. Focus on creating a cohesive narrative of the entire shot rather than describing each frame individually.
        Skip the preamble; go straight into the description.Please translate the output to Korean and write it as complete sentences, without summarizing"""
        
    body = {
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                ],
            }
        ],
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 512,
    }

    with open(f"{shot['composite_images'][0]['file']}", "rb") as image_file:
        file_content = image_file.read()
        base64_image_string = base64.b64encode(file_content).decode()
        body["messages"][0]["content"].append({
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": "image/png",
                "data": base64_image_string,
            },
        })
        
    response = invoke_model_with_retry(
        body=json.dumps(body), modelId=model_id, accept=accept, contentType=content_type
    )
    response_body = json.loads(response["body"].read())
    response_body = response_body["content"][0]["text"]

    return response_body

In [None]:
def add_shot_transcript(shot_startTime, shot_endTime, transcript):
    """
    Extract relevant transcript that corresponds to a specific video shot's time range
    Args:
        shot_startTime - Start time of the shot in milliseconds
        shot_endTime - End time of the shot in milliseconds
        transcript - List of dictionaries containing sentence information including:
                    - sentence_startTime: start time of the sentence
                    - sentence_endTime: end time of the sentence
                    - sentence: the transcript text
                    
    Returns:
        relevant_transcript - String containing concatenated sentences that overlap with the shot's time range by at least 1 second
    """
    relevant_transcript = ""
    for item in transcript:
        if item["sentence_startTime"] >= shot_endTime:
            break
        if item["sentence_endTime"] <= shot_startTime:
            continue
        delta_start = max(item["sentence_startTime"], shot_startTime)
        delta_end = min(item["sentence_endTime"], shot_endTime)
        if delta_end - delta_start >= 1000:
            relevant_transcript += item["sentence"] + "; "
    return relevant_transcript

### Shot 설명 생성

모든 shots의 텍스트 설명을 생성합니다.

<div class="alert alert-block alert-info">
⏳ 호스팅된 워크숍에 대해 설정된 Amazon Bedrock의 매우 낮은 계정 한도로 인해 샘플 비디오의 약 100개 shots에 대한 설명을 생성하는 데 10분이 걸릴 수 있습니다. 속도를 높이기 위해 미리 계산된 shot 설명을 로드할 것입니다. 아래 셀에서 FASTPATH=False를 설정하여 이를 항상 끌 수 있습니다.
</div>

In [None]:
t0 = time.time()

FASTPATH = True

if FASTPATH:
    video["shots"].load_fastpath_results("shots-descriptions.json")
else:
    for counter, shot in enumerate(video['shots'].shots, start=1):
        shot['shot_description'] = generate_shot_description(shot)
        shot['shot_transcript'] = add_shot_transcript(shot['start_ms'], shot['end_ms'], processed_transcript)

    # store shot descriptions so they can be loaded when the notebook is re-executed with FASTPATH=True.
    video["shots"].store_fastpath_results("shots-descriptions.json")

video_shots = video['shots'].shots
t1 = time.time()
print(f"  Elapsed time: {round(t1 - t0, 2)}s")
print(f"  Shots: {len(video_shots)}")

### Shot 설명 시각화

In [None]:
for counter, shot in enumerate(video_shots, start=1):
    
    print(f'\nSHOT {counter}/{len(video_shots)}: from {shot["start_ms"] }ms to {shot["end_ms"] }ms =======\n')
    display(DisplayImage(f"{shot['composite_images'][0]['file']}"))
    print(f'Shot description: {shot["shot_description"]}\n')
    print(f'Shot transcript: {shot["shot_transcript"]}\n')

이제 각 shot에 대한 설명과 transcript가 있으므로, 의미론적 검색 기능을 위해 [Amazon OpenSearch Serverless](https://aws.amazon.com/opensearch-service/features/serverless/) 벡터 데이터베이스에 저장해 보겠습니다.

먼저, OpenSearch 인덱스를 생성할 것입니다.

In [None]:
region = sagemaker_resources["region"]
aoss_host = session["AOSSCollectionEndpoint"]
aoss_index = "video-summarization-index"
text_embedding_model = "amazon.titan-embed-text-v2:0"
text_embedding_dimension = 1024

In [None]:
def create_opensearch_index(host, region, index, len_embedding):
    """
    Create an OpenSearch Serverless index with vector search capabilities
    Args:
        host - OpenSearch domain endpoint URL
        region - AWS region where the OpenSearch domain is hosted
        index - Name of the index to create
        len_embedding - Dimension size of the vector embeddings
    Returns:
        client - Configured OpenSearch client object
    """
    host = host.split("://")[1] if "://" in host else host
    credentials = boto3.Session().get_credentials()
    auth = AWSV4SignerAuth(credentials, region, "aoss")

    client = OpenSearch(
        hosts=[{"host": host, "port": 443}],
        http_auth=auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        pool_maxsize=20,
    )

    exist = client.indices.exists(index)
    if exist:
        client.indices.delete(index=index)
        print(f"Clean up previous index if it already exists")
        # Wait for a short time to ensure the deletion is processed
        time.sleep(30)
        
    print("Creating index")
    index_body = {
        "mappings": {
            "properties": {
                "video_name": {"type": "text"},
                "shot_id": {"type": "text"},
                "shot_startTime": {"type": "text"},
                "shot_endTime": {"type": "text"},
                "shot_description": {"type": "text"},
                "shot_transcript": {"type": "text"},
                "shot_desc_vector": {
                    "type": "knn_vector",
                    "dimension": len_embedding,
                    "method": {
                        "engine": "nmslib",
                        "space_type": "cosinesimil",
                        "name": "hnsw",
                        "parameters": {"ef_construction": 512, "m": 16},
                    },
                },
                "shot_transcript_vector": {
                    "type": "knn_vector",
                    "dimension": len_embedding,
                    "method": {
                        "engine": "nmslib",
                        "space_type": "cosinesimil",
                        "name": "hnsw",
                        "parameters": {"ef_construction": 512, "m": 16},
                    },
                }
            }
        },
        "settings": {
            "index": {
                "number_of_shards": 2,
                "knn.algo_param": {"ef_search": 512},
                "knn": True,
            }
        },
    }

    response = client.indices.create(index, body=index_body)

    print("Completed!")
    return client

In [None]:
aoss_client = create_opensearch_index(aoss_host, region, aoss_index, text_embedding_dimension)

Amazon Bedrock의 텍스트 임베딩 모델을 사용하여, 이 데이터를 OpenSearch 인덱스에 삽입하기 전에 shot 설명과 transcripts에 대한 텍스트 임베딩을 생성합니다.

<div class="alert alert-block alert-info">
⏳ 다음 셀은 Amazon Bedrock을 사용하여 임베딩을 생성하고 Amazon OpenSearch Serverless 인덱스에 삽입할 것입니다. 이 프로세스는 비디오 shots의 수에 따라 몇 분이 걸릴 수 있습니다. 모든 임베딩이 생성되고 삽입될 때까지 충분한 시간을 허용해 주세요.
</div>

In [None]:
t0 = time.time()

def get_text_embedding(text_embedding_model, text):
    """
    Generate vector embeddings for text using Amazon Bedrock's embedding model
    Args:
        text_embedding_model - Model id of the Bedrock embedding model
        text - Input text to generate embeddings
        
    Returns:
        embedding - Result text's vector embedding
    """
    if not text.strip():
        text = "No transcript"
    body = json.dumps({"inputText": text, "dimensions": 1024, "normalize": True})
    response = bedrock_client.invoke_model(
        body=body, modelId=text_embedding_model, accept=accept, contentType=content_type
    )
    response_body = json.loads(response["body"].read())
    return response_body.get("embedding")
        
print(f"Insert embeddings to AOSS index ...")
for counter, shot in enumerate(video_shots, start=1):
    shot_desc_embedding = get_text_embedding(text_embedding_model, shot["shot_description"])
    shot_transcript_embedding = get_text_embedding(text_embedding_model, shot["shot_transcript"])
    embedding_request_body = json.dumps(
        {
            "video_name": video["path"],
            "shot_id": shot["id"],
            "shot_startTime": shot["start_ms"],
            "shot_endTime": shot["end_ms"],
            "shot_description": shot["shot_description"],
            "shot_transcript": shot["shot_transcript"],
            "shot_desc_vector": shot_desc_embedding,
            "shot_transcript_vector": shot_transcript_embedding
        }
    )

    response = aoss_client.index(
        index=aoss_index,
        body=embedding_request_body,
        params={"timeout": 60},
    )

print("Completed!")

t1 = time.time()
print(f"\nElapsed time: {round(t1 - t0, 2)}s")

비디오 요약의 각 문장에 대해, shots의 설명 임베딩과 shots의 transcription 임베딩을 사용하여 벡터 데이터베이스에서 관련 shots를 검색할 것입니다. 검색 프로세스는 시각적 및 오디오 정보의 중요성의 균형을 맞추기 위해 이러한 임베딩에 다른 가중치를 할당합니다: 관련 shots를 찾는 데 있어 시각적 콘텐츠의 중요성을 강조하는 **shot 맥락적 설명에 대해 75%(또는 3.0 부스트)** 그리고 시각적 설명보다 적은 영향력으로 오디오 콘텐츠가 검색 결과에 기여할 수 있도록 하는 **shot transcript에 대해 25%(또는 1.0 부스트)**.

선택된 shots의 총 지속 시간도 각 문장의 음성 지속 시간과 일치해야 할 것입니다.

하지만 먼저 OpenSearch에 삽입된 데이터가 검색될 준비가 되었는지 확인할 것입니다.

In [None]:
print("Waiting for the recent inserted data to be searchable in OpenSearch...")

while True:
    try:
        result = aoss_client.search(index=aoss_index, body={"query": {"match_all": {}}})
        if result['hits']['total']['value'] == len(video_shots):
            print("\nData is now available for search!")
            break
        else:
            print(".", end="", flush=True)
            time.sleep(10)
    except Exception as e:
        print(".", end="", flush=True)
        time.sleep(10)

In [None]:
def search_by_text(aoss_index, client, user_query):
    """
    Search for relevant video shots using semantic similarity with user's text query
    Args:
        aoss_index - Name of the OpenSearch index
        client - Configured OpenSearch client object
        user_query - Text query from the user
        
    Returns:
        response - List of dictionaries containing matching shots, where each dictionary includes:
                  - shot_id: unique identifier for the shot
                  - shot_startTime: start time of the shot
                  - shot_endTime: end time of the shot
                  - shot_description: visual description of the shot
                  - shot_transcript: transcript text from the shot
                  - score: similarity score of the match
    """
    text_embedding = get_text_embedding(text_embedding_model, user_query)

    aoss_query = {
        "size": 100,
        "query": {
            "bool": {
                "should": [
                    {
                        "script_score": {
                            "query": {"match_all": {}},
                            "script": {
                                "lang": "knn",
                                "source": "knn_score",
                                "params": {
                                    "field": "shot_desc_vector",
                                    "query_value": text_embedding,
                                    "space_type": "cosinesimil",
                                },
                            },
                            "boost": 3.0
                        }
                    },
                    {
                        "script_score": {
                            "query": {"match_all": {}},
                            "script": {
                                "lang": "knn",
                                "source": "knn_score",
                                "params": {
                                    "field": "shot_transcript_vector",
                                    "query_value": text_embedding,
                                    "space_type": "cosinesimil",
                                },
                            },
                            "boost": 1.0
                        }
                    }
                ],
                "minimum_should_match": 1
            }
        },
        "_source": [
            "shot_id",
            "shot_startTime",
            "shot_endTime",
            "shot_description",
            "shot_transcript",
        ],
    }

    response = client.search(body=aoss_query, index=aoss_index)
    hits = response["hits"]["hits"]
    response = []
    for hit in hits:
        if hit["_score"] >= 0:  # Set score threshold
            response.append(
                {
                    "shot_id": hit["_source"]["shot_id"],
                    "shot_startTime": hit["_source"]["shot_startTime"],
                    "shot_endTime": hit["_source"]["shot_endTime"],
                    "shot_description": hit["_source"]["shot_description"],
                    "shot_transcript": hit["_source"]["shot_transcript"],
                    "score": hit["_score"],
                }
            )

    return response

In [None]:
def find_shots(timecodes, sentence, duration):
    """
    Find and select video shots that match a summarized sentence, considering timing constraints
    Args:
        timecodes - List to store selected shot timecodes [[start_time, end_time], ...]
        sentence - Text to search for matching video shots
        duration - Required duration for the shots
    """
    relevant_shots = search_by_text(aoss_index, aoss_client, sentence)
    if duration is None: # intro
        timecodes.append([relevant_shots[0]["shot_startTime"], relevant_shots[0]["shot_endTime"]])
        shot_ids.add(relevant_shots[0]["shot_id"])
        intro_time = relevant_shots[0]["shot_endTime"] - relevant_shots[0]["shot_startTime"]
    else:
        i = 0
        while i < len(relevant_shots) and duration > 0:
            if relevant_shots[i]["shot_id"] in shot_ids:
                i += 1
                continue
            shot_duration = relevant_shots[i]["shot_endTime"] - relevant_shots[i]["shot_startTime"]
            # timecodes.append([relevant_shots[i]["shot_startTime"], relevant_shots[i]["shot_startTime"] + min(shot_duration, duration)])
            timecodes.append([relevant_shots[i]["shot_endTime"] - min(shot_duration, duration), relevant_shots[i]["shot_endTime"]])
            shot_ids.add(relevant_shots[i]["shot_id"])
            duration -= shot_duration
            i += 1
            

In [None]:
shot_ids = set()
timecodes = []

find_shots(timecodes, "Meridian", None) # Intro
for i in range(len(summarized_sentences)):
    find_shots(timecodes, summarized_sentences[i], speech_durations[i])

# creditTime = timecodes[-1][1] + 1000
# timecodes.append([timecodes[-1][1], creditTime])
timecodes_text = ""
for timecode in timecodes:
    timecodes_text += (
        ms_to_timecode(timecode[0], True)
        + ","
        + ms_to_timecode(timecode[1], True)
        + "\n"
    )
intro_time = timecodes[0][1] - timecodes[0][0]
to_json = lambda s: [
    {"StartTimecode": t1, "EndTimecode": t2}
    for t1, t2 in (line.split(",") for line in s.split("\n") if line.strip())
]
timecodes = to_json(timecodes_text)

결과는 우리의 최종 비디오 세그먼트를 정의하는 타임코드 목록입니다.
    
다음은 AWS Elemental MediaConvert 입력 클리핑에 사용될 생성된 타임코드입니다:

In [None]:
timecodes

이제 타임코드가 있으므로, 이전 섹션과 동일한 단계를 따를 수 있습니다:
1. AWS Elemental MediaConvert를 사용하여 입력 클립 생성
2. 최종 짧은 형식의 비디오를 만들기 위해 오디오 내레이션과 자막 삽입

In [None]:
escaped_summarized_text = (
        summarized_text.replace("&", "&amp;")
        .replace('"', "&quot;")
        .replace("'", "&apos;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
    )
ssml = "<speak>\n"
break_time = intro_time

while break_time > 10000:  # maximum break time in Polly is 10s
    ssml += '<break time = "' + str(break_time) + 'ms"/>'
    break_time -= 10000
ssml += '<break time = "' + str(break_time) + 'ms"/>'
ssml += escaped_summarized_text
ssml += "</speak>"

response = polly_client.synthesize_speech(
    Engine="neural",
    OutputFormat="mp3",
    Text=ssml,
    TextType="ssml",
    VoiceId=voice_id,
)

if "AudioStream" in response:
    with response["AudioStream"] as stream:
        audio_narration = stream.read()

In [None]:
s3_client.put_object(
    Body=audio_narration, Bucket=s3_bucket, Key=audio_narration_filename, ContentType="audio/mpeg"
)

In [None]:
video_summary_subtitle = ""
start = intro_time
for i in range(len(summarized_sentences)):
    end = start + speech_durations[i]
    video_summary_subtitle += f"{i+1}\n"
    video_summary_subtitle += f"{milliseconds_to_subtitleTimeFormat(start)} --> {milliseconds_to_subtitleTimeFormat(end)}\n"
    sentence_lines = split_long_lines(summarized_sentences[i], 90)
    for line in sentence_lines:
        video_summary_subtitle += f"{line}\n"
    video_summary_subtitle += "\n"
    start = end
s3_client.put_object(
    Body=video_summary_subtitle, Bucket=s3_bucket, Key=subtitle_filename
)

<div class="alert alert-block alert-info">
⏳ 다음 셀들은 비디오 입력 클리핑을 처리하고, 이들을 함께 병합하며 오디오와 자막을 추가하기 위해 여러 AWS Elemental MediaConvert 작업을 실행할 것입니다. 이 프로세스는 비디오 클립의 수와 길이에 따라 몇 분, 잠재적으로 10분 이상이 걸릴 수 있습니다. 각 단계(개별 클립 처리, 병합, 오디오/자막 추가)는 MediaConvert 작업이 완료될 때까지 기다려야 합니다. 셀이 진행되는 동안 기다려 주세요.
</div>

In [None]:
def process_single_clip(media_convert, iam_role, input_video_path, output_video_path, timecode, clip_index):
    """
    Create a MediaConvert job to process a single video clip (due to multiple input clipping need be processed in chronological order)
    Args:
        media_convert - MediaConvert client
        iam_role - IAM role ARN for MediaConvert
        input_video_path - S3 path for input video
        output_video_path - S3 path for output video
        timecode - Dictionary containing start and end timecodes for the clip
        clip_index - Index number for the clip
        
    Returns:
        - job_id: MediaConvert job Id
        - clip_output: S3 path of the output clip
    """
    clip_output = f"{output_video_path}{video['output_dir']}_{clip_index}"
    
    response = media_convert.create_job(
        Queue="Default",
        UserMetadata={},
        Role=iam_role,
        Settings={
            "TimecodeConfig": {"Source": "ZEROBASED"},
            "OutputGroups": [
                {
                    "Name": "File Group",
                    "Outputs": [
                        {
                            "ContainerSettings": {
                                "Container": "MP4",
                                "Mp4Settings": {},
                            },
                            "VideoDescription": {
                                "CodecSettings": {
                                    "Codec": "H_264",
                                    "H264Settings": {
                                        "MaxBitrate": 40000000,
                                        "RateControlMode": "QVBR",
                                        "SceneChangeDetect": "TRANSITION_DETECTION",
                                    },
                                }
                            }
                        }
                    ],
                    "OutputGroupSettings": {
                        "Type": "FILE_GROUP_SETTINGS",
                        "FileGroupSettings": {"Destination": clip_output},
                    },
                }
            ],
            "Inputs": [
                {
                    "VideoSelector": {},
                    "TimecodeSource": "ZEROBASED",
                    "FileInput": video["url"],
                    "InputClippings": [timecode],
                }
            ],
        },
        AccelerationSettings={"Mode": "DISABLED"},
        StatusUpdateInterval="SECONDS_60",
        Priority=0,
    )
    
    return response["Job"]["Id"], clip_output

def wait_for_job(media_convert, job_id):
    """
    Wait for a MediaConvert job to complete
    Args:
        media_convert - MediaConvert client
        job_id - MediaConvert job Id
        
    Returns:
        bool - True if job completed successfully, False if error occurred
    """
    job_complete = False
    while not job_complete:
        job_response = media_convert.get_job(Id=job_id)
        job_status = job_response['Job']['Status']
        print(f"MediaConvert job status: {job_status}")
        
        if job_status in ['COMPLETE', 'ERROR']:
            return job_status == 'COMPLETE'
        time.sleep(10)

In [None]:
t0 = time.time()

clip_paths = []
    
for i, timecode in enumerate(timecodes):
    print(f"Processing clip {i+1}/{len(timecodes)}")
    print(timecode)
    
    job_id, clip_output = process_single_clip(
        media_convert, 
        iam_role, 
        input_video_path, 
        output_video_path, 
        timecode, 
        i
    )

    if wait_for_job(media_convert, job_id):
        clip_paths.append(clip_output)
    else:
        print(f"Failed to process clip {i+1}")
        continue

t1 = time.time()
print(f"\nElapsed time: {round(t1 - t0, 2)}s")

In [None]:
def merge_clips(media_convert, iam_role, clip_paths, output_video_path):
    """
    Merge multiple video clips into a single video
    Args:
        media_convert - MediaConvert client
        iam_role - IAM role ARN for MediaConvert
        clip_paths - List of S3 paths of video clips to merge
        output_video_path - S3 path for video output
        
    Returns:
        - job_id: MediaConvert job Id
        - merged_output: S3 path of the video output
    """
    inputs = []
    
    for clip_path in clip_paths:
        inputs.append({
            "VideoSelector": {},
            "TimecodeSource": "ZEROBASED",
            "FileInput": clip_path + ".mp4",
        })

    merged_output = f"{output_video_path}{video['output_dir']}"
    
    response = media_convert.create_job(
        Queue="Default",
        UserMetadata={},
        Role=iam_role,
        Settings={
            "TimecodeConfig": {"Source": "ZEROBASED"},
            "OutputGroups": [
                {
                    "Name": "File Group",
                    "Outputs": [
                        {
                            "ContainerSettings": {
                                "Container": "MP4",
                                "Mp4Settings": {},
                            },
                            "VideoDescription": {
                                "CodecSettings": {
                                    "Codec": "H_264",
                                    "H264Settings": {
                                        "MaxBitrate": 40000000,
                                        "RateControlMode": "QVBR",
                                        "SceneChangeDetect": "TRANSITION_DETECTION",
                                    },
                                }
                            }
                        }
                    ],
                    "OutputGroupSettings": {
                        "Type": "FILE_GROUP_SETTINGS",
                        "FileGroupSettings": {"Destination": merged_output},
                    },
                }
            ],
            "Inputs": inputs,
        },
        AccelerationSettings={"Mode": "DISABLED"},
        StatusUpdateInterval="SECONDS_60",
        Priority=0,
    )
    
    return response["Job"]["Id"], merged_output

def add_audio_subtitles(media_convert, iam_role, input_video_path, audio_file_path, subtitle_file_path, final_output_path):
    """
    Add audio narration and subtitle into the video
    Args:
        media_convert - MediaConvert client
        iam_role - IAM role ARN for MediaConvert
        input_video_path - S3 path for input video
        audio_file_path - S3 path for audio narration file
        subtitle_file_path - S3 path for SRT subtitle file
        final_output_path - S3 path for video output
        
    Returns:
        job_id: MediaConvert job Id
    """
    
    response = media_convert.create_job(
        Queue="Default",
        UserMetadata={},
        Role=iam_role,
        Settings={
            "TimecodeConfig": {"Source": "ZEROBASED"},
            "OutputGroups": [
                {
                    "Name": "File Group",
                    "Outputs": [
                        {
                            "ContainerSettings": {
                                "Container": "MP4",
                                "Mp4Settings": {},
                            },
                            "VideoDescription": {
                                "CodecSettings": {
                                    "Codec": "H_264",
                                    "H264Settings": {
                                        "MaxBitrate": 40000000,
                                        "RateControlMode": "QVBR",
                                        "SceneChangeDetect": "TRANSITION_DETECTION",
                                    },
                                }
                            },
                            "NameModifier": "_summary_v2",
                            "AudioDescriptions": [
                                {
                                    "AudioSourceName": "Audio Selector Group 1",
                                    "CodecSettings": {
                                        "Codec": "AAC",
                                        "AacSettings": {
                                            "Bitrate": 96000,
                                            "CodingMode": "CODING_MODE_2_0",
                                            "SampleRate": 48000,
                                        },
                                    },
                                }
                            ],
                            "CaptionDescriptions": [
                                {
                                    "CaptionSelectorName": "Captions Selector 1",
                                    "DestinationSettings": {
                                        "DestinationType": "BURN_IN",
                                        "BurninDestinationSettings": {
                                            "BackgroundOpacity": 100,
                                            "FontSize": 18,
                                            "FontColor": "WHITE",
                                            "ApplyFontColor": "ALL_TEXT",
                                            "BackgroundColor": "BLACK",
                                        },
                                    },
                                }
                            ],
                        }
                    ],
                    "OutputGroupSettings": {
                        "Type": "FILE_GROUP_SETTINGS",
                        "FileGroupSettings": {"Destination": final_output_path},
                    },
                }
            ],
            "Inputs": [
                {
                    "VideoSelector": {},
                    "TimecodeSource": "ZEROBASED",
                    "FileInput": input_video_path,
                    "AudioSelectors": {
                        "Audio Selector 1": {
                            "DefaultSelection": "NOT_DEFAULT",
                            "ExternalAudioFileInput": audio_file_path,
                        },
                    },
                    "AudioSelectorGroups": {
                        "Audio Selector Group 1": {
                            "AudioSelectorNames": ["Audio Selector 1"]
                        }
                    },
                    "CaptionSelectors": {
                        "Captions Selector 1": {
                            "SourceSettings": {
                                "SourceType": "SRT",
                                "FileSourceSettings": {"SourceFile": subtitle_file_path},
                            }
                        }
                    },
                }
            ],
        },
        AccelerationSettings={"Mode": "DISABLED"},
        StatusUpdateInterval="SECONDS_60",
        Priority=0,
    )
    
    return response["Job"]["Id"]

오디오와 자막으로 모든 클립을 병합합니다.

In [None]:
t0 = time.time()

merge_job_id, merged_output = merge_clips(
    media_convert, 
    iam_role, 
    clip_paths, 
    output_video_path
)

if wait_for_job(media_convert, merge_job_id):
    print("Successfully merged video clips")
else:
    print("Failed to merge video clips")

final_job_id = add_audio_subtitles(
    media_convert,
    iam_role,
    merged_output + ".mp4",
    audio_file_path,
    subtitle_file_path,
    output_video_path
)

if wait_for_job(media_convert, final_job_id):
    print("Successfully created final video with audio and subtitle")
else:
    print("Failed to add audio and subtitle")

t1 = time.time()
print(f"\nElapsed time: {round(t1 - t0, 2)}s")

다음은 시각적 및 오디오 이해를 모두 통합한 우리의 최종 비디오 요약입니다. 차이점을 보기 위해 이전 버전(오디오 내레이션만 기반)과 비교해 봅니다.


In [None]:
video_summary_v2 = os.path.splitext(os.path.basename(video['path']))[0] + "_summary_v2.mp4"
s3_client.download_file(s3_bucket, video_summary_v2, video_summary_v2)
print("Short-form video with video and audio understanding\n")
print("=========================================\n")
display(Video(url=video_summary_v2, width=640, height=360, html_attributes="controls muted autoplay"))

In [None]:
print("Short-form video with audio understanding only\n")
print("=========================================\n")
display(Video(url=video_summary, width=640, height=360, html_attributes="controls muted autoplay"))

## Clean up
이 실습에서 생성한 일부 리소스를 제거하려면 아래 코드의 주석을 해제합니다.

In [None]:
# aoss_client.indices.delete(aoss_index)
# s3_client.delete_object(Bucket=s3_bucket, Key=audio_narration_filename)
# s3_client.delete_object(Bucket=s3_bucket, Key=video['path'])
# s3_client.delete_object(Bucket=s3_bucket, Key=subtitle_filename)
# s3_client.delete_object(Bucket=s3_bucket, Key=video_summary)
# s3_client.delete_object(Bucket=s3_bucket, Key=video_summary_v2)

# 다음은 무엇인가요?

다른 사용 사례를 시도해보거나, 완료했다면 [Additional Resources](09-resources.ipynb) 실습으로 계속 진행할 수 있습니다.