### Notebook for processing the transcripts of DataTalksClub podcasts available via GitHub

GitHub repo link: https://github.com/DataTalksClub/datatalksclub.github.io/tree/main/_podcast

Downloading the transcripts from GitHub is easier than downloading them from YouTube. However, not all podcasts are available on GitHub. 

In [1]:
from __future__ import annotations

import io
import json
import re
import time
import traceback
import unicodedata
import zipfile
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple
from typing import Tuple
from urllib.parse import parse_qs, urlparse

import frontmatter
import requests

In [2]:
# ------------------------------------------------------------
# GitHub zip reader (your code, lightly cleaned)
# ------------------------------------------------------------

@dataclass
class RawRepositoryFile:
    """A single text file taken from a repository archive."""

    # Path of the file; GithubRepositoryDataReader fills this with the archive
    # path minus its top-level folder.
    filename: str
    # Text content of the file; GithubRepositoryDataReader fills this with the
    # UTF-8 decoded, whitespace-stripped file body.
    content: str


class GithubRepositoryDataReader:
    """
    Downloads and parses files from a GitHub repository (main branch) via ZIP.

    The archive is fetched from codeload.github.com, kept in memory, and every
    entry that passes the extension and filename filters is decoded as UTF-8
    text and returned as a ``RawRepositoryFile``.

    Args:
        repo_owner: GitHub user or organisation that owns the repository.
        repo_name: Repository name.
        allowed_extensions: Extensions to keep. They are compared
            case-insensitively and with or without a leading dot, so ``"md"``
            and ``".MD"`` are equivalent. ``None`` or an empty iterable keeps
            every extension.
        filename_filter: Predicate called with the lower-cased repository
            path; returning ``False`` skips the file. Defaults to accepting
            everything.
    """

    def __init__(
        self,
        repo_owner: str,
        repo_name: str,
        allowed_extensions: Iterable[str] | None = None,
        filename_filter: Callable[[str], bool] | None = None,
    ):
        prefix = "https://codeload.github.com"
        self.url = f"{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/main"

        # _get_extension() returns the extension without its dot, so drop a
        # leading dot here as well; otherwise ".md" would never match anything.
        self.allowed_extensions = {
            ext.lower().lstrip(".") for ext in (allowed_extensions or ())
        }
        self.filename_filter = filename_filter or (lambda _: True)

    def read(self) -> list[RawRepositoryFile]:
        """
        Download the archive and return the files that pass the filters.

        Raises:
            Exception: if the download does not answer with HTTP 200.
        """
        resp = requests.get(self.url, timeout=60)
        if resp.status_code != 200:
            raise Exception(f"Failed to download repository: {resp.status_code}")

        # The context manager closes the archive even if extraction fails.
        with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
            return self._extract_files(zf)

    def _extract_files(self, zf: zipfile.ZipFile) -> list[RawRepositoryFile]:
        """Decode every non-skipped archive entry; failures are printed and skipped."""
        data: List[RawRepositoryFile] = []

        for file_info in zf.infolist():
            filepath = self._normalize_filepath(file_info.filename)

            if self._should_skip_file(filepath):
                continue

            try:
                with zf.open(file_info) as f_in:
                    # Undecodable bytes are dropped rather than failing the file.
                    content = f_in.read().decode("utf-8", errors="ignore").strip()

                data.append(RawRepositoryFile(filename=filepath, content=content))

            except Exception as e:
                # Best effort: one unreadable entry must not abort the whole run.
                print(f"Error processing {file_info.filename}: {e}")
                traceback.print_exc()

        return data

    def _should_skip_file(self, filepath: str) -> bool:
        """Return True when the (normalised) path must not be read."""
        fp = filepath.lower()

        # Directory entries end with "/"; the archive's root folder normalises
        # to the empty string and is a directory entry too.
        if not fp or fp.endswith("/"):
            return True

        filename = fp.rsplit("/", 1)[-1]
        if filename.startswith("."):
            return True

        if self.allowed_extensions and self._get_extension(fp) not in self.allowed_extensions:
            return True

        # Note: the filter sees the lower-cased path.
        return not self.filename_filter(fp)

    def _get_extension(self, filepath: str) -> str:
        """Return the text after the last dot of the file name, or "" if there is no dot."""
        filename = filepath.split("/")[-1]
        return filename.rsplit(".", 1)[-1] if "." in filename else ""

    def _normalize_filepath(self, filepath: str) -> str:
        """Drop the first path component (the archive's top-level folder)."""
        parts = filepath.split("/", maxsplit=1)
        return parts[1] if len(parts) > 1 else parts[0]

