**Disclaimer:** This colab is for experimental purposes with localization and the code here may be obsolete. Check our repository for the latest code: https://professional-services.googlesource.com/solutions/adclip

**Last update:** September 2023

# AdClip Locales

This colab is a template for experimenting with AdClip localization. Please make your own copy before editing.

The template has been tested with Thai and might need customization to work with another language.

If there is any customized need for the language you're working on, please reach out to adclip-team@ to discuss.

## How to use

1. Modify the Config cell
2. Run the cells from top to bottom
3. Horizontal and vertical videos will be created in the Colab local directory `/content/`

## Additional Resources

Main Resources: [go/adclip](http://go/adclip)

BRD: [go/adclip-brd](http://go/adclip-brd)

PRD: [go/adclip-prd](http://go/adclip-prd)

Experimental Deck: [go/adclip-localization](http://go/adclip-localization)

## Step 1 Set up

### Step 1.1 install dependencies

After installation, please **always restart the runtime** before running other steps

In [None]:
!pip install google-cloud-aiplatform
!pip install google-cloud-speech
!pip install firebase_functions~=0.1.0
!pip install google-cloud-videointelligence
!pip install moviepy
!pip install pytube

### Step 1.2 Imports and Config

In [None]:
#@title Initialize the imports
from firebase_functions import https_fn
from firebase_admin import initialize_app, firestore
from google.cloud import speech, storage
from vertexai.preview.language_models import TextGenerationModel
from google.cloud import videointelligence

import moviepy.editor as mpy
import re
import itertools
import functools
import copy
import math
import requests

In [None]:
#@title Config
# GCP
project_id = "adclip"               # replace with your own project
location = "us-central1"            # always use us-central1 since asia-southeast1 (singapore) is not supported
gcloud_bucket_name = "adclip.appspot.com"

# youtube video
youtube_video_url = "https://youtu.be/KWEiXdb6gVY?si=aSNmCsW4M3feNAVZ"
video_name = "acomu"                # used for the local file name, the GCS object name and the Firestore document ids

# language
# Speech-to-Text documents language codes as BCP-47 tags written with a hyphen
# (e.g. "ja-JP", "th-TH") and model ids in lower case (e.g. "default", "video").
language_code = "ja-JP"                     # check the supported language and model here > https://cloud.google.com/speech-to-text/docs/speech-to-text-supported-languages
video_transcript_model = "default"          # if you use the "default" model, please take a closer look to text and timestamp since some word<>timestamp may not be 100% accurate

In [None]:
#@title Initialize GCP
# Colab-only module; this cell cannot run outside a Colab runtime.
from google.colab import auth as google_auth

# Sign the notebook user in, passing the project configured in the Config cell.
google_auth.authenticate_user(project_id=project_id)
!gcloud config set project {project_id}
!gcloud config get-value project

In [None]:
#@title Initialize Services
# Cloud Storage client and the bucket object that the upload/download helpers
# in this notebook read as a module-level global.
storage_client = storage.Client()
bucket = storage_client.get_bucket(gcloud_bucket_name)

# Initialise the default Firebase app; later cells call firestore.client().
initialize_app()

<firebase_admin.App at 0x79333da22530>

## Step 2 Get the video from youtube (Optional)

Use the code below to download the video from YouTube. The video will be saved to the Colab local file `/content/{video_name}.mp4`.

Alternatively, upload your mp4 file directly into `/content/` (named `{video_name}.mp4`) and run only the "upload mp4 to adclip bucket" cell.

In [None]:
#@title download video
import os

from pytube import YouTube

yt = YouTube(youtube_video_url)
# Take the first progressive mp4 stream returned by the filter.
stream = yt.streams.filter(progressive=True, file_extension="mp4").first()
if stream is None:
    # .first() yields None when the filter matches nothing; fail with a clear
    # message instead of an AttributeError on the download call.
    raise RuntimeError(f'No progressive mp4 stream found for {youtube_video_url}')

file_dir = stream.download(filename=f'{video_name}.mp4')
print(f'file dir: {file_dir}')
# Derive the names from the returned path instead of slicing off a hard-coded
# 9-character "/content/" prefix and a 4-character extension.
file_name = os.path.basename(file_dir)
print(f'file name: {file_name}')
file_name_no_type = os.path.splitext(file_name)[0]
print(f'file name no file type: {file_name_no_type}')

In [None]:
#@title upload mp4 to adclip bucket

# Upload file to the bucket.
def upload_blob(source_file_name, destination_blob_name):
    """Upload a local file into the module-level ``bucket``.

    Args:
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to write inside the bucket.
    """
    target_blob = bucket.blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)

    # Report what was uploaded and where.
    message = f'File {source_file_name} uploaded to {destination_blob_name}'
    print(message)

destination_blob_name = f'videos/{video_name}.mp4'
# Build the local path from the config instead of reusing `file_dir` from the
# download cell: that cell is optional, and this cell must also work when the
# mp4 was placed in /content/ by hand.
source_file_name = f'/content/{video_name}.mp4'

upload_blob(source_file_name, destination_blob_name)

## Step 3 Extract Audio

In [None]:
def extract_audio(video_full_path, file_name, output_name = None):
    """Extract a video's audio track to a WAV file stored in the bucket.

    The video is downloaded from the module-level ``bucket`` to ``/tmp``, its
    audio is written to a local WAV file, and that file is uploaded to
    ``videos/audio/`` in the same bucket. If the WAV object already exists the
    extraction is skipped.

    Args:
        video_full_path: Object name of the video inside the bucket.
        file_name: Name of the temporary local copy; the text before its last
            '.' names the WAV file unless ``output_name`` is given.
        output_name: Optional base name (without extension) for the WAV file.

    Returns:
        The ``gs://`` URI of the WAV object.

    Raises:
        ValueError: If the video has no audio track.
    """
    stem = file_name.rsplit('.', 1)[0]
    audio_output_file = (stem if output_name is None else output_name) + '.wav'
    gcs_file_path = 'videos/audio/' + audio_output_file
    audio_gcs_uri = f'gs://{gcloud_bucket_name}/{gcs_file_path}'

    # Reuse a previously extracted file.
    if does_file_exist(gcs_file_path):
        print('File {} exists'.format(gcs_file_path))
        return audio_gcs_uri

    tmp_file_path = '/tmp/' + file_name
    bucket.blob(video_full_path).download_to_filename(tmp_file_path)

    audio_output_path = '/tmp/' + audio_output_file
    clip = mpy.VideoFileClip(tmp_file_path)
    try:
        if clip.audio is None:
            raise ValueError(f'Video {video_full_path} has no audio track')
        clip.audio.write_audiofile(audio_output_path)
    finally:
        # Always release the file readers held by the clip.
        clip.close()

    upload_blob(audio_output_path, gcs_file_path)
    return audio_gcs_uri

def does_file_exist(filepath):
  """Return True when an object named ``filepath`` exists in ``bucket``."""
  # bucket.get_blob() yields None for a missing object; the unused `filename`
  # local that used to be computed here has been dropped.
  return bucket.get_blob(filepath) is not None

In [None]:
# Extract the audio of the uploaded video; file_name is passed without an
# extension, so the WAV object is named after video_name.
extract_audio(video_full_path=f"videos/{video_name}.mp4", file_name=video_name)

## Step 4 Transcribe the video

In [None]:
# Check whether an object exists in the bucket.
def does_file_exist(filepath):
  """Return True when an object named ``filepath`` exists in ``bucket``."""
  # bucket.get_blob() yields None for a missing object; the unused `filename`
  # local that used to be computed here has been dropped.
  return bucket.get_blob(filepath) is not None

# Upload file to the bucket.
def upload_blob(source_file_name, destination_blob_name):
    """Upload a local file into the module-level ``bucket``.

    Args:
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to write inside the bucket.
    """
    target_blob = bucket.blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)

    # Report what was uploaded and where.
    message = 'File {} uploaded to {}.'.format(
        source_file_name,
        destination_blob_name)
    print(message)

def extract_audio(video_full_path, file_name, output_name = None):
    """Extract a video's audio track to a WAV file stored in the bucket.

    The video is downloaded from the module-level ``bucket`` to ``/tmp``, its
    audio is written to a local WAV file, and that file is uploaded to
    ``videos/audio/`` in the same bucket. If the WAV object already exists the
    extraction is skipped.

    Args:
        video_full_path: Object name of the video inside the bucket.
        file_name: Name of the temporary local copy; the text before its last
            '.' names the WAV file unless ``output_name`` is given.
        output_name: Optional base name (without extension) for the WAV file.

    Returns:
        The ``gs://`` URI of the WAV object.

    Raises:
        ValueError: If the video has no audio track.
    """
    stem = file_name.rsplit('.', 1)[0]
    audio_output_file = (stem if output_name is None else output_name) + '.wav'
    gcs_file_path = 'videos/audio/' + audio_output_file
    audio_gcs_uri = f'gs://{gcloud_bucket_name}/{gcs_file_path}'

    # Reuse a previously extracted file.
    if does_file_exist(gcs_file_path):
        print('File {} exists'.format(gcs_file_path))
        return audio_gcs_uri

    tmp_file_path = '/tmp/' + file_name
    bucket.blob(video_full_path).download_to_filename(tmp_file_path)

    audio_output_path = '/tmp/' + audio_output_file
    clip = mpy.VideoFileClip(tmp_file_path)
    try:
        if clip.audio is None:
            raise ValueError(f'Video {video_full_path} has no audio track')
        clip.audio.write_audiofile(audio_output_path)
    finally:
        # Always release the file readers held by the clip.
        clip.close()

    upload_blob(audio_output_path, gcs_file_path)
    return audio_gcs_uri

# Get the average "gap" of the words in a line, then split the line wherever a
# word's gap exceeds 2.5 times that average.
def refine_transcript(transcript: list) -> list:
    """Split transcript lines at unusually long gaps between words.

    For every line, the ``gap`` values of all words except the first are
    averaged. Walking the words in order, a new item is started before any
    word at index 2 or later whose gap is greater than 2.5 times the average.

    Lines with fewer than two words have no gaps to average and are skipped
    (a line with no words used to raise IndexError).

    Args:
        transcript: List of line dicts, each with a ``words`` list whose
            entries carry ``text``, ``startTime``, ``endTime`` and ``gap``.

    Returns:
        List of item dicts with ``text``, ``startTime``, ``endTime``,
        ``duration`` and ``words``.
    """
    def generate_transcript_item(words: list) -> dict:
        # One output item spanning the first to the last word of the group.
        return {
            "text": ' '.join(word['text'] for word in words),
            "startTime": words[0]['startTime'],
            "endTime": words[-1]['endTime'],
            "duration": words[-1]['endTime'] - words[0]['startTime'],
            "words": words
        }

    new_transcript = []
    for line in transcript:
        # The first word's gap is excluded from the average.
        gaps = [word['gap'] for word in line['words'][1:]]
        if not gaps:
            continue

        threshold = sum(gaps) / len(gaps) * 2.5
        words = []
        for index, word in enumerate(line['words']):
            # Indexes 0 and 1 never start a new item.
            if index > 1 and word['gap'] > threshold:
                new_transcript.append(generate_transcript_item(words))
                words = []

            words.append(word)
        if words:
            new_transcript.append(generate_transcript_item(words))
    return new_transcript

# merge clips under 5seconds
def merge_clips(transcript: list) -> list:
    """Greedily merge consecutive clips into spans of at most 5 seconds.

    Starting from the first clip, the next clip is merged into the current one
    while ``next.endTime - current.startTime <= 5``; otherwise the current clip
    is emitted and the next clip becomes the new current one.

    The overlap-based merge condition that used to sit next to the 5-second
    rule was permanently disabled (``False and ...``) and has been removed.

    Args:
        transcript: List of clip dicts with ``text``, ``startTime``,
            ``endTime`` and ``words``.

    Returns:
        New list of clip dicts; clips that were not merged are the same
        objects as in the input.
    """
    if not transcript:
        return []

    def merge(first, second):
        # The merged clip starts with `first` and ends at the later end time.
        start_time = first['startTime']
        end_time = max(first['endTime'], second['endTime'])
        return {
            'text': f"{first['text']} {second['text']}",
            'startTime': start_time,
            'endTime': end_time,
            'duration': end_time - start_time,
            'words': first['words'] + second['words']
        }

    output = []
    clip = transcript[0]
    for following in transcript[1:]:
        if following['endTime'] - clip['startTime'] <= 5:
            clip = merge(clip, following)
        else:
            output.append(clip)
            clip = following
    # Emit whatever is still pending after the last clip.
    output.append(clip)
    return output

def refine_transcript2(file_name: str, video_gcs_uri: str, transcript: list) -> list:
    """Regroup all transcript words into items bounded by video shots.

    The words of every line are flattened into one list and accumulated; a
    group is closed whenever the current word ends after the current shot.

    Args:
        file_name: Firestore document id used to look up / store the shots.
        video_gcs_uri: URI passed to process_video() when no shots are cached.
        transcript: List of line dicts, each with a ``words`` list whose
            entries carry ``text``, ``startTime`` and ``endTime``.

    Returns:
        List of item dicts with ``text``, ``startTime``, ``endTime``,
        ``duration`` and ``words``.
    """
    new_transcript = []
    print('refine_transcript2')
    print(video_gcs_uri)
    def generate_transcript_item(words: list, start_time: float, end_time: float) -> dict:
        # Build one output item from a group of words and explicit bounds.
        return {
            "text": ' '.join(list(map(lambda word: word['text'], words))),
            "startTime": start_time,
            "endTime": end_time,
            "duration": end_time - start_time,
            "words": words
        }

    # Use the shots cached in Firestore, or detect and cache them.
    video_shots = get_video_shots(file_name)
    if video_shots is None:
        video_shots = process_video(video_gcs_uri)
        upload_video_shots(file_name, video_shots)

    video_shots_index = 0
    # Flatten the per-line word lists into a single list of words.
    list_of_words = list(map(lambda line: line['words'], transcript))
    transcript_words = list(itertools.chain.from_iterable(list_of_words))
    # NOTE(review): the odd number of backslashes makes "\t" a tab escape, so
    # this banner prints a tab followed by "ranscript_words////".
    print('\\\\\transcript_words////')
    print(transcript_words)
    words = []
    for index, word in enumerate(transcript_words):
        words.append(word)
        # Skip shots that end before the first pending word starts.
        # NOTE(review): nothing bounds video_shots_index, so this (and the
        # lookups below) raise IndexError once the shots are exhausted.
        while video_shots[video_shots_index]['end_time'] < words[0]['startTime']:
            video_shots_index = video_shots_index + 1
        video_shot = video_shots[video_shots_index]
        # The word runs past the current shot: close the pending group.
        if word['endTime'] > video_shot['end_time']:
            start_time = min(words[0]['startTime'], video_shot['start_time'])
            # NOTE(review): inside this branch word['endTime'] is already
            # greater than the shot's end_time, so both max() expressions
            # below evaluate to word['endTime'].
            if index < len(transcript_words) - 1:
                end_time = max(word['endTime'], min(video_shot['end_time'],
                transcript_words[index+1]['startTime']))
            else:
                end_time = max(word['endTime'], video_shot['end_time'])
            video_shots_index = video_shots_index + 1
            # NOTE(review): the next line is a bare expression with no effect.
            end_time
            new_transcript.append(generate_transcript_item(words, start_time, end_time))
            words = []

    # Flush the words left over after the last closed group; `word` is the
    # last word seen by the loop above.
    if len(words) > 0:
        start_time = min(words[0]['startTime'], video_shots[video_shots_index]['start_time'])
        end_time = max(word['endTime'], video_shots[video_shots_index]['end_time'])
        video_shots_index = video_shots_index + 1
        new_transcript.append(generate_transcript_item(words, start_time, end_time))
    return new_transcript

def get_video_shots(file_name: str) -> "list | None":
    """Read the cached video shots for ``file_name`` from Firestore.

    Args:
        file_name: Document id inside the ``video_shots`` collection.

    Returns:
        The document's ``data`` field, or None when the document does not
        exist (the previous ``-> bool`` annotation did not match this).
    """
    db = firestore.client()
    doc = db.collection('video_shots').document(file_name).get()

    if not doc.exists:
        return None

    return doc.to_dict().get('data')

def upload_video_shots(file_name: str, video_shots: list) -> None:
    """Store ``video_shots`` in Firestore under ``video_shots/<file_name>``.

    The list is written to the document's ``data`` field.
    """
    payload = {"data": video_shots}
    collection = firestore.client().collection('video_shots')
    collection.document(file_name).set(payload)

def get_transcript(file_name: str) -> "list | None":
    """Read the cached original transcript for ``file_name`` from Firestore.

    Args:
        file_name: Document id inside the ``transcripts`` collection.

    Returns:
        The document's ``original`` field, or None when the document does not
        exist or the lookup fails.
    """
    try:
        db = firestore.client()
        doc = db.collection('transcripts').document(file_name).get()
    except Exception as err:
        # Best effort: a failed lookup is treated as a cache miss, but it is
        # reported, and KeyboardInterrupt/SystemExit are no longer swallowed
        # as they were by the bare `except:`.
        print(f'get_transcript failed for {file_name}: {err}')
        return None

    if not doc.exists:
        return None

    return doc.to_dict().get('original')

def upload_transcript(file_name: str, transcript: list) -> None:
    """Store ``transcript`` in Firestore under ``transcripts/<file_name>``.

    The list is written to the document's ``original`` field.
    """
    payload = {"original": transcript}
    collection = firestore.client().collection('transcripts')
    collection.document(file_name).set(payload)

def transcribe_video(video_full_path, file_name, language_code, model) -> any:
    """Transcribe a video with word timestamps, caching the result in Firestore.

    When a transcript for ``file_name`` is already stored it is reused;
    otherwise the audio is extracted, sent to Speech-to-Text as a long-running
    job, and the resulting transcript is stored.

    Args:
        video_full_path: Object name of the video inside the bucket.
        file_name: Key for the cached transcript / shots and the audio file.
        language_code: Language code passed to the recognition config.
        model: Recognition model name passed to the recognition config.

    Returns:
        A dict with ``transcript`` (shot-aligned, merged clips), ``original``
        (the raw lines) and ``v1`` (gap-based split), or a dict with only a
        ``text`` message when ``video_full_path`` is None.
    """
    if video_full_path is None:
        return {
            "text": "Missing video uri, sample format: https://googleapis.com/Welcome to World Class.wav"
        }

    def package(lines):
        # Bundle the three views of the transcript that are returned.
        return {
            "transcript": merge_clips(refine_transcript2(file_name, f'gs://{gcloud_bucket_name}/{video_full_path}', lines)),
            "original": lines,
            "v1": refine_transcript(lines),
        }

    cached_lines = get_transcript(file_name)
    if cached_lines is not None:
        return package(cached_lines)

    audio_gcs_uri = extract_audio(video_full_path, file_name)
    print(audio_gcs_uri)

    recognition_audio = speech.RecognitionAudio(uri=audio_gcs_uri)
    speech_client = speech.SpeechClient()
    recognition_config = speech.RecognitionConfig(
        enable_word_time_offsets=True,
        audio_channel_count=2,
        language_code=language_code,
        model=model,
        use_enhanced=True,
    )
    operation = speech_client.long_running_recognize(
        config=recognition_config, audio=recognition_audio)

    print("Waiting for operation to complete...")
    response = operation.result(timeout=900)

    lines = []
    # End time of the previously processed word, carried across results.
    previous_end = 0
    for result in response.results:
        for alternative in result.alternatives:
            print(f'len(words) = {len(alternative.words)}')
            print(f'transcript: {alternative.transcript}')

            spoken = alternative.words
            if len(spoken) == 0:
                continue

            line_start = spoken[0].start_time.total_seconds()
            line_end = spoken[-1].end_time.total_seconds()

            word_entries = []
            for spoken_word in spoken:
                word_start = spoken_word.start_time.total_seconds()
                word_end = spoken_word.end_time.total_seconds()
                word_entries.append({
                    "text": spoken_word.word,
                    "startTime": word_start,
                    "endTime": word_end,
                    "duration": word_end - word_start,
                    # Measured from the previous word's end to this word's end.
                    "gap": word_end - previous_end
                })
                previous_end = word_end

            lines.append({
                "text": alternative.transcript,
                "startTime": line_start,
                "endTime": line_end,
                "duration": line_end - line_start,
                "words": word_entries,
            })

    upload_transcript(file_name, lines)
    return package(lines)

In [None]:
# Object name of the uploaded video inside the bucket.
video_full_path = f"videos/{video_name}.mp4"

# video_name is used as file_name, so the transcript and the video shots are
# stored in Firestore under that id.
transcript = transcribe_video(video_full_path=video_full_path,
                file_name=video_name,
                language_code=language_code,
                model=video_transcript_model)

# Bare expression so the notebook displays the result.
transcript

## Step 5 Summarize video transcript

In [None]:
def summarize_transcript(input_transcript, user_prompt, filename, max_duration=40, min_duration=10) -> any:
    """Shorten a transcript with the LLM and map the result back to video segments.

    The transcript text is sent to the LLM; the answer is matched against the
    original words and aligned with the stored video shots to compute its
    duration. While the duration is outside ``[min_duration, max_duration]``
    the LLM is asked again, up to 3 more times.

    Args:
        input_transcript: List of line dicts with ``text`` and ``words``.
        user_prompt: Extra instruction forwarded to make_prompt().
        filename: Firestore document id of the stored video shots.
        max_duration: Upper bound for the summary duration, in seconds.
        min_duration: Lower bound for the summary duration, in seconds.

    Returns:
        A dict whose ``summarized_transcript`` entry is the list of segments.
    """
    print('\\\\\\\\\\summarize_transcript//////////')
    # Honour the caller-supplied bounds; they used to be overwritten with the
    # hard-coded values 40 and 10.
    max_duration = float(max_duration)
    min_duration = float(min_duration)

    full_text = '\n'.join([x["text"] for x in input_transcript])
    print('----full_text-----')
    print(full_text)

    list_of_words = list(map(lambda line: line['words'], input_transcript))
    transcript_words = list(itertools.chain.from_iterable(list_of_words))

    video_shots = get_video_shots(filename)

    # 1st attempt to shorten transcript
    shortened_text = send_transcript_to_llm(text=make_prompt(full_text, user_prompt))
    print('----shortened_text----')
    print(shortened_text)

    duration = calculate_duration(shortened_text, transcript_words, video_shots)
    print('----duration----')
    print(duration)

    count = 0
    # Validate duration and loop if condition is not met:
    print(f'max_duration: {max_duration} and min_duration: {min_duration}')
    while count < 3 and (duration > max_duration or duration < min_duration):
        if duration < min_duration:
            # Too short: start over from the full transcript.
            print('----shorter than min----')
            shortened_text = send_transcript_to_llm(text=make_prompt(full_text, user_prompt))
        else:
            # Too long: shorten the previous answer again.
            print('----longer than max----')
            shortened_text = send_transcript_to_llm(text=make_prompt(shortened_text, user_prompt))
        duration = calculate_duration(shortened_text, transcript_words, video_shots)
        count += 1
        print('----LOOP shortened_text----')
        print(shortened_text)
        print('----duration----')
        print(duration)

    upload_summary(full_text, shortened_text)
    segments = get_clips_from_transcript(transcript_words, shortened_text)
    print('----segments----')
    print(segments)

    segments = match_with_video_shots(video_shots, segments, transcript_words)
    print('----segments + video shots----')
    print(segments)

    return  {
        "summarized_transcript": segments
    }

def send_transcript_to_llm(text, model="text-bison@001",
                    temperature=0,max_output_tokens=300,top_k=40,top_p=0.8):
    """Send a prompt to the text generation model and return its answer.

    When the answer, ignoring leading whitespace, starts with
    ``Transcript:`` followed by a newline, that prefix is removed and the
    left-stripped remainder is returned; otherwise the raw answer is printed
    and returned unchanged.
    """
    print('\\\\\\\\\\send_transcript_to_llm//////////')
    llm = TextGenerationModel.from_pretrained(model)
    sampling_options = {
        'temperature': temperature,
        'max_output_tokens': max_output_tokens,
        'top_k': top_k,
        'top_p': top_p,
    }
    response = llm.predict(text, **sampling_options)

    left_stripped = response.text.lstrip()
    if left_stripped.startswith('Transcript:\n'):
        return left_stripped.replace('Transcript:\n', '', 1)

    print('----LLM Result----')
    print(response.text)
    return response.text

def make_prompt(transcript, user_prompt = ''):
    """Build the LLM prompt that asks for a shortened transcript.

    Args:
        transcript: Transcript text appended after the ``Transcript:`` label.
        user_prompt: Optional extra instruction; included on its own line
            when it is a non-empty string, ignored otherwise.

    Returns:
        The full prompt text.
    """
    # isinstance() replaces `type(user_prompt) == 'str'`, which compared a
    # type object with a string, was always False, and therefore silently
    # dropped every user prompt.
    extra_instruction = user_prompt if isinstance(user_prompt, str) and len(user_prompt) > 0 else ''
    return f"""You are a senior copy writer for an advertising agency who excels at summarizing transcript for video ads.
        Shorten the transcript by keeping important lines and removing other lines.
        Keep the first and last lines of the transcript.
        Keep the format of the output the same with the input.
        Don't make it too short.
        {extra_instruction}

        Transcript:
        {transcript}"""

def calculate_duration(shortened_text: str, transcript: list, video_shots: list) -> float:
    """Return the total duration of the clips matched by ``shortened_text``.

    The summary is mapped back to clips with get_clips_from_transcript(),
    the clips are aligned with match_with_video_shots(), and their
    ``duration`` values are added up.
    """
    print('\\\\\\\\\\calculate_duration//////////')
    matched_clips = get_clips_from_transcript(transcript, shortened_text)
    aligned_clips = match_with_video_shots(video_shots, matched_clips, transcript)
    total_duration = sum(segment.get('duration') for segment in aligned_clips)
    print(f"----total duration: {total_duration}----")
    return total_duration

def match_with_video_shots(video_shots: list, transcript: list, words: list) -> list:
    """Widen each transcript item so that it lines up with video shots.

    Every item in ``transcript`` is updated in place: ``startTime`` is moved
    back to the start of the shot it begins in, ``endTime`` is moved forward
    towards the end of the shot it finishes in, and ``duration`` is
    recomputed.

    Args:
        video_shots: List of dicts with ``start_time`` and ``end_time``.
        transcript: List of item dicts with ``startTime`` and ``endTime``.
        words: List of dicts with ``startTime`` and ``endTime``, used to find
            the entry that follows each item.

    Returns:
        The same ``transcript`` list that was passed in.
    """
    print('\\\\\\\\\\match_with_video_shots//////////')
    # Both pointers only move forward across items.
    # NOTE(review): this presumably relies on items, shots and words being in
    # chronological order - confirm with the callers.
    shot_index = 0
    word_index = 0
    for index, line in enumerate(transcript):
        print(video_shots[shot_index]['end_time'])
        print(line)
        print(line['startTime'])
        # Advance to the first shot that ends at or after the item's start.
        # NOTE(review): shot_index is not bounded; an item past the last
        # shot's end raises IndexError here and in the loop further down.
        while video_shots[shot_index]['end_time'] < line['startTime']:
            shot_index += 1
        video_shot = video_shots[shot_index]

        start_time = min(line['startTime'], video_shot['start_time'])
        # `line` is transcript[index], so this also changes line['startTime'].
        transcript[index]['startTime'] = start_time
        print(f'start_time: {start_time}')

        # Advance to the first shot that ends at or after the item's end.
        while video_shots[shot_index]['end_time'] < line['endTime']:
            shot_index += 1
        video_shot = video_shots[shot_index]

        # Advance to the first entry starting at or after the item's end,
        # stopping at the last entry.
        while (word_index < len(words) - 1 and words[word_index]['startTime']
              < line['endTime']):
            word_index += 1

        end_time = max(line['endTime'], video_shot['end_time'])

        # Unless that entry ends exactly where the item ends, cap the
        # extension at the entry's start so the item does not run into it.
        if words[word_index]['endTime'] != line['endTime']:
            end_time = max(line['endTime'], min(video_shot['end_time'],
                words[word_index]['startTime']))
        print(f'end_time: {end_time}')
        transcript[index]['endTime'] = end_time
        transcript[index]['duration'] = end_time - start_time

    return transcript

def get_video_shots(file_name: str) -> "list | None":
    """Read the cached video shots for ``file_name`` from Firestore.

    Args:
        file_name: Document id inside the ``video_shots`` collection.

    Returns:
        The document's ``data`` field, or None when the document does not
        exist (the previous ``-> bool`` annotation did not match this).
    """
    print('\\\\\\\\\\get_video_shots//////////')
    db = firestore.client()
    doc = db.collection('video_shots').document(file_name).get()
    print(doc)

    if not doc.exists:
      print('no video shot existed')
      return None

    # Convert the snapshot once and reuse the value for logging and return.
    shots = doc.to_dict().get('data')
    print(shots)
    return shots

def upload_summary(full_text: str, summary: str) -> None:
    """Store a transcript and its summary in a new ``summary`` document.

    The document holds the full text, the summary and ``repr(summary)``.
    """
    print('\\\\\\\\\\upload_summary//////////')
    summary_collection = firestore.client().collection('summary')
    doc_ref = summary_collection.document()
    record = {
        "full_text": full_text,
        "summary": summary,
        "summary_repr": repr(summary),
    }
    doc_ref.set(record)
    print(doc_ref)

def extract_words_from_str(summary: str) -> list:
    """Split an LLM summary into lower-cased words.

    A leading ``transcript:`` label is removed, the characters ``,.?!`` are
    stripped, newlines become spaces, and the text is split on single spaces
    with empty pieces discarded.

    Args:
        summary: Text returned by the LLM.

    Returns:
        List of lower-cased words.
    """
    print('\\\\\\\\\\extract_words_from_str//////////')
    print(f'summary: {summary}')
    if summary.lstrip().lower().startswith('transcript:'):
        summary = summary.lower().replace('transcript:', '', 1)

    summary = re.sub('[,.?!]', '', summary).lower()
    summary = summary.replace('\n', ' ')

    # Splitting on ' ' yields empty strings for repeated spaces; drop them.
    words = [word for word in summary.split(' ') if word]
    print(f'words: {words}')
    return words

def get_clips_from_transcript(transcript: list, summary: str) -> list:
    """Map the words of an LLM summary back onto runs of transcript entries.

    The summary is normalised (leading ``transcript:`` label removed, the
    characters ``,.?!`` stripped, lower-cased, newlines turned into spaces)
    and split on spaces. The transcript entries and the summary words are then
    walked in parallel; every run of two or more collected entries becomes one
    clip.

    Args:
        transcript: List of entry dicts with ``text``, ``startTime`` and
            ``endTime``.
        summary: Text returned by the LLM.

    Returns:
        List of clip dicts with ``text``, ``startTime``, ``endTime``,
        ``duration`` and ``words``.
    """
    print('\\\\\\\\\\get_clips_from_transcript//////////')
    # NOTE(review): `index` is never read.
    index = 0
    transcript_ptr = 0
    output = []
    if summary.lstrip().lower().startswith('transcript:'):
        summary = summary.lower().replace('transcript:', '', 1)

    summary = re.sub('[,.?!]', '', summary).lower()
    summary = summary.replace('\n', ' ')

    words = summary.split(' ')
    words = list(filter(lambda word: len(word) > 0, words))

    print(words)
    word_ptr = 0
    while word_ptr < len(words) and transcript_ptr < len(transcript):
        transcript_builder = []

        # Skip transcript entries until one equals the current summary word.
        while (transcript_ptr < len(transcript) and
        word_ptr < len(words) and
        transcript[transcript_ptr].get('text').lower() != words[word_ptr]):
            transcript_ptr = transcript_ptr + 1

        # Extend the run while the current entry matches the current word, or
        # while one of the next two entries matches the next summary word.
        while ((transcript_ptr < len(transcript) and
        word_ptr < len(words) and
        transcript[transcript_ptr].get('text').lower() == words[word_ptr])
        or (transcript_ptr < len(transcript) - 1 and
        word_ptr < len(words) - 1 and
        transcript[transcript_ptr+1].get('text').lower() == words[word_ptr+1])
        or (transcript_ptr < len(transcript) - 2 and
        word_ptr < len(words) - 1 and
        transcript[transcript_ptr+2].get('text').lower() == words[word_ptr+1])):
            transcript_builder.append(transcript[transcript_ptr])
            # Look-ahead case: the current entry is kept although it does not
            # match, together with the following entry (and the one after it
            # when the match is two entries ahead); two summary words are
            # consumed in total.
            if transcript[transcript_ptr].get('text').lower() != words[word_ptr]:
                transcript_builder.append(transcript[transcript_ptr+1])
                if transcript[transcript_ptr+1].get('text').lower() != words[word_ptr+1]:
                    transcript_builder.append(transcript[transcript_ptr+2])
                    transcript_ptr += 1

                transcript_ptr += 1
                word_ptr += 1

            transcript_ptr += 1
            word_ptr += 1

        if len(transcript_builder) == 0:
            continue
        # A run of one entry is discarded; the summary word is retried against
        # the rest of the transcript.
        if len(transcript_builder) == 1:
            word_ptr = word_ptr - 1
            continue

        new_text = list(map(lambda item: item.get('text'), transcript_builder))
        output.append({
            'text': ' '.join(new_text),
            'startTime': transcript_builder[0].get("startTime"),
            'endTime': transcript_builder[-1].get("endTime"),
            'duration': (transcript_builder[-1].get("endTime") -
                         transcript_builder[0].get("startTime")),
            'words': transcript_builder
        })

    return output

def upload_video_shots(file_name: str, video_shots: list) -> None:
    """Store ``video_shots`` in Firestore under ``video_shots/<file_name>``.

    The list is written to the document's ``data`` field.
    """
    payload = {"data": video_shots}
    collection = firestore.client().collection('video_shots')
    collection.document(file_name).set(payload)

def process_video(video_gcs_uri: str, output_uri: str = None) -> list:
    """Run shot-change detection on a video and return its shots.

    Args:
        video_gcs_uri: URI of the video, passed as ``input_uri``.
        output_uri: Not used; kept so existing callers keep working.

    Returns:
        List of dicts with ``start_time`` (floored to 0.1 s) and ``end_time``
        (rounded to one decimal), both in seconds.
    """
    video_client = videointelligence.VideoIntelligenceServiceClient()

    # Only shot detection is requested. The speech transcription config and
    # video context that used to be built here were never sent with the
    # request, so they have been removed.
    features = [
        videointelligence.Feature.SHOT_CHANGE_DETECTION,
    ]

    operation = video_client.annotate_video(
        request={"features": features,
                "input_uri": video_gcs_uri,}
    )

    print("\nProcessing video.", operation)

    result = operation.result(timeout=300)

    print("\n finished processing.")

    video_shots = []
    # first result is retrieved because a single video was processed
    for i, shot in enumerate(result.annotation_results[0].shot_annotations):
        start_time = (
            shot.start_time_offset.seconds + shot.start_time_offset.microseconds / 1e6
        )
        end_time = (
            shot.end_time_offset.seconds + shot.end_time_offset.microseconds / 1e6
        )
        video_shots.append({
            'start_time': math.floor(start_time * 10) / 10.0,
            'end_time': round(end_time, 1),
        })
        print("\tShot {}: {} to {}".format(i, start_time, end_time))

    return video_shots

In [None]:
#@title Test with main function >> summarize_transcript
user_prompt = ''     # modify this user prompt or leave it blank. (same as prompt input from AdClip UI)
# video_name is the Firestore id under which the video shots were stored
# during transcription.
summarized_transcript = summarize_transcript(transcript['transcript'], user_prompt, video_name)

# The segments are consumed by the "cut the video" step.
segments = summarized_transcript['summarized_transcript']

print(segments)

### If you got an empty transcript from cell above, try this >> Test step by step (one iteration of summarize_transcript)

If the duration is 0, the rest of the code will fail because the shortened text could not be mapped back to the original transcript.

<br/>

For Thai, we use `match_summary_with_original_transcript` instead of `get_clips_from_transcript`. It maps the LLM result back to the original transcript by comparing each word of the LLM result with the words of the original transcript.

<br/>
<br/>

A visual explanation is available in [go/adclip-localization](http://go/adclip-localization), slide 24.

For example,

The first sentence of original transcript: "เนส กาแฟ เรด คัพ คั่ว อย่าง เชี่ยวชาญ" >> breakdown by space to "เนส", "กาแฟ", "เรด", "คัพ", "คั่ว", "อย่าง", "เชี่ยวชาญ"

LLM Result: "Nescafe Red Cup คั่วอย่างเชี่ยวชาญ" >> breakdown by space to "Nescafe", "Red", "Cup", "คั่วอย่างเชี่ยวชาญ"

<br/>

`match_summary_with_original_transcript` will compare

"Nescafe" with "เนส", "กาแฟ", "เรด", ... => cannot find the match, go to next word

"Red" with "เนส", "กาแฟ", "เรด", ... => cannot find the match, go to next word

"Cup" with "เนส", "กาแฟ", "เรด", ... => cannot find the match, go to next word

"คั่วอย่างเชี่ยวชาญ" with "เนส", "กาแฟ", "เรด", ..., "คั่ว" => found "คั่ว" in the original transcript => consider matched! add the sentence from original transcript to the summarized transcript

In [None]:
# new function to map LLM summary --> original transcript (not merge to Google Cloud Function yet)
def match_summary_with_original_transcript(transcript_words, original_transcript, summary=None):
  """Pick the original transcript lines that the LLM summary refers to.

  Each word of the summary is compared with the words of the original lines;
  a line is added to the result the first time one of its words is found
  inside a summary word.

  Args:
    transcript_words: Not used; kept so existing callers keep working.
    original_transcript: List of line dicts with ``text`` and ``words``.
      Matched lines get an ``index`` key added in place.
    summary: LLM summary text. When omitted, the notebook-level
      ``shorten_transcript`` variable is used, as before.

  Returns:
    List of the matched line dicts, in the order they were added.
  """
  if summary is None:
    # Backward-compatible fallback to the notebook-level variable this
    # function used to read implicitly.
    summary = shorten_transcript
  extracted_words = extract_words_from_str(summary)

  words_ptr = 0
  original_ptr = 0
  matched_result = []
  is_matched = False
  latest_added_scene = -1
  while words_ptr < len(extracted_words):
    # Nothing matched yet and the lines are exhausted: restart from the top.
    if original_ptr >= len(original_transcript) and latest_added_scene < 0:
      original_ptr = 0
    print(f"before original_ptr {original_ptr} latest_added_scene {latest_added_scene}")
    # NOTE(review): `and` binds tighter than `or`, so this reads as
    # `A or (B and C)`; confirm that this grouping is the intended one.
    if original_ptr >= len(original_transcript) or original_ptr > latest_added_scene and latest_added_scene > 0:
      original_ptr = latest_added_scene
    print(f"after original_ptr {original_ptr} latest_added_scene {latest_added_scene}")
    print(f'checking [{extracted_words[words_ptr]}] with [{original_transcript[original_ptr]["text"]}]')
    while original_ptr < len(original_transcript):
      is_matched = find_the_match(extracted_words[words_ptr], original_transcript[original_ptr]['words'])
      if is_matched:
        if already_in_final_result(matched_result, original_transcript[original_ptr]):
          break
        else:
          to_add_transcript = original_transcript[original_ptr]
          to_add_transcript['index'] = original_ptr
          matched_result.append(to_add_transcript)
          print(f'scene added >> latest_added_scene {latest_added_scene}')
          latest_added_scene = original_ptr
          is_matched = False
        break
      original_ptr += 1
    words_ptr += 1
  return matched_result

def find_the_match(words, original_transcript_word_list):
  """Return True when any entry's ``text`` occurs inside ``words``.

  ``words`` is a single string; the test is a substring check of each
  entry's ``text`` against it. Prints ``matched`` on the first hit.
  """
  has_match = any(entry['text'] in words for entry in original_transcript_word_list)
  if has_match:
    print('matched')
  return has_match

def already_in_final_result(final_result_list, transcript_to_add):
  """Return True when a result with the same ``text`` is already collected."""
  # The comparison is evaluated lazily per entry, so an empty list yields
  # False without reading transcript_to_add at all.
  return any(
      transcript_to_add['text'] == collected['text']
      for collected in final_result_list
  )

In [None]:
print('---------start------------')
user_prompt = ''          # modify this user prompt or leave it blank. (same as prompt input from AdClip UI)

full_text = '\n'.join([x["text"] for x in transcript["transcript"]])
shorten_transcript = send_transcript_to_llm(text=make_prompt(full_text,user_prompt))

list_of_words = list(map(lambda line: line['words'], transcript["transcript"]))
transcript_words = list(itertools.chain.from_iterable(list_of_words))
extracted_words = extract_words_from_str(shorten_transcript)
# Use video_name as the Firestore id, the same id used when transcribing and
# by summarize_transcript. The cell previously looked shots up under
# "<video_name>.mp4" and stored them under `file_name`, a variable that only
# exists when the optional download cell has been run.
video_shots = get_video_shots(video_name)
if video_shots is None:
  video_shots = process_video(f'gs://{gcloud_bucket_name}/videos/{video_name}.mp4')
  upload_video_shots(video_name, video_shots)

print('----------Calculate Duration----------')
duration = calculate_duration(shorten_transcript, transcript_words, video_shots)
print(f'duration: {duration}')

print('----------LLM result----------')
print(' '.join(extracted_words))

print('----------Map with original transcript----------')
segments = get_clips_from_transcript(transcript_words, shorten_transcript)

# can try to use this instead if duration = 0 or cannot map the transcript by get_clips_from_transcript
# segments = match_summary_with_original_transcript(transcript_words, transcript["transcript"])

print('---------- mapped result ----------')
for s in segments:
  print(s['text'])

print('----------Map with Video Shots----------')
# Pass the flattened words, as summarize_transcript and calculate_duration do,
# so this step-by-step run gives the same segment bounds as the main function.
segments = match_with_video_shots(video_shots, segments, transcript_words)
print(f'segments: {segments}')

print('----------Final Result----------')
for s in segments:
  print(f"{s['startTime']} {s['endTime']}")
  print(f"{s['text']}")

## Step 6 cut the video

In [None]:
from moviepy.video.fx.all import crop

def clip_video(video_path, segments):
    """Cut the given segments out of a video and write two output files.

    The segments are joined in order and written as
    ``<video_name>-horizontal.mp4``; a centred crop with a 9:16 aspect ratio
    is written as ``<video_name>-vertical.mp4``. ``video_name`` is the
    notebook-level config value.

    Args:
        video_path: Path of the source video file.
        segments: List of dicts with ``startTime`` and ``endTime`` in seconds.

    Raises:
        ValueError: If ``segments`` is empty.
    """
    if not segments:
        # Previously this surfaced as an AttributeError on `{}.size`.
        raise ValueError('segments is empty: there is nothing to cut')

    # loading original video
    original_clip = mpy.VideoFileClip(video_path)
    try:
        # make sure end_time does not exceed the video duration
        subclips = [
            original_clip.subclip(
                segment['startTime'],
                min(segment['endTime'], original_clip.duration))
            for segment in segments
        ]
        # Join all pieces in one call instead of re-concatenating the growing
        # result once per segment.
        if len(subclips) == 1:
            new_clip = subclips[0]
        else:
            new_clip = mpy.concatenate_videoclips(subclips)

        (w, h) = new_clip.size

        # Width of a 9:16 frame at full height, rounded down to an even number
        # (presumably because the encoder needs even dimensions - confirm).
        crop_width = (h * 9 / 16) // 2 * 2

        x1, x2 = (w - crop_width)//2, (w+crop_width)//2
        y1, y2 = 0, h
        cropped_clip = crop(new_clip, x1=x1, y1=y1, x2=x2, y2=y2)

        cropped_clip.write_videofile(f"{video_name}-vertical.mp4",audio_codec="aac")
        new_clip.write_videofile(f"{video_name}-horizontal.mp4",audio_codec="aac") #put in audio_codec so the clip has sound
    finally:
        # Release the file readers held by the source clip.
        original_clip.close()

In [None]:
#@title Cut the video

# Cut the source video in /content/ using the segments produced in Step 5.
clip_video(video_path=f"/content/{video_name}.mp4", segments=segments)