In [1]:
import io
import zipfile
import traceback

from dataclasses import dataclass
from typing import Iterable, Callable, Any, Dict, List, Literal, Optional

import requests
import frontmatter
from minsearch import Index

In [2]:
## Test LLM in virtual environment
#from openai import OpenAI
#openai_client = OpenAI()
#response = openai_client.responses.create(
#    model="gpt-5",
#    input="Write a short bedtime story about a unicorn."
#)
#
#print(response.output_text)

In [3]:
# Code copied from https://github.com/alexeygrigorev/ai-data-pipelines/blob/main/github_docs/github.py
@dataclass
class RawRepositoryFile:
    filename: str
    content: str

class GithubRepositoryDataReader:
    """
    Downloads and parses markdown and code files from a GitHub repository.
    """

    def __init__(self,
                repo_owner: str,
                repo_name: str,
                allowed_extensions: Iterable[str] | None = None,
                filename_filter: Callable[[str], bool] | None = None
        ):
        """
        Initialize the GitHub repository data reader.
        
        Args:
            repo_owner: The owner/organization of the GitHub repository
            repo_name: The name of the GitHub repository
            allowed_extensions: Optional set of file extensions to include
                    (e.g., {"md", "py"}). If not provided, all file types are included
            filename_filter: Optional callable to filter files by their path
        """
        prefix = "https://codeload.github.com"
        self.url = (
            f"{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/main"
        )

        if allowed_extensions is not None:
            self.allowed_extensions = {ext.lower() for ext in allowed_extensions}

        if filename_filter is None:
            self.filename_filter = lambda filepath: True
        else:
            self.filename_filter = filename_filter

    def read(self) -> list[RawRepositoryFile]:
        """
        Download and extract files from the GitHub repository.
        
        Returns:
            List of RawRepositoryFile objects for each processed file
            
        Raises:
            Exception: If the repository download fails
        """
        resp = requests.get(self.url)
        if resp.status_code != 200:
            raise Exception(f"Failed to download repository: {resp.status_code}")

        zf = zipfile.ZipFile(io.BytesIO(resp.content))
        repository_data = self._extract_files(zf)
        zf.close()

        return repository_data

    def _extract_files(self, zf: zipfile.ZipFile) -> list[RawRepositoryFile]:
        """
        Extract and process files from the zip archive.
        
        Args:
            zf: ZipFile object containing the repository data

        Returns:
            List of RawRepositoryFile objects for each processed file
        """
        data = []

        for file_info in zf.infolist():
            filepath = self._normalize_filepath(file_info.filename)

            if self._should_skip_file(filepath):
                continue

            try:
                with zf.open(file_info) as f_in:
                    content = f_in.read().decode("utf-8", errors="ignore")
                    if content is not None:
                        content = content.strip()

                    file = RawRepositoryFile(
                        filename=filepath,
                        content=content
                    )
                    data.append(file)

            except Exception as e:
                print(f"Error processing {file_info.filename}: {e}")
                traceback.print_exc()
                continue

        return data

    def _should_skip_file(self, filepath: str) -> bool:
        """
        Determine whether a file should be skipped during processing.
        
        Args:
            filepath: The file path to check
            
        Returns:
            True if the file should be skipped, False otherwise
        """
        filepath = filepath.lower()

        # directory
        if filepath.endswith("/"):
            return True

        # hidden file
        filename = filepath.split("/")[-1]
        if filename.startswith("."):
            return True

        if self.allowed_extensions:
            ext = self._get_extension(filepath)
            if ext not in self.allowed_extensions:
                return True

        if not self.filename_filter(filepath):
            return True

        return False

    def _get_extension(self, filepath: str) -> str:
        """
        Extract the file extension from a filepath.
        
        Args:
            filepath: The file path to extract extension from
            
        Returns:
            The file extension (without dot) or empty string if no extension
        """
        filename = filepath.lower().split("/")[-1]
        if "." in filename:
            return filename.rsplit(".", maxsplit=1)[-1]
        else:
            return ""

    def _normalize_filepath(self, filepath: str) -> str:
        """
        Removes the top-level directory from the file path inside the zip archive.
        'repo-main/path/to/file.py' -> 'path/to/file.py'
        
        Args:
            filepath: The original filepath from the zip archive
            
        Returns:
            The normalized filepath with top-level directory removed
        """
        parts = filepath.split("/", maxsplit=1)
        if len(parts) > 1:
            return parts[1]
        else:
            return parts[0]

