In [3]:
import os
from faster_whisper import WhisperModel

from srtranslator import SrtFile
from srtranslator.translators.deepl_api import DeeplApi
from srtranslator.translators.deepl_scrap import DeeplTranslator
from srtranslator.translators.translatepy import TranslatePy

import openai

In [None]:
# 기본 함수 및 경로 처리

def save_to_srt(segments, filename):
    """Write transcription segments to ``filename`` as SubRip (SRT) cues.

    Args:
        segments: Iterable of objects exposing ``start``, ``end`` and ``text``
            attributes; it is consumed exactly once, in order.
        filename: Destination path; the file is created or overwritten and
            written as UTF-8.

    Each cue is numbered from 1 and its start/end values are rendered with
    ``format_time``. The segment text is written unchanged.
    """
    with open(filename, 'w', encoding='utf-8') as srt_out:
        cue_number = 1
        for seg in segments:
            cue_start = format_time(seg.start)
            cue_end = format_time(seg.end)
            srt_out.write(f"{cue_number}\n{cue_start} --> {cue_end}\n{seg.text}\n\n")
            cue_number += 1

def format_time(seconds):
    """Convert a duration in seconds to an SRT timestamp ``HH:MM:SS,mmm``.

    Args:
        seconds: Offset from the start of the media, in seconds (int or float).
            Negative values are clamped to zero.

    Returns:
        The timestamp string, e.g. ``"01:01:01,500"`` for ``3661.5``.
    """
    # Round once to whole milliseconds and do the rest in integer arithmetic.
    # Truncating the float fraction instead loses a millisecond on values
    # such as 1.001, whose fraction evaluates to 0.000999... in binary floats.
    total_ms = max(0, int(round(seconds * 1000)))
    hours, remainder_ms = divmod(total_ms, 3_600_000)
    minutes, remainder_ms = divmod(remainder_ms, 60_000)
    whole_seconds, milliseconds = divmod(remainder_ms, 1000)
    return f"{hours:02d}:{minutes:02d}:{whole_seconds:02d},{milliseconds:03d}"

# Path of the source video file
video_file_path = "/Users/FELAB/Desktop/yt-dlp/bin/강의1.webm"

# Video file name without its directory or extension
video_basename = os.path.basename(video_file_path)
video_filename = os.path.splitext(video_basename)[0]

# The SRT file shares the video's base name
srt_filename = video_filename + '.srt'

# 영어 자막 생성

In [None]:
# Load the Whisper model from a local directory; run on GPU with FP16
model = WhisperModel(
    model_size_or_path='/Users/FELAB/Desktop/yt-dlp/whipser',
    device="cuda",
    compute_type="float16",
)

# Transcribe the video and report the detected language
segments, info = model.transcribe(video_file_path, beam_size=5)
print("Detected language '%s' with probability %f" % (info.language, info.language_probability))

# Write the transcribed segments out as an SRT file
save_to_srt(segments, srt_filename)

# 영어 자막을 한글 자막으로 변환

In [54]:
import os
import re
from collections import namedtuple
import openai
import logging
import concurrent.futures

