In [1]:

import io
from typing import Iterable, Callable
import zipfile
import traceback
from dataclasses import dataclass

import requests


@dataclass
class RawRepositoryFile:
    """
    One text file taken from a repository archive.

    Attributes:
        filename: Path of the file, as assigned by the code that creates
            the record.
        content: Text content of the file.
    """

    filename: str
    content: str


class GithubRepositoryDataReader:
    """
    Downloads a GitHub repository as a zip archive and returns its files as text.

    The archive of a single branch is fetched from codeload.github.com. Every
    entry that is not skipped (directories, hidden files, files outside the
    extension allowlist, files rejected by the filename filter) is decoded as
    UTF-8 and returned as a RawRepositoryFile.
    """

    def __init__(
        self,
        repo_owner: str,
        repo_name: str,
        allowed_extensions: Iterable[str] | None = None,
        filename_filter: Callable[[str], bool] | None = None,
        branch: str = "main",
        timeout: float | None = 30.0,
    ):
        """
        Initialize the GitHub repository data reader.

        Args:
            repo_owner: The owner/organization of the GitHub repository
            repo_name: The name of the GitHub repository
            allowed_extensions: Optional file extensions to include
                    (e.g., {"md", "py"}); matching is case-insensitive and a
                    leading dot is tolerated. If not provided (or empty),
                    all file types are included
            filename_filter: Optional callable to filter files by their path.
                    It receives the lower-cased path without the archive's
                    top-level directory and returns True to keep the file
            branch: Name of the branch to download
            timeout: Timeout in seconds passed to requests.get; None waits
                    indefinitely
        """
        prefix = "https://codeload.github.com"
        self.url = (
            f"{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/{branch}"
        )
        self.timeout = timeout

        # Always define the attribute: _should_skip_file reads it even when
        # no allowlist was given. An empty set means "no extension filter".
        self.allowed_extensions = {
            ext.lower().lstrip(".") for ext in (allowed_extensions or ())
        }

        if filename_filter is None:
            self.filename_filter = lambda filepath: True
        else:
            self.filename_filter = filename_filter

    def read(self) -> list[RawRepositoryFile]:
        """
        Download and extract files from the GitHub repository.

        Returns:
            List of RawRepositoryFile objects for each processed file

        Raises:
            Exception: If the repository download does not return HTTP 200.
                Errors raised by requests itself (connection failure,
                timeout) propagate unchanged.
        """
        resp = requests.get(self.url, timeout=self.timeout)
        if resp.status_code != 200:
            raise Exception(f"Failed to download repository: {resp.status_code}")

        # The context manager closes the archive even if extraction raises.
        with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
            return self._extract_files(zf)

    def _extract_files(self, zf: zipfile.ZipFile) -> list[RawRepositoryFile]:
        """
        Extract and process files from the zip archive.

        Entries that fail to be read are reported on stdout (with a
        traceback) and skipped, so one bad entry does not abort the run.

        Args:
            zf: ZipFile object containing the repository data

        Returns:
            List of RawRepositoryFile objects for each processed file
        """
        data = []

        for file_info in zf.infolist():
            filepath = self._normalize_filepath(file_info.filename)

            if self._should_skip_file(filepath):
                continue

            try:
                with zf.open(file_info) as f_in:
                    # Undecodable bytes are dropped rather than failing the file.
                    content = f_in.read().decode("utf-8", errors="ignore").strip()

                data.append(RawRepositoryFile(filename=filepath, content=content))

            except Exception as e:
                print(f"Error processing {file_info.filename}: {e}")
                traceback.print_exc()
                continue

        return data

    def _should_skip_file(self, filepath: str) -> bool:
        """
        Determine whether a file should be skipped during processing.

        All checks, including the user-supplied filename filter, run against
        the lower-cased path.

        Args:
            filepath: The file path to check

        Returns:
            True if the file should be skipped, False otherwise
        """
        filepath = filepath.lower()

        # Directory entry. The archive's top-level directory normalizes to
        # an empty path, so that is treated as a directory as well.
        if not filepath or filepath.endswith("/"):
            return True

        # Hidden file (only the final path component is checked).
        filename = filepath.split("/")[-1]
        if filename.startswith("."):
            return True

        if self.allowed_extensions:
            if self._get_extension(filepath) not in self.allowed_extensions:
                return True

        return not self.filename_filter(filepath)

    def _get_extension(self, filepath: str) -> str:
        """
        Extract the file extension from a filepath.

        Args:
            filepath: The file path to extract extension from

        Returns:
            The lower-cased file extension (without dot) or empty string if
            the final path component contains no dot
        """
        filename = filepath.lower().split("/")[-1]
        if "." in filename:
            return filename.rsplit(".", maxsplit=1)[-1]
        return ""

    def _normalize_filepath(self, filepath: str) -> str:
        """
        Removes the top-level directory from the file path inside the zip archive.
        'repo-main/path/to/file.py' -> 'path/to/file.py'

        Args:
            filepath: The original filepath from the zip archive

        Returns:
            The normalized filepath with top-level directory removed; a path
            without any "/" is returned unchanged
        """
        parts = filepath.split("/", maxsplit=1)
        if len(parts) > 1:
            return parts[1]
        return parts[0]

