# TODO:
- Fix adding furigana. Doesn't seem to be working.
- Allow resolution change to saved images to save space.
- Pass language to Whisper???
- Change from Whisper to Facebook's fairseq MMS? https://github.com/facebookresearch/fairseq/tree/main/examples/mms

In [1]:
import cv2
import os
from collections import Counter
from tqdm import tqdm
import whisper
import torch
import moviepy.editor as mp
from moviepy.video.io.VideoFileClip import VideoFileClip
import subprocess
from datetime import timedelta
from yattag import Doc
import time


# Annotate kanji in the subtitles with furigana readings (requires the `furigana` package).
ADD_FURIGANA = True
# File extensions recognised as video / audio / subtitle inputs.
VID_EXTS = ['.mp4','.avi','.ogv','.mkv','.webm']
AUD_EXTS = ['.mp3']
SUB_EXTS = ['.srt']
# Width in pixels given to the frame images added to the page.
WIDTH = 200
# Folder scanned for input media, and folder under which each project's output folder is created.
src_dir = 'Input'
out_dir = 'Output'
# Every entry directly inside src_dir, prefixed with src_dir.
files = [os.path.join(src_dir, f) for f in os.listdir(src_dir)]

if ADD_FURIGANA:
    # Imported only when the feature is switched on.
    from furigana.furigana import split_furigana

  from .autonotebook import tqdm as notebook_tqdm
  def backtrace(trace: np.ndarray):


In [2]:
# Split every input path once into (path without extension, extension).
_stems_exts = [os.path.splitext(path) for path in files]

# One project per video file. The extension is compared in lower case so that
# e.g. "clip.MP4" is listed too, matching the case-insensitive lookup done in
# get_prj_media.
proj_files = [stem for stem, ext in _stems_exts if ext.lower() in VID_EXTS]

# Add audio files if they weren't extracted from a video with the same name
proj_files += [stem for stem, ext in _stems_exts if ext.lower() in AUD_EXTS and stem not in proj_files]
print(proj_files)

['Input\\2023年6月10日（土）「OK! Cozy up!週末増刊号」']


In [3]:
def file_to_line_list(filename, encoding='utf-8-sig'):
    """
        Read a text file and return its lines as a list of strings.

        Every newline character is removed from each line; empty lines are
        kept as '' entries. The default 'utf-8-sig' encoding drops a leading
        byte-order mark if the file has one.
    """
    with open(filename, mode='r', encoding=encoding) as handle:
        return [raw_line.replace('\n', '') for raw_line in handle]

def chunk_sub_idx_to_list(sub_line_list):
    """
        Group the lines of a subtitle (.srt) file into one list per entry.

        Pass in a list where each line is a line in the subtitle file
        Example:
        ['1', '00:00:00,000 --> 00:00:04,430', 'おはようございます', '2', ...]

        return a list where each list item is another list where each item is specific to its index
        Example:
        [['1', '00:00:00,000 --> 00:00:04,430', 'おはようございます'], ['2', ...], ...]

        Empty lines are ignored. A new entry is recognised by a line made of
        digits that is immediately followed by a line containing '-->'.
        The final entry of the file is included as well (it has no following
        header to terminate it, so it is flushed after the loop).
    """
    def is_entry_header(index_line, time_line):
        return index_line.strip().isdigit() and '-->' in time_line

    lines_indexed = []
    current = []
    for line in sub_line_list:
        if line == '':
            continue

        current.append(line)
        # The two newest lines form the header of the NEXT entry: everything
        # collected before them belongs to the entry that just ended.
        if len(current) > 3 and is_entry_header(*current[-2:]):
            lines_indexed.append(current[:-2])
            current = current[-2:]

    # Flush the last entry. It is only kept when it really starts with an
    # index line and a timing line, so trailing garbage is not returned.
    if len(current) >= 2 and is_entry_header(current[0], current[1]):
        lines_indexed.append(current)

    return lines_indexed
    
