In [51]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Generate dataset

In [52]:
!pip install decoder



In [53]:
#from csrc.build import generate_from_audio
#from csrc.configurations import DatasetConfig as DC
#from utils import check_type, extract_audio, count_class
import decoder

# CSRD CONFIG

In [54]:
"""
Those are the configuration settings.
"""

class ModelConfig(object):
    # Configurations for SED model.
    sed_model_config = {
        "sample_rate": 32000,
        "window_size": 1024,
        "hop_size": 320,
        "mel_bins": 64,
        "fmin": 50,
        "fmax": 14000,
        "classes_num": 527,
    }
    
class DatasetConfig(object):
    """
    Configuration for building your own dataset from sources.
    
    Attributes:
        dataset_clip_time(int): Clip length for dataset. Default 2s.
        dataset_sample_rate(int): Clip length for dataset. Default 32000.
        dataset_audio_format(str): Clip format for dataset. Default using "wav"
        sub_encoding(str): Use "utf-8" encoding for Aegisub subtitle support.
    """
    
    dataset_clip_time = 2 # seconds
    
    dataset_sample_rate = 32000
    
    dataset_audio_format = "wav" 
    
    sub_encoding = "utf-8" # recommanded
    

#UTILS.py Main

In [55]:
"""
Preprocess some input:

- If it"s video, extract audio from it.
- If it"s audio, do nothing.

To run the audio extraction process normally, you should name the video right
in the same standard as the audio and make sure it supports the format the 
Python Module Moviepy supports.

Check "https://pypi.org/project/moviepy/" for formats Moviepy supports.
"""

from pathlib import Path
import os
import time
import mimetypes
import os

import librosa
import moviepy.editor as mp

def mono_load(path, sr=32000, mono=True):
    """
    A custonmized librosa loading process emphasizing mono channel.
    
    Args:
        path: Audio file path.
        sr: Sample rate for librosa loading.
        mono: Indicate mono channel for the output.
        
    Returns: 
        y: Librosa loading output.
        c: The number of channels for the original audio file. 
    """
    
    start = time.time()
    
    print(f"Loading file: {path}")
    y, c = librosa.load(path, sr=sr, mono=mono)
    
    end = time.time()
    
    print(f"Loading completed. Cost {(end-start):.2f}s\n")
    
    return y, c 
    
def vb(pre, any, v):
    """
    Verbose print.
    """
    if v:
        print(f"{pre} {repr(any)}")
        
def check_type(file_path):
    """
    Check whether it"s a audio or video straitforwardly.
    
    Args:
        file_path: The path of the file to check type.
        
    Returns:
        is_video: Whether the file is a video or an audio.
    """
    
    assert (Path(file_path).exists()) and (Path(file_path).is_file()), "Your input file path is not valid or the file doesn't exist."
    
    is_video = False
    
    mimetypes.init()
    
    mimestart = mimetypes.guess_type(str(file_path))[0]
    
    if mimestart: # If the metadata can't be pared, it's mostly because the file is an audio file.
        try:
            mimestart = mimestart.split("/")[0]
        except RuntimeError as e:
            print(e)
            print("Unrecognizable file type. Is the file format valid? (Using mimetypes)\n")    
        
        assert mimestart=="video" or mimestart=="audio", "Input file format unrecognizable as video or audio (using mimetypes).\n"
        
        if mimestart == "video": is_video = True 

    return is_video

def extract_audio(file_path, format: str="wav"):
    """
    Extract audio from video.
    
    Args:
        file_path: File path for the video clip.    
        
    Returns:
        mv_audio_file: Audio file path extracted.
    """
    
    print(f"Extracting audio from {file_path}")
    
    mv = mp.VideoFileClip(file_path)
    assert mv!=None, "Unable to extract any information from the video clip."
    
    mv_name = str(file_path).split("/")[-1].split(".")[0]
    mv_audio_file = Path(file_path).parent / f"{mv_name}.{format}"
    
    # A potential error for moviepy to resolve system Path could trigger AttributeError.
    # We catch the error and then use string for movie py to resolve the file path.
    # This error occurs on Windows.
    try:
        mv.audio.write_audiofile(mv_audio_file)
    except AttributeError:
        print("\nNote: Moviepy failed to resolve your video path. Currently Use Path as string for moviepy to work.\n")
        mv.audio.write_audiofile(str(mv_audio_file))
     
    print(f"Extraction Successful! Writing {Path(mv_audio_file).stat().st_size} in {mv_audio_file}.")
    
    return mv_audio_file