In [2]:
# extract docs 

def read_github_data():
    """
    Download the markdown pages under `_podcast/` from the
    DataTalksClub/datatalksclub.github.io repository.

    Returns:
        Whatever GithubRepositoryDataReader.read() returns for the files
        that have an "md" or "mdx" extension and pass the podcast filter.
    """

    def is_podcast_page(filepath):
        # Keep files under "_podcast/" whose own name does not start with "_".
        basename = filepath.split("/")[-1]
        return filepath.startswith("_podcast/") and not basename.startswith("_")

    reader = GithubRepositoryDataReader(
        'DataTalksClub',
        'datatalksclub.github.io',
        allowed_extensions={"md", "mdx"},
        filename_filter=is_podcast_page,
    )
    return reader.read()

In [3]:
# Download the podcast pages; `github_data` is the input to parse_data() further down.
github_data = read_github_data()

# Print the full text of one downloaded file (the entry at index 40) as a sample.
print(github_data[40].content)

---
title: "Introducing Data Science in Startups"
short: "Introducing Data Science in Startups"
guests: [mariannadiachuk]

image: images/podcast/s05e04-introducing-data-science-in-startups.jpg

season: 5
episode: 4

ids:
  youtube: KMSE9GkU2mE
  anchor: Introducing-Data-Science-in-Startups---Marianna-Diachuk-e17rc4i

links:
  youtube: https://youtube.com/watch?v=KMSE9GkU2mE
  anchor: https://anchor.fm/datatalksclub/episodes/Introducing-Data-Science-in-Startups---Marianna-Diachuk-e17rc4i
  spotify: https://open.spotify.com/episode/0kGFYX12RgkmZC2lMml6S4
  apple: https://podcasts.apple.com/us/podcast/introducing-data-science-in-startups-marianna-diachuk/id1541710331?i=1000536525162

transcript:
- header: "Marianna\u2019s background"
- line: We have a special guest today, Marianna. Marianna is a data scientist at Restream,
    and the data science lead and mentor in the local branch of Women Who Code community
    in Kiev. Before Restream, she worked at Data Robot and she also led the dat

In [4]:
# NOTE(review): this calls read_github_data() again, which downloads the archive
# a second time; `github_data` above was produced by the same call.
files = read_github_data()
# Number of files returned by the reader.
print(len(files))

183


In [6]:
# Parse to only include 'transcript'

import frontmatter

def parse_data(data_raw):
    """
    Parse the front matter of each raw file and keep only the transcript text.

    Args:
        data_raw: Iterable of objects with `content` (text that starts with
            front matter) and `filename` attributes.

    Returns:
        List of dicts, one per input file, with the keys:
            'transcript': the 'line' values of the front matter's
                'transcript' list joined by blank lines ('' if there are none)
            'filename': the file's `filename`
            'title': the front matter 'title' ('' if absent)
    """
    data_parsed = []
    for raw_file in data_raw:
        metadata = frontmatter.loads(raw_file.content).to_dict()

        # A bare `transcript:` key parses to None, so fall back to an empty
        # list for both the missing and the empty case.
        transcript_items = metadata.get('transcript') or []

        # Only mapping items that carry a 'line' contribute text (headers and
        # other entries are ignored). str() guards against YAML scalars that
        # were parsed as non-strings, which str.join would reject.
        transcript_text = '\n\n'.join(
            str(item['line'])
            for item in transcript_items
            if isinstance(item, dict) and item.get('line') is not None
        )

        data_parsed.append({
            'transcript': transcript_text,
            'filename': raw_file.filename,  # Keep the filename
            'title': metadata.get('title', ''),  # Get episode title if available
        })

    return data_parsed

# Reduce each downloaded file to its transcript text, filename and title.
parsed_data = parse_data(github_data)

In [7]:
# sliding window function
from typing import Any, Dict, Iterable, List

