<a href="https://colab.research.google.com/github/archei2500/overdubbing_courses/blob/main/automatic_subtitles.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

> **Добро пожаловать в интерактивную систему для распознавания речи, перевода и автоматического создания субтитров!**

> Здесь вы можете:
*   распознать речь из видео;
*   создать субтитры на основе распознанной речи (файл SRT);
*   перевести распознанную речь или субтитры на выбранный язык.

> Пожалуйста, следуйте инструкциям.
1.   Для начала перейдите к первому разделу, выберите модель и запустите ячейку. В среду установится всё необходимое для работы.
2.   Перейдите ко второму разделу: загрузите видео в систему любым удобным способом из предложенных.
3.   Затем выберите раздел, название которого совпадает с вашей задачей, и разверните его.

> *Примечание: для выполнения перевода без распознавания речи необязательно запускать первую ячейку. Это касается и объединения в целые предложения сегментов SRT файла (можете обработать уже готовый свой SRT файл).*

> ***Убедитесь в том, что вы подключены к среде с GPU, если выполняете распознавание речи или создание субтитров!!!***

# Запустить перед началом работы

In [None]:
# @title Выберите модель Whisper перед загрузкой:

#!pip install moviepy # посмотрим, может. можно убрать
!pip install -U openai-whisper # установка whisper
!pip install pydub

from moviepy.editor import VideoFileClip
from IPython.display import clear_output # для очистки выходных данных
from pydub import AudioSegment
import os
from datetime import timedelta
from google.colab import files
import whisper
from tensorflow.test import gpu_device_name


def str_to_time(s):
  return int(s[6:8]) + 60 * int(s[3:5]) + 3600 * int(s[:2]) + float('0.' + s[9:12])