def count_class(path):
    """
    Count how many instances of 0 and 1 under the folder path.
    
    Args:
        file_path: The path of the folder containing the class instances.
        
    Returns:
        zeros: The number of instances in class 0 (No human speaking).
        ones: The numger of instances in class 0 (Human speaking).
        total: total instances.
    """
    
    zeros = 0
    ones = 0
    total = 0
    
    for file in os.listdir(path):
        c = str(file).split(".")[0][-1]
        ones += int(c)
        total += 1
    zeros = total - ones
    
    print(f"\nLabel 1 instances: {ones}")
    print(f"Label 0 instances: {zeros}")
    print(f"Total clips: {total}\n")
    
    return zeros, ones, total

def get_duration(audio_file_path, y=None, sr=None):
    """
    Check the consistency between audio header metadata and audio waveform.
    
    Args:
        audio_file_path: The audio file path.
        y: Audio time series.
        sr: Audio sample rate.
        
    Returns (one of):
        waveform_duration: Audio duration according to the audio waveform.
        header_duration: Audio duration accrording to audio metadata.
    """
    
    header_duration = librosa.get_duration(filename=audio_file_path)
    
    if sr:
        wavform_duration = librosa.get_duration(y=y, sr=sr)
        
        if header_duration == wavform_duration:
            print("Audio file consistency ensured.")
        else:
            print("There is inconsistency between the audio waveform and header metadata." \
                "This could be ignored.")
            
        return wavform_duration
    
    return header_duration
    


# CSRC UITLS

In [56]:
"""
Util functions for deep learning.
"""

from pathlib import Path
import random

import numpy as np
import torch
import os

#from config import ROOT_PATH_ABS

def seed_all(s:int=42) -> None: 
    random.seed(s)
    np.random.seed(s)
    os.environ["PYTHONHASHSEED"] = str(s)
    torch.manual_seed(s)
    torch.cuda.manual_seed(s)  # type: ignore
    torch.backends.cudnn.deterministic = True  # type: ignore
    torch.backends.cudnn.benchmark = True  # type: ignore

def seed_dataset(s:int=42) -> None:
    random.seed(s)
    

class TrainingDirs(object):
    """
    Initiate the working directory systems for training.
    """
    
    def __init__(self, dsname: str, pre_test: bool) -> None:
        super().__init__()
        ROOT = Path(ROOT_PATH_ABS)
        INPUT_ROOT = ROOT / 'data'
        TARGET_AUDIO_DIR = INPUT_ROOT / dsname
        print(f"Working with dataset under {TARGET_AUDIO_DIR}.")
        assert os.path.exists(TARGET_AUDIO_DIR), "Input dataset folder does not exist."
        
        self.dataset_folder = TARGET_AUDIO_DIR
        self.train_folder = TARGET_AUDIO_DIR / 'train' if pre_test else TARGET_AUDIO_DIR
        self.test_folder = TARGET_AUDIO_DIR / 'test' if pre_test else None
    

# CSRC BUILD

In [57]:
"""Generate audio clips we need for training.

1. Generate 2secs clips for training, wav file format with unique id.
2. Generate training csv file corresponding to the whole dataset
"""

from pathlib import Path

import soundfile as sf
import numpy as np

#from csrc.configurations import DatasetConfig
#from utils import mono_load, vb

def _tagging(clip_ending_second, onsets, offsets):
    """
    Tag the audio by judging whether the clip contains a dialogue.
    If the clip contains dialogues in it, we tag it as 1. Otherwise we tag it as 0.
    """
    assert int(clip_ending_second)%DatasetConfig.dataset_clip_time == 0., 'Sorry, there is a length mismatch when trying to tagging the clip.'
    
    tag = 0
    clip_beginning_second = clip_ending_second - 5
    for _, dialogue in enumerate(zip(onsets, offsets)):
        
        if dialogue[0] > clip_ending_second: break
        
        if clip_beginning_second < dialogue[0] and clip_ending_second > dialogue[0]:
            tag = 1
            break
        
        if clip_beginning_second > dialogue[1] and clip_ending_second < dialogue[1]:
            tag = 1 
            break
        
    return tag