def srt_time_to_seconds(time_line):
    """
    Convert an SRT timing line into (start, stop) offsets in seconds.

    time_line must look like '00:00:00,000 --> 00:00:04,430': two
    'HH:MM:SS,mmm' stamps separated by ' --> '. Any other layout raises
    ValueError from the unpacking / int conversions.
    """
    def _as_seconds(stamp):
        hours, minutes, remainder = stamp.split(':')
        whole, millis = remainder.split(',')
        return int(hours)*60*60 + int(minutes)*60 + int(whole) + int(millis)/1000

    begin_stamp, end_stamp = time_line.split(' --> ')
    return _as_seconds(begin_stamp), _as_seconds(end_stamp)

In [4]:
def add_image(doc, image, idx, w=200, h=200):
    """
    Append an <img> element for `image` to the yattag document `doc`.

    The element gets the id 'image_<idx>', the given width/height, the CSS
    class 'center', and the file name without extension as its alt text.
    """
    alt_text, _extension = os.path.splitext(os.path.basename(image))
    attributes = dict(id=f'image_{idx}', src=image, alt=alt_text, width=w, height=h, klass="center")
    with doc.tag('img', **attributes):
        # The element has no content; entering and leaving the tag is enough.
        pass

def add_audio_clip(doc, audio_file, idx=None):
    """
    Append an <audio> player for `audio_file` to the yattag document `doc`.

    doc: yattag document being built
    audio_file: path of the mp3; only its base name is used as the source,
        so the file is expected to sit next to the HTML page
    idx: optional entry index. When given, the element gets the id
        'audio_<idx>' (parallel to the 'image_<idx>' ids of add_image).
        It is optional because existing callers pass only (doc, audio_file).
    """
    attributes = {'controls': True, 'klass': "center"}
    if idx is not None:
        attributes['id'] = f'audio_{idx}'

    with doc.tag('audio', **attributes):
        doc.stag('source', src=os.path.basename(audio_file), type="audio/mpeg")
        # Shown only by browsers that cannot render the <audio> element.
        doc.text('Your browser does not support the audio element.')

def add_sub(doc, sub):
    """Append `sub` to the yattag document `doc` as the text of its own <div>."""
    # Doc.line(tag, text) is yattag's shortcut for opening the tag, adding
    # the text and closing the tag again.
    doc.line('div', sub)

In [5]:
def add_furigana(text):
    """
    Return `text` as an HTML fragment with <ruby> furigana annotations.

    split_furigana yields one tuple per piece of text: a 2-tuple
    (kanji, reading) becomes <ruby><rb>kanji</rb><rt>reading</rt></ruby>,
    any other tuple contributes its first item unchanged.
    """
    fragments = []
    for piece in split_furigana(text):
        if len(piece) == 2:
            base, reading = piece
            fragments.append(f"<ruby><rb>{base}</rb><rt>{reading}</rt></ruby>")
        else:
            fragments.append(piece[0])
    return ''.join(fragments)

In [6]:
def _add_page_head(doc, title):
    """Emit the <head>: page title, stylesheet and the image-resize script."""
    with doc.tag('head'):
        with doc.tag('title'):
            doc.text(title)

        # The stylesheet lives inside <head>, where a <style> element belongs.
        with doc.tag('style'):
            doc.asis('''
            body { background-color: #D8DFEE; }
            h1, h2, h3 { color: #ABA8A9; }
            .highlight { color: #CBF83E; }
            div {text-align: center;} 
            .center {display: block; margin-left: auto; margin-right: auto;}
        ''')

        # Image Size Control
        with doc.tag('script'):
            doc.asis("""
                function updateImageSize() {
                    var slider = document.getElementById("slider");
                    var images = document.getElementsByTagName("img");
                    for (var i = 0; i < images.length; i++) {
                        images[i].style.width = slider.value + "px";
                        images[i].style.height = "auto";
                    }
                }
                """)


def _add_size_slider(doc):
    """Emit the range slider that drives updateImageSize() in the page script."""
    with doc.tag('div'):
        with doc.tag('label', ('for', 'slider')):
            doc.text('Adjust Image Size')
        # Starts at WIDTH so the slider agrees with the initial image width.
        doc.stag('input', ('type', 'range'), ('min', '50'), ('max', '500'), ('value', str(WIDTH)),
                 ('id', 'slider'), ('oninput', 'updateImageSize()'))


