# **Video Transcription and Translation with Faster Whisper and ChatGPT**


[![notebook shield](https://img.shields.io/static/v1?label=&message=Notebook&color=blue&style=for-the-badge&logo=googlecolab&link=https://colab.research.google.com/github/lewangdev/autotranslate/blob/main/autotranslate.ipynb)](https://colab.research.google.com/github/lewangdev/autotranslate/blob/main/autotranslate.ipynb)
[![repository shield](https://img.shields.io/static/v1?label=&message=Repository&color=blue&style=for-the-badge&logo=github&link=https://github.com/lewangdev/autotranslate)](https://github.com/lewangdev/autotranslate)

This notebook guides you through transcribing and translating videos with [Faster Whisper](https://github.com/guillaumekln/faster-whisper) and ChatGPT. You can explore most inference parameters, or use the notebook as-is to store the transcript, translation, and extracted audio in your Google Drive.

In [None]:
#@markdown # **Check GPU type** 🕵️

#@markdown The type of GPU assigned to your Colab session determines the speed at which the video will be transcribed.
#@markdown The higher the number of floating point operations per second (FLOPS), the faster the transcription.
#@markdown But even the least powerful GPU available in Colab is able to run any Whisper model.
#@markdown Make sure you've selected `GPU` as hardware accelerator for the Notebook (Runtime &rarr; Change runtime type &rarr; Hardware accelerator).

#@markdown |  GPU   |  GPU RAM   | FP32 teraFLOPS |     Availability   |
#@markdown |:------:|:----------:|:--------------:|:------------------:|
#@markdown |  T4    |    16 GB   |       8.1      |         Free       |
#@markdown | P100   |    16 GB   |      10.6      |      Colab Pro     |
#@markdown | V100   |    16 GB   |      15.7      |  Colab Pro (Rare)  |

#@markdown ---
#@markdown **Factory reset your Notebook's runtime if you want to get assigned a new GPU.**

!nvidia-smi -L

!nvidia-smi

In [None]:
#@markdown # **Install libraries** 🏗️
#@markdown This cell will take a little while to download several libraries.

#@markdown ---

! pip install faster-whisper
! pip install yt-dlp
# The translation cell uses openai.ChatCompletion, which was removed in openai 1.0
! pip install "openai<1"


import sys
import warnings
from faster_whisper import WhisperModel
from pathlib import Path
import yt_dlp
import subprocess
import torch
import shutil
import numpy as np
from IPython.display import display, Markdown, YouTubeVideo

device = torch.device('cuda:0')
print('Using device:', device, file=sys.stderr)

In [None]:
#@markdown # **Optional:** Save data in Google Drive 💾
#@markdown Enter a Google Drive path and run this cell if you want to store the results inside Google Drive.

# Mount Google Drive so the transcript and translation files can be copied there.
from google.colab import drive
drive_mount_path = Path("/") / "content" / "drive"
drive.mount(str(drive_mount_path))
drive_mount_path /= "My Drive"
#@markdown ---
drive_path = "Colab Notebooks/Videos Transcription and Translation" #@param {type:"string"}
#@markdown ---
#@markdown **Run this cell again if you change your Google Drive path.**

drive_whisper_path = drive_mount_path / Path(drive_path.lstrip("/"))
drive_whisper_path.mkdir(parents=True, exist_ok=True)

In [None]:
#@markdown # **Model selection** 🧠

#@markdown The following pre-trained models are available:

#@markdown |  Size  | Parameters | English-only model | Multilingual model | Required VRAM | Relative speed |
#@markdown |:------:|:----------:|:------------------:|:------------------:|:-------------:|:--------------:|
#@markdown |  tiny  |    39 M    |     `tiny.en`      |       `tiny`       |     ~0.8 GB     |      ~32x      |
#@markdown |  base  |    74 M    |     `base.en`      |       `base`       |     ~1.0 GB     |      ~16x      |
#@markdown | small  |   244 M    |     `small.en`     |      `small`       |     ~1.4 GB     |      ~6x       |
#@markdown | medium |   769 M    |    `medium.en`     |      `medium`      |     ~2.7 GB     |      ~2x       |
#@markdown | large-v1  |   1550 M   |        N/A         |      `large-v1`       |    ~4.3 GB     |       1x       |
#@markdown | large-v2  |   1550 M   |        N/A         |      `large-v2`       |    ~4.3 GB     |       1x       |

#@markdown ---
model_size = 'large-v2' #@param ['tiny', 'tiny.en', 'base', 'base.en', 'small', 'small.en', 'medium', 'medium.en', 'large-v1', 'large-v2']
device_type = "cuda" #@param ['cuda', 'cpu']
compute_type = "float16" #@param ['float16', 'int8_float16', 'int8']
#@markdown ---
#@markdown **Run this cell again if you change the model.**

model = WhisperModel(model_size, device=device_type, compute_type=compute_type)
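
The `device_type` and `compute_type` settings above are chosen by hand. As a minimal sketch, assuming the usual pairing of `float16` or `int8_float16` on a GPU and `int8` on a CPU, a small helper could pick a matching value. The name `pick_compute_type` and its `low_memory` flag are hypothetical and not part of Faster Whisper.

```python
def pick_compute_type(device, low_memory=False):
    """Suggest a compute type for the given device (hypothetical helper)."""
    if device == "cuda":
        # int8_float16 trades some accuracy for a smaller memory footprint
        return "int8_float16" if low_memory else "float16"
    # On CPU, 8-bit quantization is the assumed default in this sketch
    return "int8"


print(pick_compute_type("cuda"))
print(pick_compute_type("cuda", low_memory=True))
print(pick_compute_type("cpu"))
```

The result can be passed as `compute_type` when creating the model, for example `WhisperModel(model_size, device=device_type, compute_type=pick_compute_type(device_type))`.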


In [None]:
#@markdown # **Video selection** 📺

#@markdown Enter the URL of the video you want to transcribe, or the path of a file or folder in your Google Drive, and run the cell.

Type = "Video or playlist URL" #@param ['Video or playlist URL', 'Google Drive']
#@markdown ---
#@markdown #### **Video or playlist URL**
URL = "https://dft3h5i221ap1.cloudfront.net/OpenAI/c2/video/sc-openai-c2-L5-vid6_2.mp4" #@param {type:"string"}
# store_audio = True #@param {type:"boolean"}
#@markdown ---
#@markdown #### **Google Drive video, audio (mp4, wav), or folder containing video and/or audio files**
video_path = "Colab Notebooks/transcription/my_video.mp4" #@param {type:"string"}
#@markdown ---
#@markdown **Run this cell again if you change the video.**

video_path_local_list = []

if Type == "Video or playlist URL":
    
    ydl_opts = {
        'format': 'm4a/bestaudio/best',
        'outtmpl': '%(id)s.%(ext)s',
        # ℹ️ See help(yt_dlp.postprocessor) for a list of available Postprocessors and their arguments
        'postprocessors': [{  # Extract audio using ffmpeg
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
        }]
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        error_code = ydl.download([URL])
        list_video_info = [ydl.extract_info(URL, download=False)]
        
    for video_info in list_video_info:
        video_path_local_list.append(Path(f"{video_info['id']}.wav"))

elif Type == "Google Drive":
    # video_path_drive = drive_mount_path / Path(video_path.lstrip("/"))
    video_path = drive_mount_path / Path(video_path.lstrip("/"))
    if video_path.is_dir():
        for video_path_drive in video_path.glob("**/*"):
            # Only regular files are copied; directories and missing entries are skipped
            if video_path_drive.is_dir():
                display(Markdown("**Subfolders not supported.**"))
                continue
            if not video_path_drive.is_file():
                display(Markdown(f"**{str(video_path_drive)} does not exist, skipping.**"))
                continue
            display(Markdown(f"**{str(video_path_drive)} selected for transcription.**"))
            video_path_local = Path(".").resolve() / (video_path_drive.name)
            shutil.copy(video_path_drive, video_path_local)
            video_path_local_list.append(video_path_local)
    elif video_path.is_file():
        video_path_local = Path(".").resolve() / (video_path.name)
        shutil.copy(video_path, video_path_local)
        video_path_local_list.append(video_path_local)
        display(Markdown(f"**{str(video_path)} selected for transcription.**"))
    else:
        display(Markdown(f"**{str(video_path)} does not exist.**"))

else:
    raise TypeError("Please select supported input type.")

for video_path_local in video_path_local_list:
    if video_path_local.suffix == ".mp4":
        video_path_local = video_path_local.with_suffix(".wav")
        result  = subprocess.run(["ffmpeg", "-i", str(video_path_local.with_suffix(".mp4")), "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", str(video_path_local)])
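
The `ffmpeg` call above converts an `.mp4` file into mono 16 kHz, 16-bit PCM audio. As a small sketch of what each flag does, the argument list can be built by a helper; the name `build_ffmpeg_command` is hypothetical and only illustrates the command, it does not run it.

```python
from pathlib import Path


def build_ffmpeg_command(source, sample_rate=16000):
    """Build the ffmpeg argument list that extracts mono 16-bit PCM audio."""
    source = Path(source)
    target = source.with_suffix(".wav")
    return [
        "ffmpeg",
        "-i", str(source),        # input file
        "-vn",                    # drop the video stream
        "-acodec", "pcm_s16le",   # 16-bit little-endian PCM
        "-ar", str(sample_rate),  # sample rate in Hz
        "-ac", "1",               # single (mono) channel
        str(target),              # output file next to the input
    ]


print(build_ffmpeg_command("my_video.mp4"))
```

The returned list can be passed directly to `subprocess.run`, which avoids shell quoting issues with file names that contain spaces.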


In [None]:
#@markdown # **Run the model** 🚀

#@markdown Run this cell to execute the transcription of the video. This can take a while; the duration varies with the length of the video and the number of parameters of the model selected above.
def seconds_to_time_format(s):
    # Round to whole milliseconds first so the millisecond field never reaches 1000
    total_ms = int(round(s * 1000))
    # Split into hours, minutes, seconds, and milliseconds
    hours, total_ms = divmod(total_ms, 3600 * 1000)
    minutes, total_ms = divmod(total_ms, 60 * 1000)
    seconds, milliseconds = divmod(total_ms, 1000)

    # Return the formatted string (SRT style: HH:MM:SS,mmm)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"


#@markdown ## **Parameters** ⚙️

#@markdown ### **Behavior control**
#@markdown #### Language
language = "en" #@param ["auto", "en", "zh", "ja", "fr", "de"] {allow-input: true}
#@markdown #### Initial prompt
initial_prompt = "Hello, Let's begin to talk." #@param {type:"string"}
#@markdown ---
#@markdown #### Word-level timestamps
word_level_timestamps = True #@param {type:"boolean"}
#@markdown ---
#@markdown #### VAD filter
vad_filter = False #@param {type:"boolean"}
vad_filter_min_silence_duration_ms = 50 #@param {type:"integer"}
#@markdown ---


segments, info = model.transcribe(str(video_path_local), beam_size=5,
                                  language=None if language == "auto" else language,
                                  initial_prompt=initial_prompt,
                                  word_timestamps=word_level_timestamps, 
                                  vad_filter=vad_filter,
                                  vad_parameters=dict(min_silence_duration_ms=vad_filter_min_silence_duration_ms))

display(Markdown(f"Detected language '{info.language}' with probability {info.language_probability}"))

fragments = []

for segment in segments:
  print(f"[{seconds_to_time_format(segment.start)} --> {seconds_to_time_format(segment.end)}] {segment.text}")
  if word_level_timestamps:
    for word in segment.words:
      ts_start = seconds_to_time_format(word.start)
      ts_end = seconds_to_time_format(word.end)
      #print(f"[{ts_start} --> {ts_end}] {word.word}")
      fragments.append(dict(start=word.start,end=word.end,text=word.word))
  else:
    ts_start = seconds_to_time_format(segment.start)
    ts_end = seconds_to_time_format(segment.end)
    #print(f"[{ts_start} --> {ts_end}] {segment.text}")
    fragments.append(dict(start=segment.start,end=segment.end,text=segment.text))


In [None]:
#@title Merge words/segments to sentences

#@markdown Run this cell to merge words/segments to sentences.
#@markdown ## **Parameters** ⚙️

#@markdown ### **Behavior control**
#@markdown #### Maximum gap between two sentences (milliseconds)
max_gap_ms_between_two_sentence = 200 #@param {type:"integer"}

import json

# Merge words/segments into sentences.
# gap_s is the maximum pause in seconds, the same unit as the fragment timestamps.
def merge_fragments(fragments, gap_s):
  new_fragments = []
  new_fragment = {}
  for fragment in fragments:
    start = fragment['start']
    end = fragment['end']
    text = fragment['text']

    # A pause longer than gap_s closes the sentence being built
    if new_fragment and start - new_fragment['end'] > gap_s:
      new_fragments.append(new_fragment)
      new_fragment = {}

    # Start a new sentence at the current fragment
    if not new_fragment:
      new_fragment = dict(start=start, end=end, text="")

    new_fragment['end'] = end

    delimiter = '' if text.startswith('-') else ' '
    new_fragment['text'] = f"{new_fragment['text']}{delimiter}{text.lstrip()}"

    # End of a sentence when symbols found: [.?]
    if text.endswith('.') or text.endswith('?'):
      new_fragments.append(new_fragment)
      new_fragment = {}

  # Keep the trailing sentence even if it has no closing punctuation
  if new_fragment:
    new_fragments.append(new_fragment)
  return new_fragments


new_fragments = merge_fragments(fragments, max_gap_ms_between_two_sentence/1000.0)

# Save as json file
json_ext_name = ".json"
json_transcript_file_name = video_path_local.stem + json_ext_name
with open(json_transcript_file_name, 'w') as f:
  f.write(json.dumps(new_fragments))
display(Markdown(f"**Transcript JSON file created: {video_path_local.parent / json_transcript_file_name}**"))

# Save as srt
srt_ext_name = ".srt"
srt_transcript_file_name = video_path_local.stem + srt_ext_name
with open(srt_transcript_file_name, 'w') as f:
  for sentence_idx, fragment in enumerate(new_fragments):
    ts_start = seconds_to_time_format(fragment['start'])
    ts_end = seconds_to_time_format(fragment['end'])
    text = fragment['text']
    print(f"[{ts_start} --> {ts_end}] {text}")
    f.write(f"{sentence_idx + 1}\n")
    f.write(f"{ts_start} --> {ts_end}\n")
    f.write(f"{text.strip()}\n\n")

# Copy the SRT file to Google Drive if the optional Drive cell was run
try:
  shutil.copy(video_path_local.parent / srt_transcript_file_name,
              drive_whisper_path / srt_transcript_file_name)
  display(Markdown(f"**Transcript SRT file created: {drive_whisper_path / srt_transcript_file_name}**"))
except Exception:
  # Drive is not mounted (or the copy failed): report the local file instead
  display(Markdown(f"**Transcript SRT file created: {video_path_local.parent / srt_transcript_file_name}**"))
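
Each SRT entry written above consists of a 1-based index, a timing line, the text, and a blank line. The following is a minimal sketch of that layout on made-up sample data; the names `srt_timestamp` and `render_srt` are hypothetical and only mirror what the cell does when writing the file.

```python
def srt_timestamp(seconds):
    """Format seconds as HH:MM:SS,mmm."""
    # Work in whole milliseconds, then split into h/m/s/ms
    total_ms = int(round(seconds * 1000))
    hours, rest = divmod(total_ms, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    secs, millis = divmod(rest, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"


def render_srt(fragments):
    """Render sentence fragments (start, end, text) as SRT text."""
    cues = []
    for index, fragment in enumerate(fragments, start=1):
        timing = f"{srt_timestamp(fragment['start'])} --> {srt_timestamp(fragment['end'])}"
        cues.append(f"{index}\n{timing}\n{fragment['text'].strip()}\n")
    # A blank line separates consecutive cues
    return "\n".join(cues)


# Made-up sample data in the same shape as `new_fragments`
sample = [
    {"start": 0.0, "end": 1.25, "text": " Hello there."},
    {"start": 1.5, "end": 3.0, "text": " How are you?"},
]
print(render_srt(sample))
```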


In [None]:
#@markdown # **Translate**
#@markdown Run this cell to translate subtitles to the language you want.
#@markdown ## **Parameters** ⚙️

#@markdown ### **Behavior control**

#@markdown #### API Type
api_type = "azure" #@param ["azure", "openai"]

#@markdown #### Azure API Config (if you are using `openai`, please leave these fields blank)
api_base = "https://xxxxxx.openai.azure.com" #@param {type:"string"}
api_version = "2023-05-15" #@param {type:"string"}
deployment_id = "gpt3" #@param {type:"string"}

#@markdown #### API Key and Model Config
api_key = "xxxxx" #@param {type:"string"}
model_name = "gpt-3.5-turbo" #@param ["gpt-3.5-turbo"] {allow-input: true}
temperature = 0 #@param {type:"number"}
#@markdown ---
#@markdown #### Target Language
target_language = "\u7B80\u4F53\u4E2D\u6587" #@param ["\u7B80\u4F53\u4E2D\u6587", "\u7E41\u9AD4\u4E2D\u6587", "\u65E5\u672C\u8A9E"] {allow-input: true}
#@markdown ---
#@markdown #### Retry and Token Chunks
translate_max_retry_times = 10 #@param {type:"integer"}
count_of_sentence_send_once_limit = 5 #@param {type:"integer"}

# This prompt is from https://twitter.com/dotey/status/1665476562219573249
system_prompt = f"""You are a program responsible for translating subtitles. Your task is to translate the subtitles into {target_language}, maintaining a colloquial tone and style, avoiding long sentences, and ignoring verbal tics such as 'so', 'you know', etc.
The input will be a JSON-formatted string array, which should be translated in accordance with the following steps:
Step1: Join the string array to a sentence, then translate it to {target_language};
Step2: Split the translated sentence to a string array, each item of which should correspond to an item in the original input array.
Step3: Verify if the count of items in the output array equals that of the input array and no item is blank. If it doesn't, go back to Step 2 and try again.
  
Respond with a JSON-formatted string array:
"""
import openai
import json

openai.api_key = api_key

if api_type == "azure":
  openai.api_type = "azure"
  openai.api_base = api_base
  openai.api_version = api_version
else:
  deployment_id = None


def translate_by_chatgpt(sentences, max_retry_times=10, deployment_id=None, model_name="gpt-3.5-turbo", temperature=0.7):
  system_msg = dict(role="system", content=system_prompt)
  user_msg_content = json.dumps(sentences)
  user_msg = dict(role="user", content=user_msg_content)
  current_retry_times = 0
  # Fallback returned when every attempt fails: one empty string per input sentence
  sentences_translated = [""] * len(sentences)

  while True:
    try:
      chat_completion = openai.ChatCompletion.create(deployment_id=deployment_id, 
                                                     model=model_name, 
                                                     messages=[system_msg, user_msg],
                                                     temperature=temperature)
      candidate = json.loads(chat_completion.choices[0].message.content)

      # Accept the reply only if it has exactly one item per input sentence
      if isinstance(candidate, list) and len(candidate) == len(sentences):
        sentences_translated = candidate
        break
      reason = f"translated={len(candidate)}, origin={len(sentences)}"
    except Exception as e:
      # API errors and malformed JSON both lead to a retry
      reason = repr(e)

    if current_retry_times >= max_retry_times:
      break
    current_retry_times = current_retry_times + 1
    print(f"==Translate retry {current_retry_times}, Reason: {reason}")
  return sentences_translated

def translate_fragments(fragments, sentence_send_limit=5):
  system_msg = dict(role="system", content=system_prompt)
  fragments_translated = []

  # Todo: The count of tokens in sentences must be less than Max Tokens API allowed
  length = len(fragments)
  for n in range(0, length, sentence_send_limit):
    fragments_will_be_translated = fragments[n:n+sentence_send_limit]
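    # Pass the temperature selected in the form; otherwise the function default is used
    sentences_translated = translate_by_chatgpt([fragment['text'] for fragment in fragments_will_be_translated], 
                                                translate_max_retry_times,
                                                deployment_id,
                                                model_name,
                                                temperature)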

    for i, sentence_translated in enumerate(sentences_translated):
      print(f"{seconds_to_time_format(fragments_will_be_translated[i]['start'])} --> {seconds_to_time_format(fragments_will_be_translated[i]['end'])}")
      print("Original  : " + fragments_will_be_translated[i]['text'].lstrip())
      print("Translated: " + sentence_translated)
      print('\n')
      fragments_will_be_translated[i]['text_translated'] = sentence_translated
    
    fragments_translated.extend(fragments_will_be_translated)
  
  return fragments_translated

fragments_translated = translate_fragments(new_fragments, count_of_sentence_send_once_limit)

# Save translation as json file
json_translated_file_name = f"{video_path_local.stem}-translated.json"
with open(json_translated_file_name, 'w') as f:
  f.write(json.dumps(fragments_translated))
display(Markdown(f"**Translation JSON file created: {video_path_local.parent / json_translated_file_name}**"))

# Save translation as srt file
srt_translated_file_name = f"{video_path_local.stem}-translated.srt"
with open(srt_translated_file_name, 'w') as f:
  for sentence_idx, fragment in enumerate(fragments_translated):
    ts_start = seconds_to_time_format(fragment['start'])
    ts_end = seconds_to_time_format(fragment['end'])
    text = fragment.get('text', '')
    text_translated = fragment.get('text_translated', '')
    f.write(f"{sentence_idx + 1}\n")
    f.write(f"{ts_start} --> {ts_end}\n")
    f.write(f"{text_translated.strip()}\n")
    f.write(f"{text.strip()}\n\n")

# Copy the translated SRT file to Google Drive if the optional Drive cell was run
try:
  shutil.copy(video_path_local.parent / srt_translated_file_name,
              drive_whisper_path / srt_translated_file_name)
  display(Markdown(f"**Translated SRT file created: {drive_whisper_path / srt_translated_file_name}**"))
except Exception:
  # Drive is not mounted (or the copy failed): report the local file instead
  display(Markdown(f"**Translated SRT file created: {video_path_local.parent / srt_translated_file_name}**"))
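
The translation step sends the sentences in small batches and expects a JSON array with one translated item per input sentence. As a minimal sketch that makes no API call, the batching and the reply check can be written as two helpers; the names `chunk` and `parse_translation` are hypothetical and the replies below are made up.

```python
import json


def chunk(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def parse_translation(raw_reply, expected_count):
    """Return the translated list, or None if the reply is unusable."""
    try:
        parsed = json.loads(raw_reply)
    except json.JSONDecodeError:
        return None
    # The reply must be a list with one item per input sentence
    if not isinstance(parsed, list) or len(parsed) != expected_count:
        return None
    # Every item must be a non-blank string
    if any(not isinstance(item, str) or not item.strip() for item in parsed):
        return None
    return parsed


sentences = ["Hello.", "How are you?", "Goodbye."]
batches = list(chunk(sentences, 2))
print(batches)
print(parse_translation('["你好。", "你好吗？"]', 2))
print(parse_translation('["你好。"]', 2))
```

A caller would retry the request whenever `parse_translation` returns `None`, which matches the retry-on-mismatch idea used in the cell.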



In [21]:
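# Import the required libraries
# (install them first if needed: pip install edge-tts pydub nest_asyncio)
import edge_tts
import asyncio
import json
import os
from pydub import AudioSegment
import nest_asyncio

# Global flag: whether to pad the gaps between audio clips with silence
FILL_SILENCE = False
# Global setting: number of subtitles to process; -1 means process all of them
NUM_SUBTITLES_TO_PROCESS = -1
# Global setting: maximum number of concurrent tasks
MAX_CONCURRENT_TASKS = 5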

# Return the start time of the subtitle that follows the given audio file
def get_next_start(audio_file, data):
    # Recover the current subtitle's start time from the file name
    current_start = float(audio_file.split(
        "/")[-1].replace("audio_", "").replace(".wav", ""))
    next_start = None
    # Walk through the subtitle data to find the next start time
    for subtitle in data:
        if subtitle['start'] > current_start:
            next_start = subtitle['start']
            break
    return next_start

# Async function that converts one subtitle to speech
async def process_subtitle(semaphore, subtitle):
    async with semaphore:  # The semaphore limits the number of concurrent tasks
        # Output directory
        output_dir = "output"
        # Create the output directory if it does not exist
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        # Name of the output audio file
        audio_file = os.path.join(
            output_dir, "audio_" + str(subtitle['start']) + ".wav")

        # Skip the subtitle if its audio file already exists
        if os.path.exists(audio_file):
            print(f"Skipping {audio_file} because it already exists.")
            return audio_file

        # Use edge_tts to convert the subtitle text to speech and save it as an audio file
        communicate = edge_tts.Communicate(
            subtitle['text_translated'], voice='zh-CN-XiaoxiaoNeural', rate='+75%')
        await communicate.save(audio_file)

        # Check that the generated audio file is valid
        try:
            AudioSegment.from_file(audio_file)
        except Exception:
            print(f"Invalid audio file generated: {audio_file}")
            return None

    return audio_file  # The semaphore is released automatically when the 'with' block exits

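# Concatenate all audio files, optionally inserting silence between them
def concatenate_audios(audio_files, data):
    # Start with an empty audio segment
    combined = AudioSegment.empty()
    # Iterate over the audio files
    for audio_file in audio_files:
        # Skip invalid audio files
        if audio_file is None:
            continue

        # Try to read the audio data from the file
        try:
            audio = AudioSegment.from_file(audio_file)
        except Exception:
            print(f"Invalid audio file: {audio_file}")
            continue

        # Append the audio data to the combined segment
        combined += audio
        # If requested, pad with silence until the next subtitle is due to start
        if FILL_SILENCE:
            next_start = get_next_start(audio_file, data)
            # The last subtitle has no successor, so there is nothing to pad
            if next_start is not None:
                silence_duration = next_start - len(combined) / 1000.0
                if silence_duration > 0:
                    silence = AudioSegment.silent(duration=silence_duration * 1000)
                    combined += silence
    return combined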

# Main function: process all subtitles and produce the final audio
async def main():
    # Read the subtitle data file
    with open('1680627742235148292-translated.json') as f:
        data = json.load(f)
    # Create a semaphore to limit the number of concurrent tasks
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
    # Determine how many subtitles to process
    num_subtitles_to_process = len(
        data) if NUM_SUBTITLES_TO_PROCESS == -1 else NUM_SUBTITLES_TO_PROCESS
    # Create one task per subtitle
    tasks = [process_subtitle(semaphore, subtitle)
             for subtitle in data[:num_subtitles_to_process]]
    # Run all tasks concurrently and collect the results
    audio_files = await asyncio.gather(*tasks)
    # Concatenate all audio files
    combined = concatenate_audios(audio_files, data)
    # Export the final audio
    combined.export("final_audio.wav", format='wav')

# nest_asyncio allows the asyncio event loop to be used in a nested way (needed inside a notebook)
nest_asyncio.apply()

# Get the current event loop and run the main function
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
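
When `FILL_SILENCE` is enabled, the amount of silence after each clip depends on where the combined audio currently ends and when the next subtitle starts. The following is a minimal sketch of that arithmetic on plain numbers, without pydub. The name `plan_silence_ms` is hypothetical, the sample values are made up, and the sketch assumes the first clip starts at time zero.

```python
def plan_silence_ms(start_times_s, clip_lengths_ms):
    """Return the silence (in ms) to append after each clip.

    The goal is for clip i+1 to begin at start_times_s[i+1] on the combined
    timeline; if the speech already overruns that point, no silence is added.
    """
    padding = []
    position_ms = 0  # length of the combined audio so far
    for i, clip_ms in enumerate(clip_lengths_ms):
        position_ms += clip_ms
        if i + 1 < len(start_times_s):
            gap_ms = max(0, round(start_times_s[i + 1] * 1000) - position_ms)
        else:
            gap_ms = 0  # nothing follows the last clip
        padding.append(gap_ms)
        position_ms += gap_ms
    return padding


# Three subtitles starting at 0 s, 2 s and 3 s with clips of 1.5 s, 1.8 s and 0.9 s
print(plan_silence_ms([0.0, 2.0, 3.0], [1500, 1800, 900]))  # [500, 0, 0]
```

The second clip overruns the third subtitle's start time, so no silence is inserted there and the audio simply drifts behind the subtitles from that point on.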


Invalid audio file: output/audio_792.56.wav
