###### Imports

In [None]:
!apt-get install ffmpeg
!pip install ffmpeg-python
!pip install git+https://github.com/openai/whisper.git
print("Done")

In [2]:
import os
import ffmpeg # preprocess
import torch # model download
import whisper # model download
import datetime # overlap removal
import json #for loading from file Json->.srt
# whisper mode dependencies
from tqdm import tqdm

from whisper.utils import get_writer #frin dict to .srt

###### Tools

In [3]:
def proccess_for_VAD(audio_path,VAD_temp_path,vad_threshold=0.4,chunk_threshold=3.0):
    """Split an audio/video file into speech-only chunk files using Silero VAD.

    The input is re-encoded with ffmpeg to 16 kHz mono 16-bit PCM, speech
    regions are detected, padded and grouped, and every group is saved as
    ``<VAD_temp_path>/<i>.wav``. Per-region timing metadata is written to
    ``<VAD_temp_path>/<stem>.wav.json`` as a list (one entry per chunk file)
    of lists of dicts with the keys ``start``, ``end``, ``chunk_start``,
    ``chunk_end`` and ``offset`` (all in seconds).

    Args:
        audio_path: Path of the media file to process.
        VAD_temp_path: Directory receiving the chunk files and the JSON
            metadata; created (including parents) when missing.
        vad_threshold: Value forwarded as ``threshold`` to
            ``get_speech_timestamps``.
        chunk_threshold: Gap in seconds between two speech regions above
            which a new chunk file is started.

    Returns:
        None. Results are communicated through the files written.
    """
    VAD_SR = 16000      # sample rate used for encoding and for VAD
    HEAD_PAD = 3200     # 0.2 s of padding before each speech region
    TAIL_PAD = 20800    # 1.3 s of padding after each speech region

    # basename/splitext keep dots inside the stem ("talk.v2.mp3" -> "talk.v2.wav")
    audio_name = os.path.splitext(os.path.basename(audio_path))[0] + '.wav'
    # The prefix keeps the re-encoded file from colliding with the numbered
    # chunk files ("0.wav", "1.wav", ...) that are written further down.
    encoded_path = f"{VAD_temp_path}/_vad_input_{audio_name}"

    print("Encoding audio for VAD...",audio_name)
    os.makedirs(VAD_temp_path, exist_ok=True)
    print(audio_path,encoded_path,"VAD")
    ffmpeg.input(audio_path).output(
        encoded_path,
        ar="16000",
        ac="1",
        acodec="pcm_s16le",
        map_metadata="-1",
        fflags="+bitexact",
    ).overwrite_output().run(quiet=True)

    # Download/load the VAD model and its helper utilities
    print("Running VAD...")
    model, utils = torch.hub.load(
        repo_or_dir="snakers4/silero-vad", model="silero_vad", onnx=False
    )
    (get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks) = utils

    # Generate VAD timestamps (in samples)
    wav = read_audio(encoded_path, sampling_rate=VAD_SR)
    t = get_speech_timestamps(wav, model, sampling_rate=VAD_SR, threshold=vad_threshold)

    # Add a bit of padding, and remove overlaps the padding introduced
    for i in range(len(t)):
        t[i]["start"] = max(0, t[i]["start"] - HEAD_PAD)
        t[i]["end"] = min(wav.shape[0] - 16, t[i]["end"] + TAIL_PAD)
        if i > 0 and t[i]["start"] < t[i - 1]["end"]:
            t[i]["start"] = t[i - 1]["end"]

    # Group speech regions: a gap longer than chunk_threshold seconds starts a
    # new chunk file. Starting from an empty list (rather than [[]]) means that
    # audio without any detected speech yields no chunk at all.
    u = []
    for i in range(len(t)):
        if i == 0 or t[i]["start"] > t[i - 1]["end"] + (chunk_threshold * VAD_SR):
            u.append([])
        u[-1].append(t[i])

    if not u:
        print("No speech detected in", audio_name)

    # Merge the speech regions of each group into one file, then drop the
    # full-length re-encoded audio
    for i in range(len(u)):
        save_audio(
            f"{VAD_temp_path}/" + str(i) + ".wav",
            collect_chunks(u[i], wav),
            sampling_rate=VAD_SR,
        )
    os.remove(encoded_path)

    # Convert timestamps to seconds and record, for every region, where it
    # sits inside its chunk file and how much removed silence precedes it
    for group in u:
        elapsed = 0.0
        offset = 0.0
        for j in range(len(group)):
            group[j]["start"] /= VAD_SR
            group[j]["end"] /= VAD_SR
            group[j]["chunk_start"] = elapsed
            elapsed += group[j]["end"] - group[j]["start"]
            group[j]["chunk_end"] = elapsed
            if j == 0:
                offset += group[j]["start"]
            else:
                offset += group[j]["start"] - group[j - 1]["end"]
            group[j]["offset"] = offset

    # Persist the chunk metadata next to the chunk files
    with open(f"{VAD_temp_path}/{audio_name}.json", 'w') as json_file:
        json.dump(u, json_file)