def _resample(y, name, clip_format, index, path, verbose=False):
    output_path = f'{path}/{index}-{name}.{clip_format}'
    if verbose:
        vb("Making file:", output_path, verbose)
    sf.write(output_path, y, DatasetConfig.dataset_sample_rate, format=clip_format, subtype='PCM_24')

def generate_from_audio(audio_path, sub_path, dest_path, sub_decoder, verbose=False):
    """Generate a new dataset from video.
    
    Args:
        audio_path: The file path of the audio.
        sub_path: The file path of the subtitle.
        dest_path: The destination path of the well-formatted dataset.
        sub_decoder: The decoder to get sub file format and events.
    """
        
    # Get name suffix.
    video_name = audio_path.stem if isinstance(audio_path, Path) else Path(audio_path).stem
    
    # Get formatted sub events for further engineering.
    print("Extracting timestamps from the subtitle file...")
    decoder = sub_decoder(sub_path, encoding=DatasetConfig.sub_encoding)
    onsets, offsets = decoder.time_series
    if verbose:
        vb('Onset timestamps generated:', onsets, verbose)
        vb('Offset timestamps generated:', offsets, verbose)
        
    print("Extraction complete!\n")

    # Load audio using librosa and resample the audio in this step.
    print("Librosa loading audio...")
    y, sr = mono_load(audio_path)
    print(f"Loading source file success! Using sampling rate {DatasetConfig.dataset_sample_rate}.\n")
    
    # Get the clip sample lengths.
    clip_sample_length = sr * DatasetConfig.dataset_clip_time
    
    # Main loop.
    # Divide a whole audio into clips.
    clip_flag = 0 # current working clip in current sound position (not senconds)
    idx = 0 # current working clip number 
    exceeded = False # whether the clip has exceeded the audio file
    sound = y
    tag_1 = 0
    tag_0 = 0

    print("Start building process.\nTransforming dataset...\n")
    while True:
        if clip_flag + clip_sample_length > len(sound):
            padding = clip_flag + clip_sample_length - len(sound)
            clip = np.zeros(clip_sample_length)
            clip[:-padding] = sound[clip_flag:]
            exceeded = True
        else:
            clip = sound[clip_flag: clip_flag+clip_sample_length]
            
        clip_flag += clip_sample_length
        idx +=1
         
        # Get the clip name with corresponding labels with timeline of this clip and sub_events.
        clip_tag = _tagging(clip_flag/sr, onsets, offsets)
        if clip_tag == 0:
            tag_0 += 1
        if clip_tag == 1:
            tag_1 += 1
            
        clip_name = video_name + '-' + str(clip_tag)
        _resample(clip, clip_name, DatasetConfig.dataset_audio_format, idx, dest_path, verbose)
        
        # If exceeded, break the loop.
        if exceeded: break
    
    print(f"Building process finished for {video_name}.")
    print(f"Label 1 (speech) clips: {tag_1}\nLabel 0 (non-speech) clips: {tag_0} \n")
    


## user configurations

In [64]:
# Kowning: the name of the audio is restrained to (name-lang).xxx, you can define the name as you like but remember to add the  standard language type as lang.
# You can name your subtitle file as your want."
#test_clips = ("./src/src-test/test-eng.mp4", "./src/src-test/test.ass")

dbc = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/data movie/1.mp4", "/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/movie srt/1.srt")
ab = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/data movie/2.mp4", "/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/movie srt/2.srt")
lstsb = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/data movie/3.mp4", "/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/movie srt/3.srt")
mi = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/data movie/4.mp4", "/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/movie srt/4.srt")
tdkr = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/data movie/5.mp4", "/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/movie srt/5.srt")
tkoh = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/data movie/6.mp4", "/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/movie srt/6.srt")
tks = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/data movie/7.mp4", "/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/movie srt/7.srt")
lms = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/clips/v1/1.mp4", "/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/clips/v1/1.srt")
asf = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/clips/v2/2.mp4", "/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/clips/v2/2.srt")
frt = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/clips/v3/3.mp4", "/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/clips/v3/3.srt")
ewe = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/clips/v4/4.mp4", "/content/drive/MyDrive/Auto-Subtitle-File-Generation/src/clips/v4/4.srt")

dataset_source_long = [dbc, ab, lstsb, mi, tdkr, tkoh, tks, lms, asf, frt, ewe]