In [4]:
def sliding_window(
        seq: Iterable[Any],
        size: int,
        step: int
    ) -> List[Dict[str, Any]]:
    """
    Create overlapping chunks from a sequence using a sliding window approach.

    Args:
        seq: The input sequence (string or list) to be chunked.
        size (int): The size of each chunk/window.
        step (int): The step size between consecutive windows.

    Returns:
        list: A list of dictionaries, each containing:
            - 'start': The starting position of the chunk in the original sequence
            - 'content': The chunk content

    Raises:
        ValueError: If size or step are not positive integers.

    Example:
        >>> sliding_window("hello world", size=5, step=3)
        [{'start': 0, 'content': 'hello'}, {'start': 3, 'content': 'lo wo'}]
    """
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    n = len(seq)
    result = []
    for i in range(0, n, step):
        batch = seq[i:i+size]
        result.append({'start': i, 'content': batch})
        if i + size > n:
            break

    return result

In [5]:
def extract_paragraphs_from_raw(raw_file):
    post = frontmatter.loads(raw_file.content)
    paras = []
    for e in post.get('transcript', []):
        if 'line' in e:
            paras.append(e['line'].replace('\n', ' ').strip())
        elif 'header' in e:
            paras.append(f"## {e['header']}".strip())
    return [p for p in paras if p]

def extract_title_from_raw(raw) -> Optional[str]:
    """
    Return the episode title from a RawRepositoryFile (YAML front-matter 'title').
    """
    try:
        post = frontmatter.loads(getattr(raw, "content", "") or "")
    except Exception:
        return None

    t = post.get("title")
    if isinstance(t, (str, int, float)):
        s = str(t).strip()
        return s or None
    return None

In [6]:
def chunk_documents(
        documents: Iterable[Dict[str, Any]],
        size: int = 2000,
        step: int = 1000,
        content_field_name: str = 'content',
        unit: Literal['chars', 'paragraphs'] = 'chars',
        joiner: str | None = None  # e.g. "\n\n" to join paragraph lists into text
) -> List[Dict[str, Any]]:
    """
    Split documents into overlapping chunks using sliding windows.
    - If unit='chars', treat content as a string and window by characters.
    - If unit='paragraphs', treat content as a list and window by paragraph count.
      If `joiner` is provided, join the list into a string for the chunk content.
    """
    results: List[Dict[str, Any]] = []

    for doc in documents:
        doc_copy = doc.copy()
        content = doc_copy.pop(content_field_name)

        # Allow both strings and lists; sliding_window already supports Iterables
        chunks = sliding_window(content, size=size, step=step)

        for ch in chunks:
            out = {'start': ch['start']}
            if unit == 'paragraphs' and isinstance(ch['content'], list):
                out['num_paragraphs'] = len(ch['content'])
                out['content'] = (
                    joiner.join(ch['content']) if joiner is not None else ch['content']
                )
                out['unit'] = 'paragraphs'
            else:
                # char-based or already a string
                out['content'] = ch['content']
                out['unit'] = 'chars'

            out.update(doc_copy)  # add original metadata except the content field
            results.append(out)

    return results

In [7]:
def _has_transcript(content: str) -> bool:
    """
    Returns True if the YAML front-matter has a non-empty 'transcript' list
    with at least one dict containing 'line' or 'header'.
    """
    try:
        post = frontmatter.loads(content or "")
    except Exception:
        return False

    tr = post.get('transcript')
    if not isinstance(tr, list) or not tr:
        return False

    return any(isinstance(e, dict) and ('line' in e or 'header' in e) for e in tr)