# Send log records of level DEBUG and above to translation.log, each line
# prefixed with a timestamp and the level name.
logging.basicConfig(filename='translation.log', level=logging.DEBUG, 
                    format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

# The OpenAI API key is read from the OPENAI_API_KEY environment variable
# (nothing to edit here); a missing variable raises KeyError on this line.
openai.api_key = os.environ["OPENAI_API_KEY"]

# One parsed SRT cue: sequence number, start/end timestamp strings, cue text
Subtitle = namedtuple('Subtitle', ['index', 'start', 'end', 'text'])

# Function to read the SRT file
def read_srt(filename):
    """Parse an SRT file into a list of ``Subtitle`` records.

    Args:
        filename: Path of the SRT file (UTF-8, with or without a BOM).

    Returns:
        A list of ``Subtitle(index, start, end, text)`` in file order.
        ``start``/``end`` are the timestamp strings as written in the file;
        multi-line cue text is joined with single spaces and each line is
        stripped of surrounding whitespace. Blocks with fewer than three
        lines (no text) are skipped.

    Raises:
        ValueError: If a block's first line is not an integer or its timing
            line does not split into exactly two parts around ``-->``.
    """
    # 'utf-8-sig' drops a leading byte-order mark, which would otherwise be
    # glued to the first cue number and make int() fail.
    with open(filename, 'r', encoding='utf-8-sig') as file:
        content = file.read()

    subtitles = []
    # Cues are separated by blank lines; tolerate separator lines that hold
    # only whitespace as well as runs of several blank lines.
    for block in re.split(r'\n\s*\n', content.strip()):
        lines = [line.strip() for line in block.strip().split('\n')]
        if len(lines) < 3:
            continue
        index = int(lines[0])
        start, end = map(str.strip, lines[1].split('-->'))
        # Stripping each line avoids doubled spaces when cues are later
        # concatenated into paragraphs.
        text = ' '.join(line for line in lines[2:] if line)
        subtitles.append(Subtitle(index, start, end, text))

    return subtitles

# Combining subtitles into paragraphs to reduce the number of API calls
def combine_subtitles(subtitles, max_length):
    """Merge consecutive subtitles into paragraphs to reduce API calls.

    Args:
        subtitles: Iterable of records exposing ``text``, ``start`` and ``end``.
        max_length: A subtitle is appended to the current paragraph only while
            ``len(paragraph) + len(subtitle.text) < max_length``; otherwise the
            current paragraph is closed and the subtitle starts a new one.

    Returns:
        A list of ``(paragraph_text, times)`` tuples, where ``times`` is the
        list of ``(start, end)`` pairs of the subtitles merged into that
        paragraph. A single subtitle at least ``max_length`` long becomes a
        paragraph of its own. Paragraphs with no text are never emitted.
    """
    paragraphs = []
    paragraph = ""
    times = []

    for subtitle in subtitles:
        fits = len(paragraph) + len(subtitle.text) < max_length
        # Close the current paragraph only if it actually holds text; an
        # oversize first subtitle must not flush an empty ("", []) paragraph.
        if not fits and paragraph.strip():
            paragraphs.append((paragraph.strip(), times))
            paragraph = ""
            times = []

        if paragraph:  # separate merged subtitles with a single space
            paragraph += " "
        paragraph += subtitle.text
        times.append((subtitle.start, subtitle.end))

    if paragraph.strip():  # keep the trailing paragraph if it has text
        paragraphs.append((paragraph.strip(), times))

    return paragraphs

In [58]:
def _split_sentences(text):
    """Split text after '.', '!' or '?' followed by whitespace, keeping the punctuation."""
    return [part for part in re.split(r'(?<=[.!?])\s+', text.strip()) if part]


def _partition(items, group_count):
    """Split ``items`` into ``group_count`` contiguous, near-equal, non-empty groups.

    Requires ``1 <= group_count <= len(items)``; earlier groups receive the
    leftover items when the split is uneven.
    """
    base_size, extra = divmod(len(items), group_count)
    groups = []
    position = 0
    for group_number in range(group_count):
        size = base_size + (1 if group_number < extra else 0)
        groups.append(items[position:position + size])
        position += size
    return groups


# Distribute translated texts back to timestamps and save to a new SRT file
def distribute_and_save_translations(translated_paragraphs, filename):
    """Write translated paragraphs to ``filename`` as SRT cues.

    Args:
        translated_paragraphs: Iterable of ``(translated_text, times)`` where
            ``times`` is a list of ``(start, end)`` timestamp strings of the
            source subtitles the paragraph was built from. Entries whose text
            is ``None`` or empty, or whose ``times`` is empty, are skipped
            with a warning.
        filename: Output path; created or overwritten, written as UTF-8.

    Each paragraph is split into sentences. When the sentence count equals the
    timestamp count they are paired one to one. Otherwise a warning is logged
    and the longer of the two lists is folded into contiguous, near-equal
    groups so that every sentence and every timestamp is used: a cue spans
    from the first start to the last end of its timestamp group and carries
    the sentences of its sentence group joined by spaces. Cue numbers run
    consecutively from 1 across all paragraphs.

    File-system errors (``OSError``) are logged and not re-raised; other
    exceptions propagate.
    """
    try:
        with open(filename, 'w', encoding='utf-8') as file:
            index = 1
            for translated_text, times in translated_paragraphs:
                if translated_text is None:
                    logging.warning("Received None as translated text. Skipping...")
                    continue

                sentences = _split_sentences(translated_text)
                if not sentences or not times:
                    logging.warning("Paragraph has no sentences or no timestamps. Skipping...")
                    continue

                logging.debug(f"Translated sentences: {sentences}")
                logging.debug(f"Timestamps: {times}")

                if len(sentences) != len(times):
                    logging.warning(f"Warning: Mismatch in number of sentences and timestamps for paragraph '{translated_text}'")

                # One cue per (sentence group, timestamp group) pair; the
                # shorter list decides how many cues the paragraph yields.
                cue_count = min(len(sentences), len(times))
                sentence_groups = _partition(sentences, cue_count)
                time_groups = _partition(list(times), cue_count)

                for sentence_group, time_group in zip(sentence_groups, time_groups):
                    start = time_group[0][0]   # start of the group's first subtitle
                    end = time_group[-1][1]    # end of the group's last subtitle
                    cue_text = ' '.join(sentence_group)
                    file.write(f"{index}\n")
                    file.write(f"{start} --> {end}\n")
                    file.write(f"{cue_text}\n\n")
                    index += 1
    except OSError as e:
        logging.error(f"An error occurred while saving the file: {e}")

# Existing translation function
def translate_text(text):
    """Translate English ``text`` to Korean with the OpenAI chat completion API.

    Sends a system instruction plus ``text`` as the user message to the
    ``gpt-3.5-turbo`` model (temperature 0.3, at most 512 completion tokens).

    Returns:
        The stripped content of the first choice, or ``None`` when that
        content is empty or cannot be read from the response (the error is
        printed). Exceptions raised by the API call itself are not caught
        here and propagate to the caller.
    """
    system_message = {
        "role": "system",
        "content": "You are a helpful translator. Translate the following English text to Korean."
    }
    user_message = {
        "role": "user",
        "content": text
    }
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[system_message, user_message],
        temperature=0.3,
        max_tokens=512
    )

    try:
        content = response['choices'][0]['message']['content'].strip()
    except Exception as e:
        print(f"Error in translation: {e}")
        return None
    # Treat an empty translation the same as a missing one.
    return content or None

