In [None]:
# Move the working directory up one level; later cells open relative paths
# such as data/videos.json and data/codes.json.
# NOTE: the move is relative, so re-running this cell without restarting the
# kernel goes up one more directory each time.
%cd ..

In [None]:
import difflib
import io
import json
from dataclasses import dataclass, field
from pathlib import Path
from xml.etree import ElementTree

In [None]:
def get_files(dirname: Path, extension: str):
    """Index the files directly inside ``dirname`` that end in ``.extension``.

    Returns a dict mapping each file's stem (its name without the final
    suffix) to its path. Only ``dirname`` itself is searched, not its
    subdirectories. If several matches share a stem, the one yielded last
    by ``glob`` is kept.
    """
    pattern = f"*.{extension}"
    by_stem = {}
    for match in dirname.glob(pattern):
        by_stem[match.stem] = match
    return by_stem

In [None]:
# Base data directory; TranscriptStorage (below) resolves its subdirectories
# against it.
# NOTE(review): absolute, machine-specific path — adjust when running elsewhere.
root = Path("/mnt/fast/data")

# Caption Codes

In [None]:
from pytube import YouTube

In [None]:
# Load the video table; it is iterated below as key -> URL pairs.
# The path is relative to the current working directory.
videos = json.loads(Path("data/videos.json").read_text())

In [None]:
# For every video, collect the `code` of each caption track that pytube lists
# for it, stored as a tuple under the video's key.
# NOTE(review): building/querying YouTube objects presumably performs network
# requests, so this cell may be slow or fail offline — confirm.
codes = {}
for key, url in videos.items():
    video = YouTube(url)    
    codes[key] = tuple(caption.code for caption in video.captions)

In [None]:
# Snapshot of the video keys, in the order they appear in `videos`.
# NOTE(review): not referenced by any cell visible here — confirm it is still needed.
keys = list(videos)

In [None]:
# Persist the key -> caption-codes mapping as JSON (tuples are serialised as
# JSON arrays). As the last expression of the cell, the value returned by
# write_text is displayed.
Path("data/codes.json").write_text(json.dumps(codes))

# Reading JSON and XML Captions

In [None]:
def read_xml(fn: Path):
    """Parse a caption XML file into a list of timed text segments.

    Every ``<p>`` element found under ``<body>`` becomes one dict with keys
    ``start`` and ``end`` (integers built from the ``t`` and ``d``
    attributes, with ``end = t + d``) and ``text`` (all text inside the
    paragraph, including text held by child elements, with newlines replaced
    by spaces and surrounding whitespace stripped). A paragraph without any
    text yields ``""``.

    Raises ``KeyError`` if a paragraph lacks ``t`` or ``d``, and
    ``ValueError`` if either attribute is not an integer.
    """
    tree = ElementTree.fromstring(fn.read_text())
    segments = []
    for segment in tree.findall("body/p"):
        start, delta = [int(segment.attrib[k]) for k in ("t", "d")]
        # ``segment.text`` is None for an empty <p/> and for a <p> whose text
        # sits in child elements; itertext() handles both without raising.
        raw_text = "".join(segment.itertext())
        text = raw_text.replace("\n", " ").strip()
        segments.append({"start": start, "end": start + delta, "text": text})
    return segments

In [None]:
def read_json(fn: Path):
    """Load transcript segments from a JSON file, scaling the times by 1000.

    The file must hold a JSON array of objects that each have ``start``,
    ``end`` and ``text`` entries; any other entries are dropped. ``text`` is
    passed through unchanged, while ``start`` and ``end`` are multiplied by
    1000 and rounded to the nearest integer.

    NOTE(review): presumably this converts seconds to the same unit as the
    ``t``/``d`` attributes used by ``read_xml`` — confirm.
    """
    def scale(value):
        # Scale first, then convert: int(value) * 1000 would discard the
        # fractional part (1.5 -> 1000) and raise on a string such as "1.5".
        return round(float(value) * 1000)

    return [
        {
            "start": scale(segment["start"]),
            "end": scale(segment["end"]),
            "text": segment["text"],
        }
        for segment in json.loads(fn.read_text())
    ]