def _add_sub_with_furigana(doc, sub):
    """
    Add one subtitle line as a <div>, with <ruby> furigana where available.

    The ruby elements are created as real tags. Passing a ready-made
    '<ruby>...' string through doc.text() would escape the markup and show
    the tags as literal text on the page.
    """
    try:
        # Collect everything first so a failure cannot leave a half-written <div>.
        pieces = [(pair[0], pair[1] if len(pair) == 2 else None)
                  for pair in split_furigana(sub)]
    except Exception:
        # Best effort: an annotation problem must not lose the subtitle itself.
        add_sub(doc, sub)
        return

    with doc.tag('div'):
        for base, reading in pieces:
            if reading is None:
                doc.text(base)
            else:
                with doc.tag('ruby'):
                    with doc.tag('rb'):
                        doc.text(base)
                    with doc.tag('rt'):
                        doc.text(reading)


def build_html_doc(prj_dir, vid_file, aud_file, sub_file, base_filename, lines_indexed, audio_mode='normal', pad=0.5, line_sep=True):
    """
    Build the HTML study page of one project and return it as an indented string.

    For every subtitle entry the page gets, in this order: a video frame
    grabbed at the midpoint of the entry, the subtitle text, and an mp3 clip
    of the entry. Frames and clips are written into prj_dir.

    prj_dir: output folder; its base name is the page title and the prefix
        of every generated file
    vid_file: path of the video, or None
    aud_file: path of the audio file, used when there is no video or when
        audio_mode is 'only'
    sub_file, base_filename: not used by this function
    lines_indexed: list of entries [index, timing line, text line, ...]
    audio_mode: normal, off, only
        normal - frames from the video, clips cut from the video's audio
        off    - frames only, no clips
        only   - clips cut from aud_file, no frames
    pad: seconds added before and after each clip
    line_sep: add a horizontal rule after each entry

    When a medium is missing the corresponding part is left out of the page.
    """
    # yattag documents `indent` as a module-level function.
    from yattag import indent

    prj_name = os.path.basename(prj_dir)

    video_capture = None
    mp_video = None
    mp_audio = None

    try:
        # 1 Video Found
        if vid_file:
            if audio_mode != 'only':
                video_capture = cv2.VideoCapture(vid_file)

            if audio_mode == 'normal':
                mp_video = mp.VideoFileClip(vid_file)
            elif audio_mode == 'only' and aud_file:
                mp_audio = mp.AudioFileClip(aud_file)

        # No Video Found
        elif aud_file:
            audio_mode = 'only'
            mp_audio = mp.AudioFileClip(aud_file)

        doc = Doc()

        with doc.tag('html'):
            _add_page_head(doc, prj_name)

            with doc.tag('body'):
                _add_size_slider(doc)

                for idx, r in enumerate(lines_indexed):
                    time_str = r[1]
                    sub_list = r[2:]

                    start, stop = srt_time_to_seconds(time_str)
                    # Midpoint of the entry; also used to name the generated files.
                    time_ms = int(1000*((stop - start)/2 + start))

                    # Process Video-------------------
                    if video_capture is not None:
                        video_capture.set(cv2.CAP_PROP_POS_MSEC, time_ms)
                        success, image = video_capture.read()

                        if success:
                            new_filename = prj_name + '_' + str(time_ms) + '.jpg'
                            path = os.path.join(prj_dir, new_filename)
                            if not os.path.exists(path):
                                cv2.imwrite(path, image)

                            h, w = image.shape[:2]
                            ratio = h/w

                            # The page sits in prj_dir, so the bare file name is the src.
                            add_image(doc, new_filename, idx, w=WIDTH, h=int(WIDTH*ratio))

                    # Add Subtitle---------------------
                    for s in sub_list:
                        if ADD_FURIGANA:
                            _add_sub_with_furigana(doc, s)
                        else:
                            add_sub(doc, s)

                    # Process Audio--------------------
                    if audio_mode != 'off':
                        clip_start = max(0, start - pad)
                        clip_stop = stop + pad

                        if mp_audio is not None:
                            # Get audio subclip from audio file
                            audio_clip = mp_audio.subclip(clip_start, clip_stop)
                        elif mp_video is not None:
                            # Get audio subclip from video file
                            audio_clip = mp_video.subclip(clip_start, clip_stop).audio
                        else:
                            audio_clip = None

                        if audio_clip is not None:
                            new_filename = prj_name + '_' + str(time_ms) + '.mp3'
                            path = os.path.join(prj_dir, new_filename)
                            audio_clip.write_audiofile(path, verbose=False, logger=None)
                            add_audio_clip(doc, path, idx)

                    if line_sep:
                        doc.stag('hr')  # Add a horizontal line
    finally:
        # Release the decoder handles even when building the page fails.
        if video_capture is not None:
            video_capture.release()
        for clip in (mp_video, mp_audio):
            if clip is not None:
                clip.close()

    return indent(doc.getvalue())