In [6]:
#The main translation function
def run_Whisper(VAD_temp_path,model_size='large-v2',language='English',task='translate',max_attempts=1):
    """Run Whisper on the VAD chunk files and build a subtitle list.

    Reads the first ``*.json`` metadata file found in ``VAD_temp_path``, runs
    ``model.transcribe`` on ``<VAD_temp_path>/<i>.wav`` for every chunk, filters
    low-confidence segments and shifts the chunk-relative timestamps by the
    stored ``offset`` values.

    Args:
        VAD_temp_path: Directory holding the chunk files and the JSON metadata.
        model_size: Name passed to ``whisper.load_model``.
        language: Language passed to ``model.transcribe``.
        task: Task passed to ``model.transcribe``.
        max_attempts: How many times a chunk is transcribed while its last
            segment ends more than 10 s past the chunk's end. Values below 1
            are treated as 1.

    Returns:
        A list of dicts with the keys ``id`` (1-based, increasing), ``start``,
        ``end`` and ``text``. Empty when the metadata contains no chunks.
    """
    u_path = VAD_temp_path +'/' +[pos_json for pos_json in os.listdir(VAD_temp_path) if pos_json.endswith('.json')][0]
    with open(u_path) as json_file:
        u = json.load(json_file)

    print("Running Whisper...")
    subs = []
    # Nothing to transcribe: skip the (expensive) model load entirely
    if not u:
        return subs

    model = whisper.load_model(model_size)

    # Phrases whose presence lowers a segment's average log probability;
    # built once instead of once per chunk
    suppress_low = [
        "Thank you","Thanks for","ike and ","Bye.","Bye!","Bye bye!","lease sub","The end.","視聴",]
    suppress_high = [
        "ubscribe","my channel","the channel","our channel","ollow me on","for watching",
        "hank you for watching","for your viewing","r viewing","Amara","next video",
        "full video","ranslation by","ranslated by","ee you next week",
        "ご視聴","視聴ありがとうございました",]

    sub_index = 1
    attempts = max(1, max_attempts)  # guarantees `result` is always assigned
    #TODO removing initial_prompt @adds extra complexity we do not need right now
    for i in tqdm(range(len(u))):
        # A chunk without speech regions has no timing data to map onto
        if not u[i]:
            continue
        chunk_end = u[i][-1]["chunk_end"]

        # Retry loop: stop as soon as the result is empty or its last segment
        # ends within 10 s of the chunk's end
        for x in range(attempts):
            result = model.transcribe(
                f"{VAD_temp_path}/" + str(i) + ".wav", task=task, language=language, #TODO removed initial_prompt=initial_prompt
            )
            if len(result["segments"]) == 0:
                break
            elif result["segments"][-1]["end"] < chunk_end + 10.0:
                break
            elif x+1 < attempts:
                print("Retrying chunk", i)

        # Post-process the segments of this chunk
        for r in result["segments"]:
            # Skip audio timestamped after the chunk has ended
            if r["start"] > chunk_end:
                continue

            # Reduce log probability for certain words/phrases
            for s in suppress_low:
                if s in r["text"]:
                    r["avg_logprob"] -= 0.15
            for s in suppress_high:
                if s in r["text"]:
                    r["avg_logprob"] -= 0.35

            # Drop the token ids; tolerate segments that carry none
            r.pop("tokens", None)

            # Skip if log prob is low or no speech prob is high
            if r["avg_logprob"] < -1.0 or r["no_speech_prob"] > 0.7:
                continue

            # Set start timestamp: use the offset of the region containing it,
            # falling back to the first region's offset
            start = r["start"] + u[i][0]["offset"]
            for j in range(len(u[i])):
                if (
                    r["start"] >= u[i][j]["chunk_start"]
                    and r["start"] <= u[i][j]["chunk_end"]
                ):
                    start = r["start"] + u[i][j]["offset"]
                    break

            # Prevent overlapping subs by trimming the previous one
            if len(subs) > 0:
                last_end = subs[-1]['end']
                if last_end > start:
                    subs[-1]['end'] = start

            # Set end timestamp: same lookup, falling back to just past the
            # end of the chunk's last region
            end = u[i][-1]["end"] + 0.5
            for j in range(len(u[i])):
                if r["end"] >= u[i][j]["chunk_start"] and r["end"] <= u[i][j]["chunk_end"]:
                    end = r["end"] + u[i][j]["offset"]
                    break

            # Add to SRT list
            subs.append({"id":sub_index,
                    'start':start,
                    'end':end,
                    'text':r["text"].strip()})

            # Advance the id (the original reset it to 1 on every segment)
            sub_index += 1

    return subs
    