In [3]:
# ------------------------------------------------------------
# Transcript + title extraction (your approach)
# ------------------------------------------------------------

def _has_transcript(content: str) -> bool:
    """
    Tell whether a file's front-matter carries a usable transcript.

    A transcript is usable when the ``transcript`` key holds a non-empty list
    with at least one mapping that has a ``line`` or ``header`` key. Content
    that cannot be parsed counts as "no transcript".
    """
    try:
        parsed = frontmatter.loads(content or "")
    except Exception:
        return False

    entries = parsed.get("transcript")
    if isinstance(entries, list) and entries:
        for entry in entries:
            if isinstance(entry, dict) and ("line" in entry or "header" in entry):
                return True
    return False


def extract_paragraphs_from_raw(raw_file: RawRepositoryFile) -> List[str]:
    """
    Turn the ``transcript`` front-matter list into a flat list of paragraphs.

    ``line`` entries become plain paragraphs (embedded newlines replaced by
    spaces); ``header`` entries become ``"## <header>"``. Entries that are
    not mappings, and entries whose value is missing or blank, are skipped.

    Args:
        raw_file: File whose ``content`` holds the front-matter document.

    Returns:
        The non-empty paragraphs in document order; an empty list when there
        is no transcript list at all.
    """
    post = frontmatter.loads(raw_file.content)

    entries = post.get("transcript")
    if not isinstance(entries, list):
        # Key missing, `transcript:` left empty (parsed as None) or a scalar:
        # there is nothing to iterate, and iterating None would raise.
        return []

    paras: List[str] = []
    for e in entries:
        if not isinstance(e, dict):
            continue

        if "line" in e:
            value = e["line"]
            # A bare `line:` parses as None; str(None) would emit the text "None".
            text = "" if value is None else str(value).replace("\n", " ").strip()
            if text:
                paras.append(text)
        elif "header" in e:
            value = e["header"]
            header = "" if value is None else str(value).strip()
            # Skip blank headers instead of emitting a dangling "## ".
            if header:
                paras.append(f"## {header}")

    return paras


def extract_title_from_raw(raw: RawRepositoryFile) -> Optional[str]:
    """
    Read the ``title`` field from a file's front-matter.

    Returns the stripped title as a string, or ``None`` when the content
    cannot be parsed, the field is not a string/number, or it is blank.
    """
    try:
        document = frontmatter.loads(raw.content or "")
    except Exception:
        return None

    value = document.get("title")
    if not isinstance(value, (str, int, float)):
        return None

    cleaned = str(value).strip()
    return cleaned if cleaned else None

In [4]:
# ------------------------------------------------------------
# YouTube URL -> video_id
# ------------------------------------------------------------

# Shape accepted as a bare video ID: 8-20 URL-safe characters.
_YT_ID_RE = re.compile(r"^[A-Za-z0-9_-]{8,20}$")


def _is_host_or_subdomain(host: str, domain: str) -> bool:
    """Return True when host is exactly domain or one of its subdomains."""
    return host == domain or host.endswith("." + domain)