def read_github_data(only_with_transcripts: bool = True):
    repo_owner = 'DataTalksClub'
    repo_name = 'datatalksclub.github.io'

    allowed_extensions = {"md", "mdx"}

    reader = GithubRepositoryDataReader(
        repo_owner,
        repo_name,
        allowed_extensions=allowed_extensions,
        filename_filter=lambda p: p.startswith('_podcast/')  # select files in _podcast directory
    )

    files = reader.read()

    if not only_with_transcripts:
        return files

    # Keep only files that have a transcript in YAML front-matter
    return [f for f in files if _has_transcript(f.content)]

In [8]:
github_data = read_github_data()

#### Test chunking workflow on one podcast file

In [9]:
# Get one file and extract paragraph list (strings)
raw = github_data[6]
paras = extract_paragraphs_from_raw(raw)  # -> List[str]
title = extract_title_from_raw(raw)

# Wrap as a document dict with relevant metadata
docs = [{
    'content': paras, # podcast transcript in paragraph format
    'filename': raw.filename, # podcast filename
    'title': title, # podcast title
}]

# Chunk by paragraphs: 30 size, 15 step, join with blank lines
chunks = chunk_documents(
    docs,
    size=30,
    step=15,
    content_field_name='content',
    unit='paragraphs',
    joiner="\n\n"
)

In [10]:
title

'New Roles and Key Skills to Monetize Machine Learning'

In [11]:
chunks

[{'start': 0,
  'num_paragraphs': 30,
  'content': "Good morning. Now it's 7 AM for you or?\n\nYeah, just after 7 AM.\n\nThanks a lot for coming at such an early hour.\n\nYou know, for me this actually isn't that early. I work a couple of different time zones. This isn't too strange for me.\n\nWhat time do you usually wake up?\n\nUsually about 4:35. Somewhere in that half hour range. I've got East Coast clients, I've got UK clients. I've got one client in Germany, but we don't really get on the phone too often. I usually interact with their US division. I've got one client now that's in South Korea… I am global.\n\nThanks for finding the time in your schedule to talk to us and share your knowledge. I think we can start. Next time, I'll think if I want to do another five-hour-long conference…\n\nIt’s like a marathon if you're the host, and you're the only one.\n\nI’ll think twice next time. We still have 55 people who managed to stay till the last hour. These are probably people who rea

#### Questions for Homework1 on all podcast files

In [12]:
# Number of podcast transcripts available at https://github.com/DataTalksClub/datatalksclub.github.io/tree/main/_podcast
# I read in all files with an .md extension, and then I kept only those whose YAML front-matter contained a transcript
len(github_data)

174

In [13]:
# Build docs for all podcast files (skip ones with no transcript/body paragraphs)
docs = []
for raw in github_data:
    paras = extract_paragraphs_from_raw(raw)  # list of strings
    if not paras:
        continue  # nothing to chunk
    title = extract_title_from_raw(raw)
    docs.append({
        'content': paras,           # podcast transcript in paragraph format
        'filename': raw.filename,   # podcast filename
        'title': title,             # podcast title
    })

# Chunk all docs by paragraphs: 30 size, 15 step, join with blank lines
chunks = chunk_documents(docs, size=30, step=15, content_field_name='content',
                         unit='paragraphs', joiner="\n\n")

In [14]:
len(chunks)

1691

In [15]:
# Lexical search
index = Index(text_fields=['content'])
index.fit(chunks)

# Test search:
results = index.search('how do I make money with AI?', num_results=2)
print(results)

[{'start': 60, 'num_paragraphs': 30, 'content': 'At a previous company, we built an MLOps platform and used Spark for processing logs. It was a new project, and even though most developers had no Spark experience, it was still the easiest solution.\n\nWe’ve talked a lot about data engineering. Now, let’s talk about AI. How is data engineering connected to AI tools and LLMs?\n\n## The connection between data engineering and AI tools\n\nFirst, you can use AI in your pipeline, but it might not be the smartest idea because of hallucinations. You’ll have many bugs to fix. Another connection is fine-tuning models. You need a lot of data, and data engineers preprocess it. Even if you’re doing backend work, you’ll have logs to analyze. Data engineers are always needed, though the scale varies.\n\nI was also curious about your journey. How did you switch from data engineering to focusing more on AI? Was it something you liked, or was it something you needed to work on and learn? How did it happ

In [16]:
results[0]['title']

'Data Intensive AI'