<a href="https://colab.research.google.com/github/AS-AIGC/AS-AIGDMS/blob/main/colab_notebook_AS_AIGDMS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

我們在這段程式碼中，主要是在安裝一些 Python 的套件。這些套件有助於我們處理各種任務，如下載 YouTube 影片、處理音訊檔案，甚至還有一些是來自 GitHub 的開發版本。我們通過 pip 工具（Python 的套件管理工具）來安裝這些套件。

This cell installs the Python packages the notebook depends on. They cover tasks such as downloading YouTube videos and processing audio files, and some of them are development versions installed directly from GitHub. We install them with pip, Python's package manager.

In [None]:
# Install the whisper-timestamped library from GitHub
!pip3 install git+https://github.com/linto-ai/whisper-timestamped

# Install the development version of the pyannote-audio library
!pip install -qq https://github.com/pyannote/pyannote-audio/archive/develop.zip

# Install the Pytube library for downloading YouTube videos
!pip install -q --upgrade pytube

# Install the Pydub library for working with audio files
!pip install -q --upgrade pydub

# Install the pysrt library for handling subtitles
!pip install pysrt

In [None]:
import os
import json
from pytube import YouTube    # library for downloading YouTube videos
from pydub import AudioSegment    # library for working with audio files
import whisper_timestamped
from pyannote.audio import Pipeline
import pysrt

In [3]:
def download_Youtube_video_audio_file(url, output_directory, audio_filename):
  """Download the audio-only stream of a YouTube video.

  Args:
    url: Full URL of the YouTube video.
    output_directory: Directory passed to pytube as ``output_path``.
    audio_filename: File name passed to pytube as ``filename``.

  Returns:
    The value returned by ``Stream.download`` (pytube documents it as the
    path of the downloaded file). Earlier versions returned ``None``, so
    callers that ignore the result are unaffected.

  Raises:
    ValueError: If pytube reports no audio-only stream for the video.
  """
  print(f"Download url: {url}")
  yt = YouTube(use_oauth=True, url=url)

  # Get video's audio track. A single query is enough: the previous extra
  # filter(only_audio=True).first() result was overwritten immediately.
  audio_stream = yt.streams.get_audio_only()
  if audio_stream is None:
    # Fail with a readable message instead of an AttributeError on None.
    raise ValueError(f"No audio-only stream available for {url}")

  # Download it to Audio directory
  return audio_stream.download(output_path=output_directory, filename=audio_filename)

In [4]:
def convert_audio_file_to_mp3_format(audio_filepath, export_path):
  """Re-encode an audio file as MP3.

  Args:
    audio_filepath: Path of the source audio file, loaded with
      ``AudioSegment.from_file``.
    export_path: Path the MP3 file is written to.
  """
  # Load the audio file into an AudioSegment object
  print("Load the audio file")
  audio_file = AudioSegment.from_file(audio_filepath)

  # Convert the audio file to MP3 format and save it to export_path.
  # export() hands back the output file object still open; close it right
  # away so the handle is not leaked (it used to be kept in an unused local).
  print("Convert the audio file to MP3 format\n")
  audio_file.export(export_path, format="mp3").close()

In [5]:
def slice_audio(audio_file, filename, offset):
  """Export the one-minute chunk number ``offset`` of ``audio_file`` as MP3.

  Args:
    audio_file: pydub ``AudioSegment`` to cut from.
    filename: Path the chunk is written to.
    offset: Zero-based index of the minute to extract; chunk ``offset``
      covers ``[offset, offset + 1)`` minutes of the audio.
  """
  # pydub does things in milliseconds: both slicing and len() use ms.
  one_minute_ms = 60 * 1000

  # Set the start and end timestamp
  start = offset * one_minute_ms
  # The last part may be shorter than one minute, so stop at the end of the
  # audio. The old code compared a millisecond start with a minute count and
  # used the duration in *seconds* as the end, which cut clips shorter than
  # one minute down to a few dozen milliseconds.
  end = min(start + one_minute_ms, len(audio_file))

  sliced_audio = audio_file[start:end]
  # export() returns the open output file object; close it to avoid leaking
  # one handle per chunk.
  sliced_audio.export(filename, format="mp3").close()