def youtube_video_id(url_or_id: str) -> Optional[str]:
    """
    Extract a YouTube video ID from a URL, or pass a bare ID through.

    Supported inputs: a bare ID, ``youtu.be/<id>``,
    ``youtube.com/watch?v=<id>`` and ``youtube.com/{live,embed,shorts}/<id>``
    (any subdomain, with or without a scheme).

    Args:
        url_or_id: URL or ID; ``None`` and blank strings are accepted.

    Returns:
        The video ID, or ``None`` when the input is empty, unparsable, or not
        a recognised YouTube URL.
    """
    s = (url_or_id or "").strip()
    if not s:
        return None

    # already an ID
    if _YT_ID_RE.fullmatch(s):
        return s

    try:
        u = urlparse(s)
        if not u.scheme and not u.netloc:
            # "youtu.be/<id>" without a scheme parses as a bare path;
            # re-parse it as a network location.
            u = urlparse("//" + s)
        # hostname drops any port / user-info and is already lower-cased.
        host = (u.hostname or "").lower()
    except ValueError:
        return None

    path = (u.path or "").strip("/")
    parts = path.split("/")

    # Compare whole host labels: a substring test would also accept
    # look-alike hosts such as "notyoutube.com".
    if _is_host_or_subdomain(host, "youtu.be"):
        # youtu.be/<id>
        return parts[0] if parts[0] else None

    if _is_host_or_subdomain(host, "youtube.com"):
        if path == "watch":
            qs = parse_qs(u.query)
            return qs.get("v", [None])[0]
        if parts[0] in {"live", "embed", "shorts"} and len(parts) >= 2:
            return parts[1]

    return None

In [5]:
def normalize_title(s: str) -> str:
    """
    Reduce a title to a lower-case, ASCII, single-spaced form for matching.

    Accented letters are folded to their base letter (``î`` -> ``i``) so a
    name is not split in two; Unicode dashes become ``-``; every other
    character outside ``a-z``, ``0-9``, whitespace and ``-`` becomes a space.
    ``None`` and blank input yield ``""``.
    """
    # Decompose accented characters and drop the combining marks; without this
    # the punctuation filter below would turn "Lemaître" into "lema tre".
    s = unicodedata.normalize("NFKD", s or "")
    s = "".join(ch for ch in s if not unicodedata.combining(ch))

    s = s.lower().strip()
    s = re.sub(r"[\u2010-\u2015]", "-", s)          # normalize weird dashes
    s = re.sub(r"[^a-z0-9\s-]", " ", s)             # drop punctuation
    s = re.sub(r"\s+", " ", s).strip()              # collapse spaces
    return s

def slugify(s: str, max_len: int = 80) -> str:
    """
    Build a file-name-safe slug from a title.

    The title is normalised with ``normalize_title``, spaces become ``-``,
    runs of ``-`` are collapsed, and the result is cut to ``max_len``
    characters.

    Args:
        s: Title to convert.
        max_len: Maximum slug length.

    Returns:
        The slug, or ``"untitled"`` when nothing usable remains.
    """
    s = normalize_title(s)
    s = s.replace(" ", "-")
    s = re.sub(r"-+", "-", s).strip("-")
    # The cut can land right after a separator; trim it so the slug (and the
    # file name derived from it) never ends with "-".
    return s[:max_len].rstrip("-") or "untitled"

def token_overlap_score(a: str, b: str) -> float:
    """
    Score how many normalised words two titles share.

    The score is the number of distinct words common to both titles divided
    by the size of the larger word set, so it lies in ``[0.0, 1.0]``. It is
    ``0.0`` when either title has no words after normalisation.
    """
    tokens_a = set(normalize_title(a).split())
    tokens_b = set(normalize_title(b).split())

    if tokens_a and tokens_b:
        shared = tokens_a.intersection(tokens_b)
        larger = max(len(tokens_a), len(tokens_b))
        return len(shared) / larger

    return 0.0

In [6]:
# ------------------------------------------------------------
# Map podcast files to youtube IDs by scanning front-matter strings
# ------------------------------------------------------------