#dataset_source_short = [dbc, ab, lstsb]

#dataset_source_medium = [dbc, ab, lstsb, mi, tdkr]

sources = [
    # (audio file path under src, subtitle file path correspondingly under src)\n",
    *dataset_source_long
]

# Destination folder for you to store current sources clips.
# It"s ok if you want to store clips directly under the data folder.
# and this won"t effect whether you want to separate the train/test folder by yourself.

dest_path = ("/content/drive/MyDrive/Auto-Subtitle-File-Generation/data/standard-p2-32khz")

In [65]:
_, _, _ = count_class(dest_path)


Label 1 instances: 189
Label 0 instances: 234
Total clips: 423



#Decoder

In [60]:
"""Decode subtitle files for materials bulding the dataset.
"""

from pathlib import Path
#from config import DecoderConfig as DC


class Decoder(object):
    """Decoder containing public arguments and methods for subtitle files.
    """
    
    def __init__(self, file_path, encoding, trim) -> None:
        super().__init__()
        self.file_path = file_path
        self.encoding = encoding
        self.trim = trim
        
    def _trim_events(self, onsets, offsets):
        """Trim the events.

        Sometimes the subtitle will continue after cutting out a clip (2s) but that time 
        is very short (like 0.1s) that nobody (let alone machines) can recognize human 
        speech in it. To avoid such bias causing a lot of mislabeled, unrecognizable clips
        in the dataset. we should trim the event onsets and offsets to keep the obvious, 
        recognizable speech clips only.
        
        Args:
            onsets: [events onsets].
            offsets: [events offsets].
            
        Returns: 
            onsets: [trimmed events onsets].
            offsets: [trimmed events offsets].
        """
        
        def handle_offset(offset):
            if (offset % 2.) < DatasetConfig.trimming_end:
                offset = float(int(offset))
            
            return offset
                
        
            
        # Trim offsets.
        offsets = list(map(handle_offset, offsets))
        
        # Trim onsets.
        onsets = list(map(handle_onset, onsets))
        
        return onsets, offsets


class ASSDecoder(Decoder):
    """
    Decode .ass(.ssa) subtitle files
    
    Args: 
        file_path: The file path of the subtitle file.
        encoding: The encoding of the subtitle file.
        
    Attributes:
        file_type: Subtitle file format.
        
    Properties:
        time_series: Containing all events timestamps (s).
    """
    
    file_type = "ass"

    def __init__(self, file_path, encoding="utf-8", trim=True):
        assert isinstance(file_path, str) or isinstance(file_path, Path), "Invalid file path, only 'str' and Pathlib.Path' supported."
        super().__init__(file_path, encoding, trim)
        self.flag = 0
        self.tags =  self._tags()
        assert len(self.tags["events"])==1, "There should only be one [Events] tag in sub file."
        assert len(self.tags.keys()) == 3, "Your sub file should only and must contain following components: headers lines, [...Styles], [Events]."

    def _tags(self):

        # iterate the whole file and return tags of headers/styles/events
        with open(self.file_path, encoding=self.encoding) as f:
            tags = dict()
            tags["headers"] = 0
            tags["events"] = list()
            tags["styles"] = list()
            for i, line in enumerate(f.readlines()):
                if "events" in line.lower() and line.startswith("["):
                    tags["events"].append(i)
                if "styles" in line.lower() and line.startswith("["):
                    tags["styles"].append(i)

        return tags
    
    def _decode_time(self, str_time):
        """Decode time from src to float(.2f), which stands for seconds.
        
        Returns:
            float_time: Seconds of the corresponding time.
            
        Args:
            str_time: String format for time object.
            
        Properties:
            events: Parsed events output.
            time_series: series of begining and ending timestamps.
        """
        
        tail = float(str_time.split(".")[-1]) * 1e-2
        h, m, s = str_time.split(".")[0].split(":")
        float_time = int(h)*3600 + int(m)*60 + int(s) + tail
        
        return float_time

    @property
    def events(self):
        with open(self.file_path, encoding=self.encoding) as f:
            assert len(self.tags["events"])==1, "There should only be one [Events] tag in sub file."

            events = []
            events_info = {
                "tag": "",
                "header": "",
                "features": ""
            }

            # get well formatted sub events list
            events = f.readlines()[self.tags["events"][0]:]
            events = [event.lstrip().rstrip() for event in events if event != "\n"]

            # collect events information
            events_tag = events[0]
            events_header = events[1]
            features = events_header.split(":")[1].split(",")
            assert len(features) == 10, "Events feature number does not fit the standard, please check your sub file."

            events_info["tag"] = events_tag
            events_info["header"] = events_header
            events_info["features"] = features

        return events[2:], events_info
    
    @property
    def time_series(self):
        """Return two timestamp lists, in a list each element stands for the beginning or the end of a dialogue.

        Returns:
            on_ts: List of all event onsets.
            off_ts: List of all event offsets.
        """
        
        events, _ = self.events
        on_ts = list()
        off_ts = list()
        
        assert events is not None and events is not [], "Events empty, can not generate time series"
            
        for event in events:
            # remove duplicated timeseries, for multilanguage sub file we directly ingore them
            # we assume all languages" sub file are in the same time series
            
            onset = event.split(",")[1]
            offset = event.split(",")[2]
            
            if onset in on_ts or offset in off_ts:
                break

            on_ts.append(self._decode_time(onset))
            off_ts.append(self._decode_time(offset))
            
        on_ts, off_ts = self._trim_events(on_ts, off_ts) if self.trim else (on_ts, off_ts)
            
        assert len(on_ts) == len(off_ts), "Unable to match onset with offset for Dialogues, please check your sub file"
        
        return on_ts, off_ts
             
            
