In [1]:
import io
from typing import Iterable, Callable, Optional
import zipfile
import traceback
from dataclasses import dataclass
from pathlib import Path
import requests


@dataclass
class RawRepositoryFile:
    """One text file taken from a repository archive: its path and its decoded content."""
    filename: str   # path inside the repo (e.g. "_podcast/foo.md")
    content: str    # file content (UTF-8)


class GithubRepositoryDataReader:
    """
    Downloads and parses text/code files from a GitHub repository zip (single branch).

    The archive is fetched in one HTTP request, opened in memory, and every
    regular file that passes the configured filters is decoded as UTF-8 and
    returned as a ``RawRepositoryFile``.

    Args:
        repo_owner: GitHub user or organisation that owns the repository.
        repo_name: Repository name.
        branch: Branch to download. Names containing "/" are supported.
        allowed_extensions: File extensions to keep, compared case-insensitively.
            A leading dot is optional ("md" and ".md" are equivalent) and a
            single string is treated as one extension. ``None`` or an empty
            collection disables the extension filter.
        filename_filter: Optional predicate called with the repo-relative path;
            files for which it returns a falsy value are skipped.
        root_path: Optional repo-relative folder (e.g. "_podcast"); only files
            below it are returned. A value that is empty after removing
            slashes (such as "/") disables the restriction.
        strip: When true, ``str.strip()`` is applied to each file's content.
        timeout: Timeout in seconds passed to ``requests.get``.
    """

    def __init__(
        self,
        repo_owner: str,
        repo_name: str,
        branch: str = "main",
        allowed_extensions: Optional[Iterable[str]] = None,
        filename_filter: Optional[Callable[[str], bool]] = None,
        root_path: Optional[str] = None,  # e.g. "_podcast"
        strip: bool = True,               # apply .strip() to the content
        timeout: int = 60,
    ):
        # Zip download URL (codeload is fast and not subject to API limits).
        self.url = f"https://codeload.github.com/{repo_owner}/{repo_name}/zip/refs/heads/{branch}"

        # Top-level folder inside the zip. The archive folder name has every
        # "/" of the branch name replaced by "-", so "feature/x" becomes
        # "<repo>-feature-x/". Using the raw branch name would never match.
        archive_ref = branch.replace("/", "-")
        self.repo_prefix = f"{repo_name}-{archive_ref}/"
        self.branch = branch
        self.strip = strip
        self.timeout = timeout

        if allowed_extensions:
            # A bare string would otherwise be iterated character by character.
            if isinstance(allowed_extensions, str):
                allowed_extensions = [allowed_extensions]
            # Stored without the leading dot, matching what _should_skip_file
            # derives from each path.
            self.allowed_extensions = {e.lower().lstrip(".") for e in allowed_extensions}
        else:
            self.allowed_extensions = None
        self.filename_filter = filename_filter or (lambda p: True)

        # root_path: restrict the result to one folder. Zip entries always use
        # "/" separators, so backslashes are normalised first; the trailing
        # "/" makes the prefix match whole path segments only.
        normalized_root = root_path.replace("\\", "/").strip("/") if root_path else ""
        self.root_path = normalized_root + "/" if normalized_root else None

    def read(self) -> "list[RawRepositoryFile]":
        """Download the branch archive and return the files that pass the filters.

        Raises:
            Exception: If the download does not answer with HTTP 200.
        """
        resp = requests.get(self.url, timeout=self.timeout)
        if resp.status_code != 200:
            raise Exception(f"Failed to download repository: {resp.status_code}")

        with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
            return self._extract_files(zf)

    def _extract_files(self, zf: zipfile.ZipFile) -> "list[RawRepositoryFile]":
        """Decode every matching archive member into a ``RawRepositoryFile``.

        Members that cannot be read are reported on stdout (with a traceback)
        and skipped, so one bad entry does not abort the whole run.
        """
        out: list[RawRepositoryFile] = []

        for info in zf.infolist():
            # Skip directory entries.
            if info.is_dir():
                continue

            raw_path = info.filename  # e.g. "datatalksclub.github.io-main/_podcast/foo.md"
            # Remove the top-level archive folder to get the repo-relative path.
            if raw_path.startswith(self.repo_prefix):
                filepath = raw_path[len(self.repo_prefix):]
            else:
                filepath = raw_path

            # If restricted to a folder: keep only files below it.
            if self.root_path and not filepath.startswith(self.root_path):
                continue

            if self._should_skip_file(filepath):
                continue

            try:
                with zf.open(info) as f:
                    # Undecodable bytes are dropped rather than failing the file.
                    content = f.read().decode("utf-8", errors="ignore")
            except Exception as e:
                print(f"Error processing {info.filename}: {e}")
                traceback.print_exc()
                continue

            if self.strip:
                content = content.strip()
            out.append(RawRepositoryFile(filename=filepath, content=content))

        return out

    def _should_skip_file(self, filepath: str) -> bool:
        """Return True when ``filepath`` is rejected by any of the filters."""
        # Hidden files (basename starts with a dot).
        name = Path(filepath).name
        if name.startswith("."):
            return True

        # Extension filter.
        if self.allowed_extensions is not None:
            ext = Path(filepath).suffix.lower().lstrip(".")
            if ext not in self.allowed_extensions:
                return True

        # Custom filter.
        if not self.filename_filter(filepath):
            return True

        return False