def _collect_strings(obj: Any) -> List[str]:
    """
    Recursively collect all string values from a nested structure.
    Useful to search front-matter for YouTube links/ids.
    """
    out: List[str] = []
    if isinstance(obj, str):
        out.append(obj)
    elif isinstance(obj, dict):
        for v in obj.values():
            out.extend(_collect_strings(v))
    elif isinstance(obj, list):
        for v in obj:
            out.extend(_collect_strings(v))
    return out


# An http(s) YouTube link embedded in free text; stops at whitespace, ")" or a quote.
_EMBEDDED_YT_URL_RE = re.compile(r"(https?://(?:www\.)?(?:youtube\.com|youtu\.be)[^\s)\"']+)")


def extract_video_id_from_frontmatter(raw: RawRepositoryFile) -> Optional[str]:
    """
    Parse YAML front-matter and try to find a YouTube video_id in any string field.
    This makes the code robust to different keys (youtube, video, links, etc.).

    Candidates are tried from most to least reliable:

    1. values that are themselves YouTube URLs,
    2. YouTube URLs embedded inside longer strings,
    3. values that merely look like a bare ID.

    The bare-ID shape also fits ordinary short tokens (slugs, handles), so it
    is only used when no URL is present anywhere in the metadata.

    Args:
        raw: File whose ``content`` holds the front-matter document.

    Returns:
        The first video ID found, or ``None`` if parsing fails or nothing matches.
    """
    try:
        post = frontmatter.loads(raw.content or "")
    except Exception:
        return None

    strings = _collect_strings(post.metadata)

    # 1) Whole-value URLs. A URL always has a dot in its host, while the
    #    bare-ID pattern allows none, so any hit here comes from URL parsing.
    for s in strings:
        if "." in s:
            vid = youtube_video_id(s)
            if vid:
                return vid

    # 2) URLs embedded inside larger strings.
    for s in strings:
        m = _EMBEDDED_YT_URL_RE.search(s)
        if m:
            vid = youtube_video_id(m.group(1))
            if vid:
                return vid

    # 3) Last resort: values shaped like a bare ID.
    for s in strings:
        if "." not in s:
            vid = youtube_video_id(s)
            if vid:
                return vid

    return None

def build_title_to_raw_map(files: List[RawRepositoryFile]) -> Dict[str, RawRepositoryFile]:
    """
    Index files by their normalised front-matter title.

    Files without a title are left out. When two files normalise to the same
    title, the later one in ``files`` replaces the earlier one.
    """
    titled = ((extract_title_from_raw(candidate), candidate) for candidate in files)
    return {normalize_title(title): candidate for title, candidate in titled if title}

In [7]:
# ------------------------------------------------------------
# Saving: JSON + TXT + manifest
# ------------------------------------------------------------

def utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()