In [7]:
# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
_backend_name = 'cuda' if whisper.torch.cuda.is_available() else 'cpu'
device = whisper.torch.device(_backend_name)
device

device(type='cuda')

In [8]:
# Create an mp3 from video file
def extract_audio(vid_file, track_number=0):
    """
    Extract one audio track of a video into an mp3 next to the video.

    vid_file: path of the video file
    track_number: index of the audio track. Track 0 is extracted with
        MoviePy; any other track needs the ffmpeg executable and is
        selected with `-map 0:a:<track_number>`.

    Returns the path of the mp3 (video path with the extension replaced by
    '.mp3'). The path is returned even when ffmpeg failed, so callers should
    check that the file exists.
    """
    def extract_ffmpeg(input_file, output_file, audio_track):
        # An argument list (no shell) keeps file names with quotes, spaces or
        # shell metacharacters intact. '-y' overwrites an existing output,
        # like the MoviePy branch does, instead of waiting for an answer.
        command = ['ffmpeg', '-y', '-i', input_file, '-map', f'0:a:{audio_track}', output_file]
        result = subprocess.run(command, text=True, errors='replace', capture_output=True)
        print("stdout:", result.stdout)
        print("stderr:", result.stderr)
        if result.returncode != 0:
            print(f'FFMPEG Error: exit code {result.returncode}')

    base_filename = os.path.splitext(vid_file)[0] # Remove ext

    savename = base_filename + '.mp3'

    # Default to using MoviePy
    if track_number == 0:
        video_clip = mp.VideoFileClip(vid_file)
        try:
            video_clip.audio.write_audiofile(savename)
        finally:
            video_clip.close()

    # Extract different track, need ffmpeg
    else:
        try:
            extract_ffmpeg(vid_file, savename, track_number)
        except Exception as e:
            # Typically raised when the ffmpeg executable cannot be started.
            print(f'FFMPEG Error: {e}')

    return savename