def read_github_data():
    """Fetch the markdown files under ``_podcast`` from DataTalksClub/datatalksclub.github.io (branch ``main``)."""
    reader_options = dict(
        repo_owner="DataTalksClub",
        repo_name="datatalksclub.github.io",
        branch="main",
        allowed_extensions={"md", "mdx"},
        root_path="_podcast",  # restrict the result to this folder only
    )
    podcast_reader = GithubRepositoryDataReader(**reader_options)
    return podcast_reader.read()

In [2]:
# Download the podcast markdown files (network call) and report how many were returned.
data_raw = read_github_data()
print(f"Downloaded {len(data_raw)} files")

Downloaded 185 files


In [3]:
# Drop the first two downloaded entries before parsing.
# NOTE(review): presumably these are non-episode files (e.g. a template or index page) — confirm which ones.
# Caution: this rebinds data_raw to a slice of itself, so re-running this cell removes two more entries each time.
data_raw=data_raw[2:]

In [4]:
import frontmatter

In [5]:
def parse_data(data_raw):
    """Turn raw repository files into a list of dicts, one per file.

    Each file's ``content`` is parsed with ``frontmatter.loads``; the resulting
    post is converted with ``to_dict()`` and the file's ``filename`` is stored
    under the ``'filename'`` key (replacing any value already present there).
    """
    return [
        {**frontmatter.loads(raw_file.content).to_dict(), 'filename': raw_file.filename}
        for raw_file in data_raw
    ]

In [6]:
# Parse the front matter of every remaining file into a dict (one dict per file).
parsed_data = parse_data(data_raw)

In [7]:
# Sanity check: one parsed dict per input file is expected.
len(parsed_data)

183

In [9]:
from typing import List, Dict, Any, Iterable