def append_manifest(path: Path, record: Dict[str, Any]) -> None:
    """
    Append one record to a JSON Lines manifest.

    The parent directory is created if needed. The record is written as a
    single JSON object (non-ASCII characters kept as-is) followed by a newline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open(mode="a", encoding="utf-8") as manifest:
        manifest.write(f"{json.dumps(record, ensure_ascii=False)}\n")


def load_processed_video_ids(manifest_path: Path) -> Set[str]:
    """
    Collect the ``video_id`` values recorded in a JSON Lines manifest.

    Blank lines, lines that are not valid JSON, records that are not objects
    and records whose ``video_id`` is missing, empty or not a string are
    ignored.

    Args:
        manifest_path: Path to the manifest; it may not exist yet.

    Returns:
        The set of video IDs found (empty when the file does not exist).
    """
    if not manifest_path.exists():
        return set()

    processed: Set[str] = set()

    # Iterate the file rather than str.splitlines(): splitlines() also breaks
    # on U+2028/U+2029/U+0085, which json.dumps(ensure_ascii=False) writes
    # unescaped, so a record containing one would be cut in two and lost.
    with manifest_path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue

            try:
                obj = json.loads(line)
            except (ValueError, RecursionError):
                # Corrupt or truncated line: skip it, keep the rest.
                continue

            if not isinstance(obj, dict):
                continue

            vid = obj.get("video_id")
            if isinstance(vid, str) and vid:
                processed.add(vid)

    return processed


def transcript_to_text(paras: List[str]) -> str:
    """
    Render paragraphs as plain text, one per line, ending with a newline.

    Each paragraph is stripped and blank ones are dropped; headers are kept
    exactly as given. An empty list yields a single newline.
    """
    stripped = (para.strip() for para in paras)
    body = "\n".join(piece for piece in stripped if piece)
    return f"{body}\n"


def save_transcript_files(
    out_dir: Path,
    channel: str,
    video_id: str,
    video_url: str,
    raw_filename: str,
    title: Optional[str],
    paras: List[str],
) -> None:
    """
    Write one transcript as ``json/<channel>/<video_id>.json`` and
    ``txt/<channel>/<video_id>.txt`` under ``out_dir``.

    The JSON file holds the metadata passed in plus the paragraphs as a list
    of strings; the text file holds the paragraphs one per line. Both
    directories are created if missing and existing files are overwritten.

    Args:
        out_dir: Root output directory.
        channel: Sub-directory name, also stored in the JSON payload.
        video_id: Identifier used as the file stem and stored in the payload.
        video_url: Stored in the payload as-is.
        raw_filename: Repository path of the source file.
        title: Episode title, may be ``None``.
        paras: Transcript paragraphs.
    """
    folders = {kind: out_dir / kind / channel for kind in ("json", "txt")}
    for folder in folders.values():
        folder.mkdir(parents=True, exist_ok=True)

    fetched_at = utc_now_iso()

    # The transcript is stored as a list of paragraph strings because the
    # GitHub source is already split into paragraphs.
    payload = {
        "source": "github",
        "repo": "DataTalksClub/datatalksclub.github.io",
        "channel": channel,
        "video_id": video_id,
        "video_url": video_url,
        "video_title": title,
        "github_filename": raw_filename,
        "fetched_at": fetched_at,
        "transcript_paragraphs": paras,  # list[str]
    }

    json_file = folders["json"] / f"{video_id}.json"
    json_text = json.dumps(payload, ensure_ascii=False, indent=2)
    json_file.write_text(json_text, encoding="utf-8")

    txt_file = folders["txt"] / f"{video_id}.txt"
    txt_file.write_text(transcript_to_text(paras), encoding="utf-8")

In [8]:
def find_best_title_match(
    query_title: str,
    title_map: Dict[str, RawRepositoryFile],
    *,
    min_score: float = 0.5,
) -> Tuple[Optional[str], Optional[RawRepositoryFile]]:
    """
    Find the entry of ``title_map`` that best matches ``query_title``.

    Strategies are tried in order and the first that succeeds wins:

    1. exact match of the normalised titles;
    2. substring match in either direction, choosing the candidate whose
       length is closest to the query's;
    3. highest token-overlap score, accepted only if it reaches ``min_score``.

    Args:
        query_title: Title to look up (normalised internally).
        title_map: Normalised title -> file, as built by
            ``build_title_to_raw_map``.
        min_score: Minimum token-overlap score for strategy 3.

    Returns:
        ``(matched_key, file)``, or ``(None, None)`` when nothing matches.
    """
    qn = normalize_title(query_title)
    if not qn:
        return None, None

    # 1) exact normalized match
    if qn in title_map:
        return qn, title_map[qn]

    # 2) substring match (either direction)
    # An empty key is a substring of every query, so it is excluded here.
    candidates = [tn for tn in title_map if tn and (qn in tn or tn in qn)]
    if candidates:
        # Prefer the candidate closest in length to the query, so a short
        # generic title cannot shadow a more specific one. max() keeps the
        # first of equally good candidates, i.e. dict order breaks ties.
        best = max(
            candidates,
            key=lambda tn: min(len(tn), len(qn)) / max(len(tn), len(qn)),
        )
        return best, title_map[best]

    # 3) token overlap scoring
    best_tn = None
    best_score = 0.0

    for tn in title_map:
        score = token_overlap_score(qn, tn)
        if score > best_score:
            best_score = score
            best_tn = tn

    if best_tn is not None and best_score >= min_score:
        return best_tn, title_map[best_tn]

    return None, None


In [9]:
# ------------------------------------------------------------
# Main: given a list of episode titles -> process matching podcasts
# ------------------------------------------------------------

def download_github_podcast_transcripts_for_titles(
    titles: List[str],
    *,
    out_dir: Path = Path("data/github_podcast"),
    channel: str = "datatalksclub",
    sleep_s: float = 0.1,
    only_with_transcripts: bool = True,
) -> None:
    """
    Save the GitHub-hosted transcript for each requested episode title.

    The repository archive is downloaded once. Each title is then matched
    against the ``_podcast`` front-matter titles; a match is written as
    JSON + TXT (file stem = slug of the requested title). Every title gets
    one manifest record with status ``ok``, ``not_found`` or ``error``.

    Args:
        titles: Episode titles to look up.
        out_dir: Root directory for the output files and ``manifest.jsonl``.
        channel: Sub-directory / channel label for the output.
        sleep_s: Pause after each title for which a match was found.
        only_with_transcripts: Consider only files that carry a transcript.
    """
    manifest_file = out_dir / "manifest.jsonl"
    # Loaded but not consulted yet: records written below are keyed by slug,
    # not video_id. Consider updating this to track slugs instead.
    processed = load_processed_video_ids(manifest_file)

    repo_reader = GithubRepositoryDataReader(
        repo_owner="DataTalksClub",
        repo_name="datatalksclub.github.io",
        allowed_extensions={"md", "mdx"},
        filename_filter=lambda p: p.startswith("_podcast/"),
    )
    candidates = repo_reader.read()
    if only_with_transcripts:
        candidates = list(filter(lambda item: _has_transcript(item.content), candidates))

    episodes_by_title = build_title_to_raw_map(candidates)

    for requested in titles:
        stamp = utc_now_iso()
        slug = slugify(requested)

        # No skip logic for already-processed slugs yet: every title is
        # looked up and written again on each run.
        _matched_key, episode = find_best_title_match(requested, episodes_by_title)

        if episode is None:
            not_found_record = {
                "title": requested,
                "slug": slug,
                "channel": channel,
                "status": "not_found",
                "error": "No matching episode title found in _podcast front-matter",
                "fetched_at": stamp,
            }
            append_manifest(manifest_file, not_found_record)
            continue

        try:
            matched_title = extract_title_from_raw(episode)
            paragraphs = extract_paragraphs_from_raw(episode)

            # No video_id is known here, so the slug doubles as the file stem
            # and the URL is left empty.
            save_transcript_files(
                out_dir=out_dir,
                channel=channel,
                video_id=slug,
                video_url="",
                raw_filename=episode.filename,
                title=matched_title,
                paras=paragraphs,
            )

            ok_record = {
                "title": requested,
                "matched_title": matched_title,
                "slug": slug,
                "channel": channel,
                "status": "ok",
                "github_filename": episode.filename,
                "fetched_at": stamp,
            }
            append_manifest(manifest_file, ok_record)
        except Exception as exc:
            # Record the failure and carry on with the remaining titles.
            error_record = {
                "title": requested,
                "slug": slug,
                "channel": channel,
                "status": "error",
                "error": repr(exc),
                "github_filename": episode.filename if episode else None,
                "fetched_at": stamp,
            }
            append_manifest(manifest_file, error_record)
        finally:
            time.sleep(sleep_s)

In [10]:
#episode_titles = ["Cracking the Code: Machine Learning Made Understandable - Christoph Molnar", 
#                  "Knowledge Graphs and LLMs Across Academia and Industry - Anahita Pakiman", 
#                  "Working as a Core Developer in the Scikit-Learn Universe - Guillaume Lemaître", 
#                  "Trends in AI Infrastructure - Andrey Cheptsov", 
#                  "Trends in Data Engineering - Adrian Brudaru", 
#                  "From Human-in-the-Loop to Agent-in-the-Loop: A Practical Transition Guide - Ertugrul Mutlu"
#                 ]
# Episode titles to look up in the repository's _podcast front-matter.
episode_titles = [
    "Data Science for Public Policy — Ethical AI, Climate Justice & Impact Projects (Christine Cepelak)",
    "Responsible & Explainable AI: Practical Guide to Bias Detection, Fairness & Governance (Supreet Kaur)",
    "MLOps in Finance: Regulated Deployment, CI/CD and Model Governance (Nemanja Radojkovic)",
    "Master Industrial Data: Synthetic Tabular Data, Small-Data Modeling, Sensors & MLOps (Rosona Eldred)",
    "Building and Scaling Data Science Practice in Industrial Enterprises: AI Adoption, MLOps Maturity & Career Growth (Andrey Shtylenko)",
    "Optimize Decisions with ML: Prescriptive & Robust Optimization for Supply Chain and Pricing (Dan Becker)",
    "Build & Scale Data Products for AI: Roadmaps, MLOps, Customer Research & Metrics (Greg Coquillo)",
    "Cracking the Code: Machine Learning Made Understandable (Christoph Molnar)",  # not available
    "Knowledge Graphs and LLMs Across Academia and Industry (Anahita Pakiman)",  # not available
    "Trends in AI Infrastructure (Andrey Cheptsov)",  # not available (because new?)
    "Trends in Data Engineering (Adrian Brudaru)",  # not available (because new?)
    "From Human-in-the-Loop to Agent-in-the-Loop: A Practical Transition Guide (Ertugrul Mutlu)",  # not available because new
    "Responsible & Explainable AI: Practical Guide to Bias Detection, Fairness & Governance (Supreet Kaur)",
    "Master Industrial Data: Synthetic Tabular Data, Small-Data Modeling, Sensors & MLOps (Rosona Eldred)",
    "Building and Scaling Data Science Practice in Industrial Enterprises: AI Adoption, MLOps Maturity & Career Growth (Andrey Shtylenko)",
    "Data Governance & Data Access Management: Access Controls, Data Catalogs & Access-as-Code (Bart Vandekerckhove)",
    "From Black-Box Systems to Augmented Decision-Making (Anusha Akkina)",
    "AI in Industry: Trust, Return on Investment and Future (Maria Sukhareva)",
    "MLOps in Finance: Regulated Deployment, CI/CD and Model Governance (Nemanja Radojkovic)",
]

# Several titles appear twice above. Drop the repeats (keeping first-seen
# order) so no episode is processed twice and the manifest gets one record
# per title per run.
episode_titles = list(dict.fromkeys(episode_titles))

download_github_podcast_transcripts_for_titles(episode_titles)

In [11]:
#urls = ["https://www.youtube.com/watch?v=LBuGzyOkx7c&list=PL3MmuxUbc_hK60wsCyvrEK2RjQsUi4Oa_&index=54",
#        "https://www.youtube.com/watch?v=YncdlUscUOo&list=PL3MmuxUbc_hK60wsCyvrEK2RjQsUi4Oa_&index=41", 
#        "https://www.youtube.com/watch?v=RR6xaYqKJ3o&list=PL3MmuxUbc_hK60wsCyvrEK2RjQsUi4Oa_&index=36", 
#        "https://www.youtube.com/watch?v=1aMuynlLM3o&list=PL3MmuxUbc_hK60wsCyvrEK2RjQsUi4Oa_&index=25", 
#        "https://www.youtube.com/watch?v=AlCFKbFIEM8&list=PL3MmuxUbc_hK60wsCyvrEK2RjQsUi4Oa_&index=23", 
#        "https://www.youtube.com/watch?v=HwCR59VuYn4&list=PL3MmuxUbc_hK60wsCyvrEK2RjQsUi4Oa_&index=1"
#       ]
#download_github_podcast_transcripts_for_urls(urls)