def time_to_str(time):
  hours = int(time // 3600)
  minutes = int((time - 3600 * hours) // 60)
  seconds = time - 60 * minutes
  str_sec = str(round(seconds, 3)).replace('.', ',')
  return str(hours).zfill(2) + ':' + str(minutes).zfill(2) + ':' + str_sec[:str_sec.find(',')].zfill(2) + ',' + str_sec[str_sec.find(',') + 1:].ljust(3, '0')


def write_result_to_file(res, path_to_text):
  file = open(path_to_text, 'w')
  text_massive = []
  segments = res['segments']
  for segment in segments:
    fract1 = str(round(segment['start'] % 1, 3))
    fract2 = str(round(segment['end'] % 1, 3))
    start_time = str(0) + str(timedelta(seconds = int(segment['start']))) + ',' + fract1[fract1.find('.') + 1:].ljust(3, '0')
    end_time = str(0) + str(timedelta(seconds = int(segment['end']))) + ',' + fract2[fract2.find('.') + 1:].ljust(3, '0')
    text = segment['text']
    id = segment['id']
    seg = f"{id + 1}\n{start_time} --> {end_time}\n{text[1:] if text and text[0] == ' ' else 'EMPTY' if text == '' else text}"
    text_massive.append(seg)
  file.write(text_massive[0] + '\n')
  for txt_str in text_massive[1:-1]:
    file.write('\n' + txt_str + '\n')
  file.write('\n' + text_massive[-1])
  file.close()


def write_words_to_file(res, path_to_text):
  file = open(path_to_text, 'w')
  text_massive = []
  segments = res['segments']
  index_shift = 0
  for segment in segments:
    words = segment['words']
    for idx, word in enumerate(words):
      fract1 = str(round(word['start'] % 1, 3))
      fract2 = str(round(word['end'] % 1, 3))
      start_time = str(0) + str(timedelta(seconds = int(word['start']))) + ',' + fract1[fract1.find('.') + 1:].ljust(3, '0')
      end_time = str(0) + str(timedelta(seconds = int(word['end']))) + ',' + fract1[fract1.find('.') + 1:].ljust(3, '0')
      text = word['word']
      seg = f"{index_shift + idx + 1}\n{start_time} --> {end_time}\n{text[1:] if text and text[0] == ' ' else 'EMPTY' if text == '' else text}"
      text_massive.append(seg)
    index_shift += idx
  file.write(text_massive[0] + '\n')
  for txt_str in text_massive[1:-1]:
    file.write('\n' + txt_str + '\n')
  file.write('\n' + text_massive[-1])
  file.close()


def check_punctuation_percent(path, lang, subtitles):
  punct = '.:!?,-—'
  ok = True
  punct_symbols = 0
  other_symbols = 0

  if not subtitles:
    with open(path, 'r') as txtfile:
      text = txtfile.read()
    for char in text:
      if char in punct:
        punct_symbols += 1
      else:
        other_symbols += 1
  else:
    with open(path, 'r') as txtfile:
      lines = [''] + txtfile.read().split('\n')
    for i in range(3, len(lines), 4):
      for char in lines[i]:
        if char in punct:
          punct_symbols += 1
        else:
          other_symbols += 1

  percent = punct_symbols / other_symbols * 100
  if lang == 'en':
    if percent < 1.3:
      ok = False
  elif lang == 'ru':
    if percent < 2:
      ok = False
  return ok


# проверка того, что пользователь подключен к среде GPU
device_name = gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU не найден! Совет: подключитесь к среде с GPU.')

model_name = "large-v2" #@param ["tiny", "base", "small", "medium", "large-v2", "large-v3"]
# загрузка модели Whisper
model = whisper.load_model(model_name)

# переменные для регулирования процесса загрузки видео
yt_downloaded = False
gd_mounted = False

path_to_video = '/content/vid.mp4'

!rm rf /content/sample_data # удаление ненужной папки, которая автоматически создаётся в файловой системе

clear_output()

# Загрузка видео

In [None]:
# @title Выберите метод загрузки и укажите путь к файлу, если привязываете Google Диск:
#@markdown ### Способ загрузки
upload_method = "С устройства" #@param ["С устройства", "По ссылке с YouTube", "Путь к файлу Google Drive"]
#@markdown <font color="orange"> Примечания и советы:

#@markdown <font color="orange"> ``1. При выборе загрузки с устройства нужно будет нажать на кнопку в выходных данных и выбрать файл с устройства.``

#@markdown <font color="orange"> ``2. Для загрузки с YouTube укажите ссылку в соответствующее поле.``

#@markdown <font color="orange"> ``3. Обратите внимание: при выборе способа, связанного с Google Диском, система попросит вас дать разрешение на доступ к вашему Google Drive. Затем он будет смонтирован в файловую систему. После этого укажите путь к файлу в соответствующем поле.``

#@markdown ``Вставьте ссылку на видеоролик с платформы YouTube в требуемом формате `` 👇
youtube_url = 'https://www.youtube.com/watch?v=YOUTUBE_ID' #@param {type:"string"}

#@markdown ``Введите полный путь к видео на вашем Google Диске (для варианта с загрузкой с Google Drive) `` 👇
path_google_drive = '/content/drive/MyDrive/path_to_vid' #@param {type:"string"}

from urllib import parse as urlparse

# Удаление файлов, если такие уже были в ФС прежде
if os.path.isfile(path_to_video):
    os.remove(path_to_video)
if os.path.isfile('/content/aud.wav'):
    os.remove('/content/aud.wav')

path_to_video = '/content/vid.mp4' # путь к видео по умолчанию

# Видеофайл загружается разными способами
if upload_method == "С устройства":
  uploaded = files.upload()
  for filename in uploaded.keys():
    if '/content/' + filename != path_to_video:
      os.rename(filename, path_to_video)  # переименование загруженного файла
elif upload_method == "По ссылке с YouTube":
  if not yt_downloaded:
    !pip install yt-dlp # установка при первом запуске
    yt_downloaded = True
  url_data = urlparse.urlparse(youtube_url)
  query = urlparse.parse_qs(url_data.query)
  YOUTUBE_ID = query["v"][0]
  # Загрузка видео с YouTube
  !yt-dlp -f "bestvideo[ext=mp4]+bestaudio[ext=wav]/best[ext=mp4]/best" --output "/content/vid.%(ext)s" https://www.youtube.com/watch?v=$YOUTUBE_ID
  # Запоминаем путь к файлу
  for file in os.listdir('/content'):
    if file.startswith('vid'):
        path_to_video = os.path.join('/content', file)
else:
  if not gd_mounted:
    from google.colab import drive
    drive.mount("/content/drive") # монтирование Google Диска при первом запуске
    gd_mounted = True
  if not os.path.isfile(path_google_drive): # если файл не найден
      print("ERROR: File not found!")
      raise SystemExit(0)
  !cp $path_google_drive $path_to_video # копирование файла с Google Диска

# извлечение аудиодорожки из видео
video = VideoFileClip(path_to_video)
video.audio.write_audiofile('/content/aud.wav') # извлечение аудиодорожки

clear_output() # очистка выходных данных

# Простое распознавание речи (+ перевод обычного текста)

In [None]:
# @title Распознавание речи

# @markdown Введите подсказку для модели. Например, что это за лекция.
prompt = 'The text below is a lecture on economics.' #@param {type: "string"}

# @markdown Скачать файл с распознанной речью?
download_file = False # @param {type: "boolean"}

if os.path.isfile('/content/aud.wav'):
  result = model.transcribe('/content/aud.wav', initial_prompt = prompt)
  lang = result['language']

  txt_file = open('/content/result.txt', 'w')
  txt_file.write(result['text'])
  txt_file.close()

  clear_output()
  # временно проверяются только тексты на русском и английском
  if lang == 'ru' or lang == 'en':
    ok = check_punctuation_percent('/content/result.txt', lang, False)
    assert ok, f" [!] Вероятно, модель пропустила знаки пунктуации. Рекомендуется перезапустить процесс распознавания."

  print('Файл сохранён по пути /content/result.txt')
  if download_file:
    files.download('/content/result.txt')
else:
  print('Вы не загрузили видео. Вернитесь к предыдущей ячейке.')

In [None]:
# @title Перевод

# @markdown Укажите путь к файлу, который нужно перевести:
path_to_text = '/content/result.txt' # @param {type: "string"}

!pip install -U deep-translator
!pip install iso639 # для кодов языков
from deep_translator import GoogleTranslator
import iso639
import os
from IPython.display import clear_output

#@markdown Введите язык, на который желаете перевести текст из видео
language = "english" #@param {type: "string"}
language = language.lower()
lang_capital = language[0].upper() + language[1 :]

# получение списка языков, поддерживаемых переводчиком
langs_list = GoogleTranslator().get_supported_languages()
if language in langs_list: # если выбранный язык есть в этом списке
  language = iso639.to_iso639_1(lang_capital) # получение кода языка ISO639-1
  path_to_tr_text = '/content/result_' + language + '.txt'
  # создание соответствующего файла для перевода в любом случае
  output_file = open(path_to_tr_text, "w")
  txtfile = open(path_to_text, "r")
  text = txtfile.read()
  # перевод
  translated = GoogleTranslator(source='auto', target = language).translate(text)
  output_file.write(translated)
  txtfile.close()
  output_file.close()
  clear_output()

  print('Файл сохранён по пути ', path_to_tr_text)
  print('Он будет автоматически загружен на ваш компьютер.')
  # загрузка перевода на компьютер
  files.download(path_to_tr_text)
else:
  print("Данный язык не поддерживается переводчиком.")

# Создание субтитров - текстового сопровождения с временными метками (+ их перевод)

In [None]:
# @title Распознавание речи

# @markdown Введите подсказку для модели. Например, что это за лекция. Или желательный формат сегментов.
prompt = 'The text below, consisting of segments ending with punctuation marks, is a lecture on economics.' #@param {type: "string"}
# @markdown Нужны ли временные метки на уровне слов?

# @markdown <font color="orange"> Примечание: ``Это, скорее всего, обеспечит большую точность временных меток в субтитрах.``
word_timestamps = True # @param {type: "boolean"}
# @markdown Скачать файл с распознанной речью?
download_file = False # @param {type: "boolean"}

if os.path.isfile('/content/aud.wav'):
  result = model.transcribe('/content/aud.wav', initial_prompt = prompt, word_timestamps = word_timestamps)
  lang = result['language']

  write_result_to_file(result, '/content/result.srt')
  if word_timestamps:
    path_to_words = '/content/words.txt'
    write_words_to_file(result, path_to_words)
    word_lines = []
    with open(path_to_words, 'r') as wf:
      word_lines = [''] + wf.read().split('\n')

  if lang == 'ru' or lang == 'en':
    ok = check_punctuation_percent('/content/result.srt', lang, True)
    assert ok, f" [!] Вероятно, модель пропустила знаки пунктуации. Рекомендуется перезапустить процесс распознавания."
  print('Файл сохранён по пути /content/result.srt')
  if download_file:
    files.download('/content/result.srt')
else:
  print('Вы не загрузили видео. Пожалуйста, вернитесь к ячейке загрузки видео.')

In [None]:
# @title Объединение сегментов в целые предложения

#  @markdown Введите путь к файлу, содержащий исходные субтитры:
file_path = '/content/result.srt' # @param {type: "string"}
# @markdown Введите максимальную продолжительность получаемой фразы (в секундах):
max_dur = 25 # @param {type: "integer"}
# @markdown Использовать файл со словами для уточнения временных меток?
word_timestamps = True # @param {type: "boolean"}
# @markdown Скачать файл после обработки?
download_file = False # @param {type: "boolean"}

import re


# важные глобальные переменные для нижеприведённых функций
part_1 = 12 # до какого символа первая часть тайминга (не включительно)
part_2 = 17 # с какого символа начинается вторая часть тайминга


# среднее время без использования меток слов
def compromise_time(time1, time2, s1, s2):
  coef = len(s1) / len(s1 + s2)
  return time_to_str(str_to_time(time1) + (str_to_time(time2) - str_to_time(time1)) * coef)


# восстановление временных меток по словам
def words_time(time1, time2, s1, s2, word_lines):
  time1_time = str_to_time(time1)
  time2_time = str_to_time(time2)
  space1 = s1.rfind(' ')
  space2 = s2.find(' ')
  word1 = s1[space1 + 1:] if space1 != -1 else s1
  word2 = s2[:space2] if space2 != -1 else s2
  #print(word1)
  #print(word2)

  new_end = 0
  new_start = 0

  for i in range(0, len(word_lines), 4):
    if i + 1 < len(word_lines):
      if word_lines[i+3] == word1 and word_lines[i+7] == word2:
        # проверка на принадлежность граничных слов данному сегменту
        start_2_word_time = str_to_time(word_lines[i+6][:part_1])
        if start_2_word_time > time1_time:
          end_1_word_time = str_to_time(word_lines[i+2][part_2:])
          # может быть случай, что конец первого слова будет позже начала второго, проверяем
          if end_1_word_time > start_2_word_time:
            start_1_word_time = str_to_time(word_lines[i+2][:part_1])
            new_end = start_1_word_time
          else:
            new_end = end_1_word_time
          new_start = start_2_word_time
          # обрезаем первоначальный массив до следующего сегмента
          words_num = len(s2.split()) # подсчёт количества слов
          word_lines = word_lines[i + (words_num + 1) * 4:]
          break
        else:
          continue

  if not new_end:
    print('Возник исключительный случай. Скорее всего, попалось нестандартное слово!')
    print('Рекомендуется исправить вручную между ', time1, ' и ', time2)
    print('Временные метки заменены нулевыми.')

  return time_to_str(new_end), time_to_str(new_start), word_lines


# функция для обработки сегмента
def proc_segm(new_lines, lines, current_sentence, start_timing, id, start, end, max_dur, ind, i):
  # по времени нормально выходит пред. предл. и новое до точки
  if str_to_time(end) - str_to_time(start_timing) <= max_dur:
    current_sentence += ' ' + lines[i + 3][:ind + 1]
    new_lines.append('\n' + str(id) + '\n' + start_timing + ' --> ' + end + '\n' + current_sentence)
    id += 1
  # иначе нужно отдельно записать пред. предл. и затем то до точки
  else:
    new_lines.append('\n' + str(id) + '\n' + start_timing + ' --> ' + lines[i-2][part_2:] + '\n' + current_sentence)  # добавляем предыдущие части
    new_lines.append('\n' + str(id + 1) + '\n' + lines[i+2][:part_1] + ' --> ' + end + '\n' + lines[i + 3][:ind + 1])
    id += 2
  # берётся начало следующего предложения
  current_sentence = lines[i + 3][ind + 2:]
  start_timing = start

  return new_lines, current_sentence, start_timing, id


# функция обработки всего файла с субтитрами
def process_text(file_path, max_sent_dur, word_times, word_lines):
    with open(file_path, 'r') as txtfile:
        lines = [''] + txtfile.read().split('\n')

    new_lines = []
    current_sentence = ""
    symbols = '.!?'
    start_timing = ''

    id = 1

    for i in range(0, len(lines), 4):
      if i + 1 < len(lines):
        time_match = re.match(r'\d{2}:\d{2}:\d{2},\d{3} --> \d{2}:\d{2}:\d{2},\d{3}', lines[i+2])
        text_match = re.match(r'[A-ZА-Я].*[.?!]$', lines[i+3])
        start_match = re.match(r'[A-ZА-Я].*', lines[i+3])
        end_match = re.match(r'.*[.?!].*', lines[i+3]) #[a-zа-я]

        if time_match:
          # первый случай
          if (text_match or start_match) and start_timing == '': # целое предложение (загл. буква действ. это значит)
              current_sentence = lines[i+3]
              if text_match:
                new_lines.append('\n' + str(id) + '\n' + lines[i+2] + '\n' + current_sentence) # просто добавляется
                id += 1
                current_sentence = ''
                start_timing = ''
              else: # если имеет начало, но не имеет окончания
                start_timing = lines[i+2][:part_1] # начало тайминга, ожидается добавление
          elif text_match: # случай, когда, например, с заглавной буквы начинается след. часть предложения, но это не целое предложение. А на конце .?!
            # т. е. это конец предыдущего, но с загл. буквой
            end_timing = lines[i+2][part_2:]
            new_lines.append('\n' + str(id) + '\n' + start_timing + ' --> ' + end_timing + '\n' + current_sentence + ' ' + lines[i+3])
            current_sentence = ''
            start_timing = ''

          # если часть может быть конечной, начинаясь со строчной буквы и имея в составе знак окончания
          elif end_match:
            index = next((lines[i+3].find(char) for char in symbols if char in lines[i+3]), -1) # получаем индекс символа окончания
            end_timing = lines[i+2][part_2:] # конечный тайминг этой фразы
            if lines[i+3][-1] not in symbols: # если фрагмент не оканчивается символом окончания, т. е. начинается новое предложение
              # рассчитываем, когда закончится первое предложение, и когда начнётся второе по времени
              if word_times: # если с метками слов
                new_end, new_start, word_lines = words_time(lines[i+2][:part_1], end_timing, lines[i+3][:index + 1], lines[i+3][index + 2:], word_lines)
                new_lines, current_sentence, start_timing, id = proc_segm(new_lines, lines, current_sentence,
                                                                          start_timing, id, new_start, new_end,
                                                                          max_dur, index, i)
              else: # если просто среднее время на основе длины предложения
                between_timing = compromise_time(lines[i+2][:part_1], end_timing, lines[i+3][:index + 1], lines[i+3][index + 2:])
                new_lines, current_sentence, start_timing, id = proc_segm(new_lines, lines, current_sentence,
                                                                          start_timing, id, between_timing,
                                                                          between_timing, max_dur, index, i)
            else: # фрагмент оканчивается символом завершения
              new_lines.append('\n' + str(id) + '\n' + start_timing + ' --> ' + end_timing + '\n' + current_sentence + ' ' + lines[i + 3])
              id += 1
              current_sentence = ''
              start_timing = ''

          # если часть не имеет ни начала, ни конца, но её желательно добавить к предыдущей
          elif start_timing:
            if str_to_time(lines[i+2][part_2:]) - str_to_time(start_timing) <= max_sent_dur:
              current_sentence += ' ' + lines[i + 3]
            else: # но фраза оказазалась длиннее, чем надо => не можем добавить
              new_lines.append('\n' + str(id) + '\n' + start_timing + ' --> ' + lines[i - 2][part_2:] + '\n' + current_sentence) # добавляем предыдущие части
              id += 1
              # Будем прибавлять к текущему сегменту последующие
              start_timing = lines[i+2][:part_1]
              current_sentence = lines[i+3]

    return new_lines


path_to_words = '/content/words.txt'
word_lines = []
if word_timestamps:
  with open(path_to_words, 'r') as wf:
    word_lines = [''] + wf.read().split('\n')

# Запуск процесса объединения сегментов в целые предложения
new_lines = process_text(file_path, max_dur, word_timestamps, word_lines)

txt_file = open('/content/subtitles.srt', "w")
txt_file.write(new_lines[0][1:] + '\n') # без символа переноса на новую строку
if len(new_lines) > 1:
  for line in new_lines[1:-1]:
      txt_file.write(line + '\n')
  txt_file.write(new_lines[-1])
txt_file.close()

print('Результат сохранён в /content/subtitles.srt')
if download_file:
  files.download('/content/subtitles.srt')

In [None]:
# @title Перевод

# @markdown Укажите путь к файлу, который нужно перевести:
path_to_text = '/content/subtitles.srt' # @param {type: "string"}

!pip install -U deep-translator
!pip install iso639 # для кодов языков
from deep_translator import GoogleTranslator
import iso639
import os
from IPython.display import clear_output

#@markdown Введите язык, на который желаете перевести текст из видео
language = "english" #@param {type: "string"}
language = language.lower()
lang_capital = language[0].upper() + language[1 :]

# получение списка языков, поддерживаемых переводчиком
langs_list = GoogleTranslator().get_supported_languages()
if language in langs_list: # если выбранный язык есть в этом списке
  language = iso639.to_iso639_1(lang_capital) # получение кода языка ISO639-1
  path_to_tr_text = '/content/subtitles_' + language + '.srt'
  # создание соответствующего файла для перевода в любом случае
  output_file = open(path_to_tr_text, "w")
  txtfile = open(path_to_text, "r")
  lines = [''] + txtfile.read().split('\n')
  for i, line in enumerate(lines):
    if i != len(lines) - 1:
      if i % 4 == 3:
        translated = GoogleTranslator(source='auto', target = language).translate(line)
        output_file.write(translated + "\n")
      else:
        if i != 0:
          output_file.write(line + "\n")
    else:
      translated = GoogleTranslator(source='auto', target = language).translate(line)
      output_file.write(translated)
  txtfile.close()
  output_file.close()
  clear_output()

  print('Файл сохранён по пути', path_to_tr_text)
  print('Он будет автоматически загружен на ваш компьютер.')
  files.download(path_to_tr_text)
else:
  print("Данный язык не поддерживается переводчиком.")

# Размонтировать Google Drive

In [None]:
drive.flush_and_unmount()