def chunk_podcast_transcript(
    docs: Iterable[Dict[str, Any]],
    size: int = 30,
    step: int = 15,
) -> List[Dict[str, Any]]:
    """Split each document's transcript into overlapping windows of lines.

    Only transcript entries that are dicts with a ``"line"`` key are used.
    Each entry is rendered as ``"<who>: <line>"`` (or just ``"<line>"`` when
    ``who`` is missing or falsy). Windows start every ``step`` entries and hold
    up to ``size`` entries; the last window of a document ends at its final
    entry. Documents without usable transcript entries produce no chunks.

    Args:
        docs: Parsed documents (dicts). Recognised keys: ``transcript``,
            ``ids``, ``season``, ``episode`` and the metadata keys copied
            into every chunk (``title``, ``short``, ``season``, ``episode``,
            ``guests``, ``ids``, ``links``, ``image``).
        size: Maximum number of transcript entries per chunk. Must be > 0.
        step: Distance between the starts of consecutive chunks. Must be > 0.

    Returns:
        A flat list of chunk dicts holding the copied metadata plus
        ``chunk_index`` (position in the returned list, across all documents),
        ``start_line``/``end_line`` (inclusive indices into the filtered
        entries), ``start_sec``/``end_sec``/``start_time``/``end_time`` (taken
        from the first and last entry of the window), ``content`` and
        ``chunk_id``.

    Raises:
        ValueError: If ``size`` or ``step`` is not positive. Such values would
            otherwise make the windowing loop run forever.
    """
    if size <= 0 or step <= 0:
        raise ValueError(f"size and step must be positive, got size={size}, step={step}")

    carried_keys = ("title", "short", "season", "episode", "guests", "ids", "links", "image")
    out: List[Dict[str, Any]] = []

    for d in docs:
        # "transcript" may be absent or explicitly None; both mean "no lines".
        items = [x for x in (d.get("transcript") or []) if isinstance(x, dict) and "line" in x]
        # Formatting via f-string also works when "who" is not a str.
        lines = [f"{x['who']}: {x['line']}" if x.get("who") else f"{x['line']}" for x in items]
        n = len(lines)

        # Prefer the YouTube id; otherwise fall back to season/episode numbers.
        base_id = (d.get("ids") or {}).get("youtube") or f"s{d.get('season','?')}e{d.get('episode','?')}"
        keep = {k: d[k] for k in carried_keys if k in d}

        for start in range(0, n, step):
            end = min(start + size, n)
            first, last = items[start], items[end - 1]  # window is never empty here
            out.append({
                **keep,
                "chunk_index": len(out),  # simple and stable enough
                "start_line": start,
                "end_line": end - 1,
                "start_sec": first.get("sec"),
                "end_sec": last.get("sec"),
                "start_time": first.get("time"),
                "end_time": last.get("time"),
                "content": "\n".join(lines[start:end]),
                "chunk_id": f"{base_id}__{start}",
            })
            # The window already reached the end; further starts would only
            # yield chunks fully contained in this one.
            if end == n:
                break
    return out


In [10]:
# Split every transcript into windows of 30 lines that start every 15 lines (consecutive windows overlap by 15).
chunks = chunk_podcast_transcript(parsed_data, size=30, step=15)


In [11]:
# Total number of chunks across all episodes.
len(chunks)

1479

In [12]:
from minsearch import Index

In [13]:
# Build a minsearch text index over each chunk's "content", "title" and "short" fields.
index = Index(
    text_fields=["content", "title", "short"],
    # Keyword fields are currently disabled.
    # NOTE(review): the chunks built by chunk_podcast_transcript do not carry a "filename" key,
    # so that field would have to be added to the chunks before enabling the line below.
    #keyword_fields=["filename", "season", "episode", "guests"],
)
index.fit(chunks)


<minsearch.minsearch.Index at 0x7e87ce7a4680>

In [14]:
# Run an example free-text query against the chunk index.
search_results = index.search("How do I make money with AI?")

In [15]:
# Display the matching chunks returned for the example query.
search_results

[{'title': 'Make an Impact Through Volunteering Open Source Work',
  'short': 'Make an Impact Through Volunteering Open Source Work',
  'season': 17,
  'episode': 7,
  'guests': ['saraelateif'],
  'ids': {'anchor': 'atatalksclub/episodes/Make-an-Impact-Through-Volunteering-Open-Source-Work---Sara-EL-ATEIF-e2g4dan',
   'youtube': 'aHdaIwOEI8Q'},
  'links': {'anchor': 'https://podcasters.spotify.com/pod/show/datatalksclub/episodes/Make-an-Impact-Through-Volunteering-Open-Source-Work---Sara-EL-ATEIF-e2g4dan',
   'apple': 'https://podcasts.apple.com/us/podcast/make-an-impact-through-volunteering-open-source-work/id1541710331?i=1000646627892',
   'spotify': 'https://open.spotify.com/episode/7tZSSgv1yAlnoMyB4ggQmb?si=AqDaME2QS26usoZjOEWNtQ',
   'youtube': 'https://www.youtube.com/watch?v=aHdaIwOEI8Q'},
  'image': 'images/podcast/s17e07-make-impact-through-volunteering-open-source-work.jpg',
  'chunk_index': 1195,
  'start_line': 0,
  'end_line': 29,
  'start_sec': 103,
  'end_sec': 849,
  's