In [None]:
def to_text(segments: list) -> str:
    """Join the stripped ``text`` of every segment into one space-separated string."""
    pieces = []
    for segment in segments:
        pieces.append(segment["text"].strip())
    return " ".join(pieces)

In [None]:
@dataclass
class Transcript:
    """A titled pair of segment lists: ``reference`` and ``whisper``.

    Both lists hold dicts that carry at least a ``text`` entry.
    """

    title: str
    reference: list
    whisper: list

    def get_a(self, index):
        """Return the text of the reference segment at ``index``."""
        segment = self.reference[index]
        return segment["text"]

    def get_b(self, index):
        """Return the text of the whisper segment at ``index``."""
        segment = self.whisper[index]
        return segment["text"]

    def to_text(self):
        """Return ``(reference_text, whisper_text)`` as two flat strings."""
        # Inside a method body the bare name ``to_text`` resolves to the
        # module-level function, not to this method.
        reference_text = to_text(self.reference)
        whisper_text = to_text(self.whisper)
        return reference_text, whisper_text

    def __repr__(self):
        """Compact representation showing the title and both segment counts."""
        summary = f"Transcript(title='{self.title}', n={len(self.reference)}, m={len(self.whisper)})"
        return summary

@dataclass
class TranscriptStorage:
    """Pairs manual XML captions with parsed JSON transcripts stored on disk.

    ``root`` is the base directory and ``subdirs`` is
    ``(parsed_dirname, manual_dirname)``. On construction the ``*.json``
    files of the first directory and the ``*.xml`` files of the second are
    indexed by file stem; ``keys`` is the sorted list of the XML stems.
    """

    root: Path
    subdirs: tuple[str, str]
    keys: list = field(init=False)
    # Both indexes are stem -> path dicts (the result of get_files), not lists.
    parsed: dict = field(init=False)
    manual: dict = field(init=False)

    def __post_init__(self):
        parsed, manual = self.subdirs
        self.parsed = get_files(self.root/parsed, "json")
        self.manual = get_files(self.root/manual, "xml")
        self.keys = sorted(self.manual)

    def __len__(self):
        """Number of manual (XML) transcripts that were indexed."""
        return len(self.keys)

    def get(self, item) -> "Transcript":
        """Load the transcript pair for a key (``str``) or a position in ``keys``.

        Both file paths are resolved before anything is read from disk.

        Raises ``KeyError`` when the key is not a known manual transcript or
        when no parsed JSON file matches it, and ``IndexError`` when an
        integer position is out of range.
        """
        key = item if isinstance(item, str) else self.keys[item]
        manual_path = self.manual[key]
        # Parsed files are looked up under the key with ':', ';', ',' and '#'
        # removed.
        # NOTE(review): presumably whatever wrote the JSON files dropped these
        # characters from the file names — confirm.
        cleaned_key = "".join([c for c in key if c not in ":;,#"])
        try:
            parsed_path = self.parsed[cleaned_key]
        except KeyError:
            # Same exception type as before, but naming both keys.
            raise KeyError(
                f"no parsed transcript {cleaned_key!r} for manual transcript {key!r}"
            ) from None
        return Transcript(key, read_xml(manual_path), read_json(parsed_path))

In [None]:
from fastprogress import progress_bar

# For each model, write one JSON file per lecture holding the title plus the
# flattened reference ("ref") and Whisper ("new") texts.
model_names = "base_en", "small_en"

for name in progress_bar(model_names):
    storage = TranscriptStorage(root, (f"lectures_parsed_{name}", "lectures"))
    n_total = len(storage)
    # Derive the output directory from `root` instead of repeating its literal
    # value, so inputs and outputs follow a single setting.
    texts_dir = root / "texts" / name
    texts_dir.mkdir(exist_ok=True, parents=True)

    for i in progress_bar(range(n_total)):
        tr = storage.get(i)
        ref, new = tr.to_text()
        content = json.dumps({"title": tr.title, "ref": ref, "new": new})
        texts_dir.joinpath(f"{tr.title}.json").write_text(content)

In [None]:
# Sanity check: count the entries in the base_en output directory.
!ls -1 /mnt/fast/data/texts/base_en | wc -l

In [None]:
# Sanity check: count the entries in the small_en output directory.
!ls -1 /mnt/fast/data/texts/small_en | wc -l