def translate_paragraphs_concurrently(paragraphs, max_workers=5):
    """Translate paragraphs in parallel, preserving their input order.

    Args:
        paragraphs: Iterable of ``(paragraph_text, times)`` tuples.
        max_workers: Maximum number of worker threads calling
            ``translate_text`` at the same time.

    Returns:
        A list of ``(translated_text, times)`` in the same order as
        ``paragraphs``. A paragraph whose translation raised is logged and
        left out; ``translated_text`` is whatever ``translate_text`` returned
        (possibly ``None``).
    """
    translated_paragraphs = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        submitted = [
            (executor.submit(translate_text, paragraph), paragraph, times)
            for paragraph, times in paragraphs
        ]
        # Collect results in submission order, not completion order: the
        # paragraphs are chronological, and the output file is written in
        # the order returned here.
        for future, paragraph, times in submitted:
            try:
                translated_text = future.result()
            except Exception as exc:
                logging.error(f"Error in translating paragraph {paragraph[:50]}...: {exc}")
            else:
                translated_paragraphs.append((translated_text, times))
    return translated_paragraphs

In [59]:
# Run the whole pipeline: parse the SRT, batch cues into paragraphs,
# translate them, and write the translated SRT.
if __name__ == "__main__":
    # Replace with the actual path of your SRT file
    subtitles = read_srt(filename='./강의1.srt')
    # 500 is a placeholder; adjust the max_length as needed
    paragraphs = combine_subtitles(subtitles, max_length=500)
    translated_paragraphs = translate_paragraphs_concurrently(
        paragraphs,
        max_workers=32,
    )
    # Replace with the desired output file name
    distribute_and_save_translations(translated_paragraphs, filename='translated.srt')