###### Inference

In [11]:
# import os
import datetime
import time
from whisper.utils import get_writer
import shutil

def processor_function(input_file_path,temp_vad_path='temp'):
    """Run the whole pipeline on one media file: VAD chunking, then Whisper.

    Args:
        input_file_path: Media file handed to ``proccess_for_VAD``.
        temp_vad_path: Working directory shared by both stages.

    Returns:
        A ``(subs, temp_vad_path)`` tuple: the subtitle list produced by
        ``run_Whisper`` and the working directory that was used.
    """
    # Stage 1 writes chunk files + metadata; stage 2 reads them back
    proccess_for_VAD(input_file_path, temp_vad_path)
    return run_Whisper(temp_vad_path), temp_vad_path

def process_new_files(folder_path, logs_dir='logs'):
    """Transcribe every not-yet-processed media file below ``folder_path``.

    A file is processed when its name does not start with ``_done_`` and its
    extension (compared case-insensitively) is a known media extension. For
    each such file the pipeline output is written with Whisper's ``'all'``
    writer into ``<logs_dir>/<relative dir>/<file name>_<timestamp>/``, the
    temporary VAD directory is removed, and the file is renamed with a
    ``_done_`` prefix.

    Args:
        folder_path: Root folder that is walked recursively.
        logs_dir: Directory receiving the output folders; created when
            missing. Defaults to ``'logs'``.

    Returns:
        None.
    """
    os.makedirs(logs_dir, exist_ok=True)

    media_extensions = {".mp3", ".wav", ".ogg", ".flac", ".m4a", ".wma", ".mp4",
                        ".mov", ".avi", ".mkv", ".webm", ".flv", ".wmv", ".mpeg"}

    for root, dirs, files in os.walk(folder_path):
        for file_name in files:
            # Already handled in an earlier run
            if file_name.startswith("_done_"):
                continue
            # Not a media file; lower() so ".MP3"/".Wav" are recognised too
            if os.path.splitext(file_name)[1].lower() not in media_extensions:
                continue

            file_path = os.path.join(root, file_name)

            # process the file
            output,temp_vad_path = processor_function(file_path)

            # Output folder named after the file plus a timestamp, mirroring
            # the folder structure. Built with path joins rather than a string
            # replace so a parent directory containing the file name cannot
            # corrupt the result.
            log_file_name = file_name + "_" + datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            relative_dir = os.path.dirname(os.path.relpath(file_path, folder_path))
            log_dir_path = os.path.join(logs_dir, relative_dir, log_file_name)
            os.makedirs(log_dir_path, exist_ok=True)

            # write all the results in our structured file format
            writer = get_writer('all', log_dir_path)
            writer({"segments":output}, os.path.splitext(file_name)[0])
            shutil.rmtree(temp_vad_path)

            # rename the file with '_done_' prefix so later runs skip it
            os.rename(file_path, os.path.join(root, "_done_" + file_name))

###### Runner

In [13]:
# Process every unprocessed media file found under "in_folder".
process_new_files("in_folder")