class SRTDecoder(Decoder):
    """Decode .srt format subtitle files.
    
    Args:
        file_path(str/path): The subtitle file path for decoding.
        encoding(str): The encoding of the subtitle file.
    
    Attributes:
        file_type: The subtitle file type.
        
    Properties:
        time_series -> on_ts, off_ts: Timestamp collections of the beginning and ending of each event. 
    """
    
    file_type = "srt"
    
    def __init__(self, file_path, encoding="utf-8", trim=True) -> None:
        assert isinstance(file_path, str) or isinstance(file_path, Path), "Invalid file path, only 'str' and Pathlib.Path' supported."
        super().__init__(file_path, encoding, trim)

    def _decode_time(self, str_time):
        """Decode time from src to float(.2f), which stands for seconds.
        
        Returns:
            float_time: Seconds of the corresponding time.
            
        Args:
            str_time: String format for time object.
            
        Properties:
            events: Parsed events output.
            time_series: series of begining and ending timestamps.
        """
                
        tail = float(str_time.split(",")[-1]) * 1e-3
        h, m, s = str_time.split(",")[0].split(":")
        float_time = int(h)*3600 + int(m)*60 + int(s) + tail
        
        return float_time
        
    @property
    def time_series(self):
        """Return event timestamps.
        """
        on_ts = []
        off_ts = []
        
        with open(self.file_path, mode="r", encoding=self.encoding) as f:
            for line in f.readlines():
                if "-->" in line:
                    onset = line.split("-")[0].lstrip().rstrip()
                    offset = line.split(">")[-1].lstrip().rstrip()
                    onset = self._decode_time(onset)
                    offset = self._decode_time(offset)
                    if onset:
                        on_ts.append(onset)
                    if offset: 
                        off_ts.append(offset)
                        
        on_ts, off_ts = self._trim_events(on_ts, off_ts) if self.trim else (on_ts, off_ts)
        
        assert len(on_ts)==len(off_ts), "Mismatch for timestamp series."

        return on_ts, off_ts


## Start auto generation

In [67]:
for source_file_path, subtitle_file_path in sources: 
    if subtitle_file_path.split(".")[-1]=="ass":
        subtitle_decoder = decoder.ASSDecoder
    if subtitle_file_path.split(".")[-1]=="srt":
      subtitle_decoder = decoder.SRTDecoder
    is_video = check_type(source_file_path)
    audio_file_path = extract_audio(source_file_path, format=DatasetConfig.dataset_audio_format) if is_video else source_file_path
    generate_from_audio(audio_path=audio_file_path, sub_path=subtitle_file_path, dest_path=dest_path, sub_decoder=subtitle_decoder,  verbose=True)

AttributeError: ignored

In [62]:
_, _, _ = count_class(dest_path)


Label 1 instances: 189
Label 0 instances: 234
Total clips: 423