In [7]:
def create_srt_files(audio_filename, languages):
  """Create (or truncate to empty) the subtitle files for one audio file.

  One file named ``<audio_filename>.srt`` is made for the original caption,
  followed by one ``<audio_filename>_<language>.srt`` per entry of
  ``languages``, in iteration order.
  """
  def srt_paths():
    # Original caption first, then the multilingual subtitles
    yield f'{audio_filename}.srt'
    for lang in languages:
      yield f'{audio_filename}_{lang}.srt'

  for srt_path in srt_paths():
    # Opening in write mode is enough to create/empty the file
    with open(srt_path, 'w'):
      pass

In [8]:
def _turn_distance(turn, segment):
  """Return how far a speaker turn's boundaries are from a caption segment's."""
  return abs(turn['start'] - segment['start']) + abs(turn['end'] - segment['end'])


def assign_speakers(caption_segments, diarization_result):
  """Attribute each caption segment to the closest diarized speaker.

  Args:
    caption_segments: Iterable of dicts with ``'start'``, ``'end'`` and
      ``'text'`` keys, processed in the given order.
    diarization_result: Object whose ``itertracks(yield_label=True)`` yields
      ``(turn, track, speaker)`` tuples, ``turn`` having ``start`` and ``end``
      attributes.

  Returns:
    Dict mapping every speaker label to the list of caption texts assigned
    to it, in processing order.

  For each segment only the earliest remaining turn of every speaker is
  considered; the speaker minimising the summed start/end distance wins.
  When no speaker has a turn starting before the segment ends, the nearest
  remaining turn is used; when no turns remain at all, the segment goes to
  the previously chosen speaker; with no speakers at all it is skipped.
  Previously these situations raised ``IndexError``.
  """
  speakers = {}
  for turn, _track, speaker in diarization_result.itertracks(yield_label=True):
    entry = speakers.setdefault(speaker, {'timestamp': [], 'captions': []})
    entry['timestamp'].append({"start": turn.start, "end": turn.end})

  previous_speaker = None
  for segment in caption_segments:
    # Earliest remaining turn of every speaker that still has one.
    pending = {speaker: value['timestamp'][0]
               for speaker, value in speakers.items() if value['timestamp']}
    # Prefer turns that have already started by the end of the segment.
    candidates = {speaker: turn for speaker, turn in pending.items()
                  if turn['start'] <= segment['end']}
    pool = candidates or pending

    if pool:
      # min() keeps the first speaker on ties, like the former stable sort.
      closest_speaker = min(pool, key=lambda name: _turn_distance(pool[name], segment))
    elif previous_speaker is not None:
      closest_speaker = previous_speaker
    else:
      # No diarization information at all: nothing to attribute this to.
      continue

    # The matched turn has been consumed up to the end of this segment.
    if speakers[closest_speaker]['timestamp']:
      speakers[closest_speaker]['timestamp'][0]['start'] = segment['end']

    # Drop the leading turn of any speaker once it ended before this segment
    # did. Speakers with no turns left are skipped (this used to crash).
    for value in speakers.values():
      turns = value['timestamp']
      if turns and turns[0]['end'] < segment['end']:
        turns.pop(0)

    speakers[closest_speaker]['captions'].append(segment['text'])
    previous_speaker = closest_speaker

  return {speaker: value['captions'] for speaker, value in speakers.items()}

In [None]:
# End-to-end driver: for every configured video, download the audio, convert
# it to MP3, transcribe it minute by minute into SRT files, run speaker
# diarization and dump the captions grouped by speaker to a JSON file.
youtube_url = "https://www.youtube.com/watch?v="
# Maps a short label (used in the output file names) to the Youtube video's id
youtube_video = {
    "test1": "qeeA40t4MJY"
}
languages = ['chinese (traditional)', 'english', 'japanese']
# Hugging Face access token for the gated diarization model. It is read from
# the HF_TOKEN environment variable; a secret must never be committed to the
# notebook (the token that used to be hard-coded here should be revoked).
access_token = os.environ.get("HF_TOKEN")
mp3_directory = './mp3/'
audio_directory = "./audio/"
srt_directory = "./"