def sliding_window(
        seq: Iterable[Any],
        size: int,
        step: int
    ) -> List[Dict[str, Any]]:
    """
    Split a sequence into windows of `size` items taken every `step` items.

    Windows overlap when `step < size`. The last window may be shorter than
    `size`; iteration stops as soon as a window reaches the end of the input,
    so no window is a pure suffix of the previous one.

    Args:
        seq: Items to window. Inputs that support len() and slicing are
            sliced directly (so a str yields str windows); any other
            iterable, such as a generator, is first copied into a list.
        size: Maximum number of items per window; must be positive.
        step: Offset between the starts of consecutive windows; must be
            positive.

    Returns:
        List of dicts with 'start' (index of the window's first item) and
        'content' (the slice of items). Empty input yields an empty list.

    Raises:
        ValueError: If `size` or `step` is not positive.
    """
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    # The signature accepts any iterable, but the windows are taken with
    # len() and slicing; materialize inputs that do not support both.
    if not (hasattr(seq, "__len__") and hasattr(seq, "__getitem__")):
        seq = list(seq)

    n = len(seq)
    result = []
    for i in range(0, n, step):
        result.append({'start': i, 'content': seq[i:i+size]})
        # This window already covers the tail of the input; stop here.
        if i + size >= n:
            break
    return result


In [8]:
# Chunking

def chunk_by_paragraphs(documents, chunk_size=30, overlap=15):
    """
    Split each document's transcript into overlapping chunks of paragraphs.

    Paragraphs are the non-blank pieces of the 'transcript' text separated by
    blank lines. They are grouped with sliding_window() using windows of
    `chunk_size` paragraphs that advance by `chunk_size - overlap`.

    Args:
        documents: Iterable of dicts with an optional 'transcript' string and
            optional 'filename' and 'title' entries.
        chunk_size: Number of paragraphs per chunk.
        overlap: Number of paragraphs shared by consecutive chunks.

    Returns:
        List of dicts with 'content' (the chunk's paragraphs joined by blank
        lines), 'start_paragraph' (index of the chunk's first paragraph),
        'filename' and 'title' (both '' when missing from the document).
    """
    results = []

    for doc in documents:
        paragraphs = []
        for piece in doc.get('transcript', '').split('\n\n'):
            trimmed = piece.strip()
            if trimmed:
                paragraphs.append(trimmed)

        windows = sliding_window(
            paragraphs,
            size=chunk_size,
            step=chunk_size - overlap,
        )

        # Document-level metadata is repeated on every chunk.
        filename = doc.get('filename', '')
        title = doc.get('title', '')
        results.extend(
            {
                'content': '\n\n'.join(window['content']),
                'start_paragraph': window['start'],
                'filename': filename,
                'title': title,
            }
            for window in windows
        )

    return results

# Chunk every transcript into windows of 30 paragraphs that overlap by 15
# (i.e. a new chunk starts every 15 paragraphs).
chunked_data = chunk_by_paragraphs(parsed_data, chunk_size=30, overlap=15)
print(f"Number of chunks: {len(chunked_data)}")

Number of chunks: 1479


In [9]:
# Indexing

from minsearch import Index

# Build a minsearch index whose searchable text fields are the chunk text,
# the episode title and the source filename.
index = Index(
    text_fields=["content", "title", "filename"],
)
# Fit the index on the chunk dicts produced by chunk_by_paragraphs().
index.fit(chunked_data)

<minsearch.minsearch.Index at 0x7b3084719ac0>

In [10]:
# Query the fitted index with a free-text question.
search_results = index.search('how do I make money with AI?')

In [11]:
# Show the first result returned by the search.
search_results[0]

{'content': "This week, we'll talk about volunteering and open source work. We have a special guest today, Sara. Sara is a Google Developer expert in machine learning, a Google PhD fellow, and also a co-founder of AI Wonder Girls. She likes to demystify AI to empower individuals with tools and mindsets that require building solutions that matter to the community and humanity.\n\nWe met with Sara in October, I think, at a conference in Porto. It was an amazing conference. We had a very nice chat. Sara was talking about what she does and I thought “She would be an amazing guest.” And here we are 3, 4, 5 months after that, finally. [chuckles] So, welcome to the interview.\n\nThank you.\n\nThe questions for today's interview were prepared by Johanna Bayer. As always, thanks, Johanna, for your help. Before we start – before we go into our main topic of open source work and volunteering – let's start with your background. Can you tell us about your career journey so far?\n\nYeah, sure. I got