In [9]:
def transcribe_audio(transcription, base_filename):
    """
    Write the segments of a transcription to '<base_filename>.srt'.

    transcription: dict with a 'segments' list; each segment provides
        'id', 'start' and 'end' (seconds) and 'text'
    base_filename: output path without extension

    Segments with empty text are skipped. Entry numbers are the segment id
    plus one, and one leading space is removed from the text. Timestamps
    keep millisecond precision. The file is written in one go and replaces
    an existing file; nothing is written when no segment has text.

    Returns the path of the .srt file.
    """
    def srt_timestamp(seconds):
        total_ms = max(0, int(round(seconds * 1000)))
        hours, rest = divmod(total_ms, 3600 * 1000)
        minutes, rest = divmod(rest, 60 * 1000)
        secs, millis = divmod(rest, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

    savepath = base_filename + '.srt'

    entries = []
    for segment in transcription['segments']:
        text = segment['text']
        if len(text) == 0:
            continue

        if text.startswith(' '):
            text = text[1:]

        start_time = srt_timestamp(segment['start'])
        end_time = srt_timestamp(segment['end'])
        entries.append(f"{segment['id'] + 1}\n{start_time} --> {end_time}\n{text}\n\n")

    if entries:
        with open(savepath, 'w', encoding='utf-8') as srt_file:
            srt_file.writelines(entries)

    return savepath

In [10]:
def get_prj_media(base_filename):
    """
    Find the video, audio and subtitle file that belong to one project.

    base_filename: project path without extension

    A file of the global `files` list belongs to the project when its path
    without extension equals base_filename exactly, so 'ep1' does not pick up
    'ep10.mp4'. Extensions are compared in lower case.

    Returns (vid_file, aud_file, sub_file); each is a path, or None when
    there is not exactly one file of that kind.
    Raises AssertionError when more than one video or more than one audio
    file shares the base name.
    """
    def has_ext(exts):
        matches = []
        for path in files:
            stem, ext = os.path.splitext(path)
            if stem == base_filename and ext.lower() in exts:
                matches.append(path)
        return matches

    vid_files = has_ext(VID_EXTS)
    aud_files = has_ext(AUD_EXTS)
    sub_files = has_ext(SUB_EXTS)

    # Raised explicitly so the check also runs under `python -O`.
    if len(vid_files) > 1:
        raise AssertionError("Multiple video extensions found w/ same base name.")
    if len(aud_files) > 1:
        raise AssertionError("Multiple audio extensions found w/ same base name.")

    vid_file = vid_files[0] if len(vid_files) == 1 else None
    aud_file = aud_files[0] if len(aud_files) == 1 else None
    sub_file = sub_files[0] if len(sub_files) == 1 else None

    return vid_file, aud_file, sub_file

In [11]:
# Audio track handed to extract_audio when a video has no mp3 yet.
TRACK_NUMBER = 2

start_time = time.time()

# The Whisper model is loaded once, the first time a transcript is needed,
# instead of being reloaded for every project.
model = None

for prj in proj_files:
    vid_file, aud_file, sub_file = get_prj_media(prj)
    print(f'Files: {vid_file}, {aud_file}, {sub_file}')

    # If the audio doesn't exist, extract it from the video
    if aud_file is None and vid_file:
        print(f'Extracting mp3 from {prj}...')
        aud_file = extract_audio(vid_file, track_number=TRACK_NUMBER)
        print(f'Audio File Extracted: {aud_file}')
        print('Finished extracting audio.')

    # If the subtitle file doesn't exist, create it from the audio
    if sub_file is None:
        if aud_file and os.path.exists(aud_file):
            print(f'Creating transcript using OpenAIs Whisper for {aud_file}.')

            if model is None:
                model = whisper.load_model("base")
            transcription = model.transcribe(aud_file)

            print('Exporting subtitle.')
            sub_file = transcribe_audio(transcription, prj)
            print(f'Sub File Extracted: {sub_file}')
        else:
            print(f'Error. Audio file {aud_file} not found.')

    # Check if subtitle exists. sub_file is still None when no transcript
    # could be made, and os.path.exists(None) would raise TypeError.
    if sub_file and os.path.exists(sub_file):
        print('Generating Audio Visual HTML Page')

        prj_name = os.path.basename(prj).replace(' ','_')
        prj_dir = os.path.join(out_dir, prj_name)
        # Also creates out_dir itself when it is missing.
        os.makedirs(prj_dir, exist_ok=True)

        line_list = file_to_line_list(sub_file)
        lines_indexed = chunk_sub_idx_to_list(line_list)

        html = build_html_doc(prj_dir, vid_file, aud_file, sub_file, prj, lines_indexed)

        save_path = os.path.join(prj_dir, prj_name + '.html')

        with open(save_path, 'w', encoding='utf-8') as html_file:
            html_file.write(html)

        print(f'Finished processing {prj}')

    else:
        print(f'Error. Subtitle file {sub_file} not found.')

minutes = round((time.time() - start_time)/60, 2)
print(f'Total processing time: {minutes}')

Files: None, Input\2023年6月10日（土）「OK! Cozy up!週末増刊号」.mp3, None
Creating transcript using OpenAIs Whisper for Input\2023年6月10日（土）「OK! Cozy up!週末増刊号」.mp3.
Exporting subtitle.
Sub File Extracted: Input\2023年6月10日（土）「OK! Cozy up!週末増刊号」.srt
Generating Audio Visual HTML Page
Finished processing Input\2023年6月10日（土）「OK! Cozy up!週末増刊号」
Total processing time: 435.4954299926758


In [18]:
aud_file

'Input\\2023年6月10日（土）「OK! Cozy up!週末増刊号」.mp3'

In [29]:
# Scratch cell: detect the spoken language of the last processed audio file.
# Relies on `aud_file` and `model` left behind by earlier cells.

# load audio and pad/trim it to fit 30 seconds
# (pad_or_trim's default length is 480000 samples, see its help output)
audio = whisper.load_audio(aud_file)
sample = whisper.pad_or_trim(audio)

# make log-Mel spectrogram and move to the same device as the model
mel = whisper.log_mel_spectrogram(sample).to(model.device)

# detect the spoken language
# probs is keyed by language code (e.g. 'ja'); presumably the values are
# per-language probabilities, so the key with the largest value is printed.
_, probs = model.detect_language(mel)
print(f"Detected language: {max(probs, key=probs.get)}")

Detected language: ja


In [31]:
max(probs, key=probs.get)

'ja'

In [21]:
help(whisper.pad_or_trim)

Help on function pad_or_trim in module whisper.audio:

pad_or_trim(array, length: int = 480000, *, axis: int = -1)
    Pad or trim the audio array to N_SAMPLES, as expected by the encoder.



In [23]:
# Reload the "base" Whisper model on the selected device (`device` comes from
# the device-selection cell) and reload the audio of the last processed file.
model = whisper.load_model("base", device=device)
audio = whisper.load_audio(aud_file)

In [36]:
# Transcribe only the padded/trimmed `sample` as a quick check.
# NOTE(review): language=None presumably lets Whisper detect the language itself - confirm.
test = model.transcribe(sample, language=None)

In [37]:
test

{'text': '6月10日土曜日日本放送OK 工事イヤップ 週末増換号日本放送はナンサーの新用一家ですOK 工事イヤップ 週末増換号今週の放送でセレクトした聞き所今後のニュースの予定を紹介していくプログラムです',
 'segments': [{'id': 0,
   'seek': 0,
   'start': 0.0,
   'end': 6.0,
   'text': '6月10日土曜日',
   'tokens': [50364, 21, 6939, 3279, 6890, 45506, 9531, 250, 6890, 50664],
   'temperature': 0.0,
   'avg_logprob': -0.4274945772611178,
   'compression_ratio': 1.2881355932203389,
   'no_speech_prob': 0.4644383490085602},
  {'id': 1,
   'seek': 0,
   'start': 6.0,
   'end': 12.0,
   'text': '日本放送OK 工事イヤップ 週末増換号',
   'tokens': [50664,
    27311,
    12744,
    29309,
    9443,
    220,
    23323,
    6973,
    8040,
    34969,
    33683,
    220,
    38003,
    1474,
    104,
    24228,
    245,
    36338,
    26987,
    50964],
   'temperature': 0.0,
   'avg_logprob': -0.4274945772611178,
   'compression_ratio': 1.2881355932203389,
   'no_speech_prob': 0.4644383490085602},
  {'id': 2,
   'seek': 0,
   'start': 17.0,
   'end': 20.0,
   'text': '日本放送はナンサーの新用一家です',
   'tokens': [51214,
  

In [32]:
help(model.transcribe)

Help on method transcribe in module whisper.transcribe:

transcribe(audio: Union[str, numpy.ndarray, torch.Tensor], *, verbose: Optional[bool] = None, temperature: Union[float, Tuple[float, ...]] = (0.0, 0.2, 0.4, 0.6, 0.8, 1.0), compression_ratio_threshold: Optional[float] = 2.4, logprob_threshold: Optional[float] = -1.0, no_speech_threshold: Optional[float] = 0.6, condition_on_previous_text: bool = True, initial_prompt: Optional[str] = None, word_timestamps: bool = False, prepend_punctuations: str = '"\'“¿([{-', append_punctuations: str = '"\'.。,，!！?？:：”)]}、', **decode_options) method of whisper.model.Whisper instance
    Transcribe an audio file using Whisper
    
    Parameters
    ----------
    model: Whisper
        The Whisper model instance
    
    audio: Union[str, np.ndarray, torch.Tensor]
        The path to the audio file to open, or the audio waveform
    
    verbose: bool
        Whether to display the text being decoded to the console. If True, displays all the detail