# exist_ok avoids the separate existence check and also creates parents
os.makedirs(mp3_directory, exist_ok=True)

# Load the small model of whisper
model = whisper_timestamped.load_model("small")

# The srt writer and the diarization pipeline do not depend on the video or
# the chunk, so build them once instead of on every loop iteration.
srt_writer = whisper_timestamped.utils.get_writer("srt", srt_directory)
diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization@2.1",
                          use_auth_token=access_token)
if diarization_pipeline is None:
  # Guard against a pipeline that could not be loaded (e.g. missing token),
  # which would otherwise only surface after the whole transcription ran.
  raise RuntimeError("Could not load the pyannote diarization pipeline; "
                     "set HF_TOKEN to a valid Hugging Face access token.")

for key, video_id in youtube_video.items():
  # Captions belong to one video only; starting from an empty list here keeps
  # segments of an earlier video out of this video's speaker assignment.
  caption_segments = []

  audio_filename = "audio_" + key
  # Download the audio file of the Youtube video
  download_Youtube_video_audio_file(youtube_url+video_id, audio_directory, audio_filename)
  
  # Convert the audio file to MP3 format
  audio_filepath = audio_directory + audio_filename
  mp3_filepath = mp3_directory + audio_filename + ".mp3"
  convert_audio_file_to_mp3_format(audio_filepath, mp3_filepath)

  # Load the mp3 file
  mp3_file = AudioSegment.from_file(mp3_filepath, 'mp3')

  # Create srt files
  create_srt_files(audio_filename, languages)

  # Transcribe the audio
  mp3_duration_minutes = int(mp3_file.duration_seconds // 60)
  seconds = round(mp3_file.duration_seconds - mp3_duration_minutes * 60, 2)
  print(f"Duration: {mp3_duration_minutes} minutes {seconds} seconds\n")
  
  # Chunk the audio per minute. len() of a pydub segment is its length in
  # milliseconds; the ceiling division avoids an empty trailing chunk when
  # the duration is an exact multiple of one minute.
  chunk_count = -(-len(mp3_file) // (60 * 1000))
  for offset in range(chunk_count):
    print(f"Minute {offset}")
    filename = f"{audio_filename}_{offset}_{offset+1}.mp3"
    # Slice the audio
    slice_audio(mp3_file, filename, offset)   
    
    # Transcribe the video segment
    print('Transcribe the chunked video')
    result = model.transcribe(filename)
    # Chunk timestamps are relative to the chunk; shift them to video time
    for segment in result['segments']:
      caption_segments.append({'start': segment['start'] + offset * 60, 
                   'end': segment['end'] + offset * 60,
                   'text': segment['text']})

    # Write the original caption to a srt file
    srt_writer(result, filename)

    # Concatenate the caption
    # NOTE(review): concatenate_srt_file is not defined in the visible part
    # of this notebook; it must be defined in an earlier cell.
    main_srt = f'{audio_filename}.srt'
    sliced_part_srt = f"{audio_filename}_{offset}_{offset+1}.srt"
    concatenate_srt_file(main_srt, sliced_part_srt, offset)

    # Delete the sliced audio file and srt file.    
    os.remove(filename)
    os.remove(sliced_part_srt)
    print('Done\n')
  
  # Diarization
  diarization_result = diarization_pipeline(mp3_filepath)
  for turn, track, speaker in diarization_result.itertracks(yield_label=True):
    print(f"start={turn.start:.1f}s stop={turn.end:.1f}s {speaker}, track:{track}")
  
  # Record speakers' captions, respectively
  # NOTE(review): the output path is the same for every video, so with several
  # videos only the last result is kept.
  speakers_caption = assign_speakers(caption_segments, diarization_result)
  # ensure_ascii=False writes non-ASCII captions verbatim, so the encoding
  # must be explicit instead of depending on the platform default.
  with open('./dirization_result.json', 'w', encoding='utf-8') as fp: 
    json.dump(speakers_caption, fp, ensure_ascii=False, indent=2)