In [60]:
import io
from typing import Iterable, Callable
import zipfile
import traceback
from dataclasses import dataclass

import requests


@dataclass
class RawRepositoryFile:
    filename: str
    content: str


class GithubRepositoryDataReader:
    """
    Downloads and parses markdown and code files from a GitHub repository.
    """

    def __init__(self,
                 repo_owner: str,
                 repo_name: str,
                 allowed_extensions: Iterable[str] | None = None,
                 filename_filter: Callable[[str], bool] | None = None
                 ):
        """
        Initialize the GitHub repository data reader.

        Args:
            repo_owner: The owner/organization of the GitHub repository
            repo_name: The name of the GitHub repository
            allowed_extensions: Optional set of file extensions to include
                    (e.g., {"md", "py"}). If not provided, all file types are included
            filename_filter: Optional callable to filter files by their path
        """
        prefix = "https://codeload.github.com"
        self.url = (
            f"{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/main"
        )

        if allowed_extensions is not None:
            self.allowed_extensions = {ext.lower() for ext in allowed_extensions}

        if filename_filter is None:
            self.filename_filter = lambda filepath: True
        else:
            self.filename_filter = filename_filter

    def read(self) -> list[RawRepositoryFile]:
        """
        Download and extract files from the GitHub repository.

        Returns:
            List of RawRepositoryFile objects for each processed file

        Raises:
            Exception: If the repository download fails
        """
        resp = requests.get(self.url)
        if resp.status_code != 200:
            raise Exception(f"Failed to download repository: {resp.status_code}")

        zf = zipfile.ZipFile(io.BytesIO(resp.content))
        repository_data = self._extract_files(zf)
        zf.close()

        return repository_data

    def _extract_files(self, zf: zipfile.ZipFile) -> list[RawRepositoryFile]:
        """
        Extract and process files from the zip archive.

        Args:
            zf: ZipFile object containing the repository data

        Returns:
            List of RawRepositoryFile objects for each processed file
        """
        data = []

        for file_info in zf.infolist():
            filepath = self._normalize_filepath(file_info.filename)

            if self._should_skip_file(filepath):
                continue

            try:
                with zf.open(file_info) as f_in:
                    content = f_in.read().decode("utf-8", errors="ignore")
                    if content is not None:
                        content = content.strip()

                    file = RawRepositoryFile(
                        filename=filepath,
                        content=content
                    )
                    data.append(file)

            except Exception as e:
                print(f"Error processing {file_info.filename}: {e}")
                traceback.print_exc()
                continue

        return data

    def _should_skip_file(self, filepath: str) -> bool:
        """
        Determine whether a file should be skipped during processing.

        Args:
            filepath: The file path to check

        Returns:
            True if the file should be skipped, False otherwise
        """
        filepath = filepath.lower()

        # directory
        if filepath.endswith("/"):
            return True

        # hidden file
        filename = filepath.split("/")[-1]
        if filename.startswith("."):
            return True

        if self.allowed_extensions:
            ext = self._get_extension(filepath)
            if ext not in self.allowed_extensions:
                return True

        if not self.filename_filter(filepath):
            return True

        return False

    def _get_extension(self, filepath: str) -> str:
        """
        Extract the file extension from a filepath.

        Args:
            filepath: The file path to extract extension from

        Returns:
            The file extension (without dot) or empty string if no extension
        """
        filename = filepath.lower().split("/")[-1]
        if "." in filename:
            return filename.rsplit(".", maxsplit=1)[-1]
        else:
            return ""

    def _normalize_filepath(self, filepath: str) -> str:
        """
        Removes the top-level directory from the file path inside the zip archive.
        'repo-main/path/to/file.py' -> 'path/to/file.py'

        Args:
            filepath: The original filepath from the zip archive

        Returns:
            The normalized filepath with top-level directory removed
        """
        parts = filepath.split("/", maxsplit=1)
        if len(parts) > 1:
            return parts[1]
        else:
            return parts[0]


In [61]:
def read_github_data():
    allowed_extensions = {"md"}

    repo_owner = 'DataTalksClub'
    repo_name = 'datatalksclub.github.io'

    def only_podcasts(filepath: str) -> bool:
        return "_podcast" in filepath and "_template.md" not in filepath

    reader = GithubRepositoryDataReader(
        repo_owner,
        repo_name,
        allowed_extensions=allowed_extensions,
        filename_filter=only_podcasts
    )

    return reader.read()


In [62]:
data_raw = read_github_data()
print(f"Downloaded {len(data_raw)} files")

# Print all filenames in data_raw
for file in data_raw:
    print(file.filename)

Downloaded 188 files
_podcast/_s12e08.md
_podcast/s01e01-roles.md
_podcast/s01e02-processes.md
_podcast/s01e03-building-ds-team.md
_podcast/s01e04-standing-out-as-a-data-scientist.md
_podcast/s01e05-mentoring.md
_podcast/s02e01-writing.md
_podcast/s02e02-developer-advocacy.md
_podcast/s02e03-open-source.md
_podcast/s02e04-mlops.md
_podcast/s02e05-feature-stores.md
_podcast/s02e06-decision-optimization.md
_podcast/s02e07-abc-data-science.md
_podcast/s02e08-personal-branding.md
_podcast/s02e09-roles-skills-monetizing-ml.md
_podcast/s02e10-public-speaking.md
_podcast/s02e11-dataops.md
_podcast/s02e12-communities.md
_podcast/s03e01-from-pm-to-ds.md
_podcast/s03e02-from-analytics-to-data-science.md
_podcast/s03e03-data-observability.md
_podcast/s03e04-effective-communication-with-business.md
_podcast/s03e04-interviewing-300-data-scientists.md
_podcast/s03e06-from-physics-to-machine-learning.md
_podcast/s03e07-market-yourself.md
_podcast/s03e08-data-led-professional.md
_podcast/s03e09-what-d

In [63]:
data_raw[0]



RawRepositoryFile(filename='_podcast/_s12e08.md', content='---\nepisode: 8\nguests:\n- jekaterinakokatjuhha\nids:\n  anchor: The-Journey-of-a-Data-Generalist-From-Bioinformatics-to-Freelancing---Jekaterina-Kokatjuhha-e1upvim\n  youtube: FRi0SUtxdMw\nimage: images/podcast/s12e08-journey-of-data-generalist-from-bioinformatics-to-freelancing.jpg\nlinks:\n  anchor: https://anchor.fm/datatalksclub/episodes/The-Journey-of-a-Data-Generalist-From-Bioinformatics-to-Freelancing---Jekaterina-Kokatjuhha-e1upvim\n  apple: https://podcasts.apple.com/us/podcast/the-journey-of-a-data-generalist-from/id1541710331?i=1000599125044\n  spotify: https://open.spotify.com/episode/5fB185hGlGYQmdk0kbIsPv?si=YtnsaYNzTc-fl7emZ2IjEA\n  youtube: https://www.youtube.com/watch?v=FRi0SUtxdMw\nseason: 12\nshort: \'The Journey of a Data Generalist: From Bioinformatics to Freelancing\'\ntitle: \'The Journey of a Data Generalist: From Bioinformatics to Freelancing\'\ntranscript:\n- line: This week we\'ll talk about being 

In [64]:
"""
Document chunking utilities for splitting large documents into smaller, overlapping pieces.

This module provides functionality to break down documents into chunks using a sliding
window approach, which is useful for processing large texts in smaller, manageable pieces
while maintaining context through overlapping content.
"""

from typing import Any, Dict, Iterable, List


def sliding_window(
        seq: Iterable[Any],
        size: int,
        step: int
    ) -> List[Dict[str, Any]]:
    """
    Create overlapping chunks from a sequence using a sliding window approach.

    Args:
        seq: The input sequence (string or list) to be chunked.
        size (int): The size of each chunk/window.
        step (int): The step size between consecutive windows.

    Returns:
        list: A list of dictionaries, each containing:
            - 'start': The starting position of the chunk in the original sequence
            - 'content': The chunk content

    Raises:
        ValueError: If size or step are not positive integers.

    Example:
        >>> sliding_window("hello world", size=5, step=3)
        [{'start': 0, 'content': 'hello'}, {'start': 3, 'content': 'lo wo'}]
    """
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    n = len(seq)
    result = []
    for i in range(0, n, step):
        batch = seq[i:i+size]
        result.append({'start': i, 'content': batch})
        if i + size > n:
            break

    return result


def chunk_documents(
        documents: Iterable[Dict[str, str]],
        size: int = 2000,
        step: int = 1000,
        content_field_name: str = 'content'
) -> List[Dict[str, str]]:
    """
    Split a collection of documents into smaller chunks using sliding windows.

    Takes documents and breaks their content into overlapping chunks while preserving
    all other document metadata (filename, etc.) in each chunk.

    Args:
        documents: An iterable of document dictionaries. Each document must have a content field.
        size (int, optional): The maximum size of each chunk. Defaults to 2000.
        step (int, optional): The step size between chunks. Defaults to 1000.
        content_field_name (str, optional): The name of the field containing document content.
                                          Defaults to 'content'.

    Returns:
        list: A list of chunk dictionaries. Each chunk contains:
            - All original document fields except the content field
            - 'start': Starting position of the chunk in original content
            - 'content': The chunk content

    Example:
        >>> documents = [{'content': 'long text...', 'filename': 'doc.txt'}]
        >>> chunks = chunk_documents(documents, size=100, step=50)
        >>> # Or with custom content field:
        >>> documents = [{'text': 'long text...', 'filename': 'doc.txt'}]
        >>> chunks = chunk_documents(documents, content_field_name='text')
    """
    results = []

    for doc in documents:
        doc_copy = doc.copy()
        doc_content = doc_copy.pop(content_field_name)
        chunks = sliding_window(doc_content, size=size, step=step)
        for chunk in chunks:
            chunk.update(doc_copy)
        results.extend(chunks)

    return results

In [65]:
"""
Document indexing utilities for creating searchable indexes from document collections.

This module provides functionality to index documents using minsearch, with optional
chunking support for handling large documents.
"""

from minsearch import Index

def index_documents(documents, chunk: bool = False, chunking_params=None) -> Index:
    """
    Create a searchable index from a collection of documents.

    Args:
        documents: A collection of document dictionaries, each containing at least
                  'content' and 'filename' fields.
        chunk (bool, optional): Whether to chunk documents before indexing.
                               Defaults to False.
        chunking_params (dict, optional): Parameters for document chunking.
                                        Defaults to {'size': 2000, 'step': 1000}.
                                        Only used when chunk=True.

    Returns:
        Index: A fitted minsearch Index object ready for searching.

    Example:
        >>> docs = [{'content': 'Hello world', 'filename': 'doc1.txt'}]
        >>> index = index_documents(docs)
        >>> results = index.search('hello')
    """
    if chunk:
        if chunking_params is None:
            chunking_params = {'size': 2000, 'step': 1000}
        documents = chunk_documents(documents, **chunking_params)
    index = Index(
        text_fields=["content", "filename"],
    )

    print(f"Chunks Size: {len(documents)}")

    index.fit(documents)
    return index

In [66]:
!uv add python-frontmatter
!uv add rich

[2mResolved [1m154 packages[0m [2min 7ms[0m[0m
[2mAudited [1m135 packages[0m [2min 0.20ms[0m[0m
[2mResolved [1m154 packages[0m [2min 0.30ms[0m[0m
[2mAudited [1m135 packages[0m [2min 0.02ms[0m[0m


In [67]:
import frontmatter
from typing import List, Dict, Any
from rich.progress import track

def parse_data(data_raw: List[RawRepositoryFile]) -> List[Dict[str, Any]]:
    print("📄 [bold blue]Parsing documents...[/bold blue]")

    data_parsed = []
    for f in track(data_raw, description="Processing files..."):
        post = frontmatter.loads(f.content)
        data = post.to_dict()
        data['filename'] = f.filename
        data_parsed.append(data)

    return data_parsed

In [68]:
data = parse_data(data_raw)
index = index_documents(
    data,
    chunk=True,
    chunking_params={"size": 30, "step": 14},
)

Output()

📄 [bold blue]Parsing documents...[/bold blue]


Chunks Size: 11679


In [69]:
index.search(
    'how do I make money with AI?',
    num_results=1
)

[{'start': 224,
  'content': 'unch.ai/){:target="_blank"}\n* ',
  'episode': 7,
  'guests': ['saraelateif'],
  'ids': {'anchor': 'atatalksclub/episodes/Make-an-Impact-Through-Volunteering-Open-Source-Work---Sara-EL-ATEIF-e2g4dan',
   'youtube': 'aHdaIwOEI8Q'},
  'image': 'images/podcast/s17e07-make-impact-through-volunteering-open-source-work.jpg',
  'links': {'anchor': 'https://podcasters.spotify.com/pod/show/datatalksclub/episodes/Make-an-Impact-Through-Volunteering-Open-Source-Work---Sara-EL-ATEIF-e2g4dan',
   'apple': 'https://podcasts.apple.com/us/podcast/make-an-impact-through-volunteering-open-source-work/id1541710331?i=1000646627892',
   'spotify': 'https://open.spotify.com/episode/7tZSSgv1yAlnoMyB4ggQmb?si=AqDaME2QS26usoZjOEWNtQ',
   'youtube': 'https://www.youtube.com/watch?v=aHdaIwOEI8Q'},
  'season': 17,
  'short': 'Make an Impact Through Volunteering Open Source Work',
  'title': 'Make an Impact Through Volunteering Open Source Work',
  'transcript': [{'line': "This week, 

In [70]:
def search(query):
    return index.search(
        query=query,
        num_results=15
    )

In [71]:

instructions = """
You're an assistant that helps with the documentation.
Answer the QUESTION based on the CONTEXT from the search engine of our documentation.

Use only the facts from the CONTEXT when answering the QUESTION.

When answering the question, provide the reference to the file with the source.
Use the filename field for that.
The repo url is: https://github.com/evidentlyai/docs/

Include code examples when relevant.
If the question is discussed in multiple documents, cite all of them.

Don't use markdown or any formatting in the output.
""".strip()



In [72]:
import json
prompt_template = """
<QUESTION>
{question}
</QUESTION>

<CONTEXT>
{context}
</CONTEXT>
""".strip()

def build_prompt(question, search_results):
    context = json.dumps(search_results)

    prompt = prompt_template.format(
        question=question,
        context=context
    ).strip()

    return prompt


In [73]:
#Interact with LLM
from openai import OpenAI

openai_client = OpenAI()

def interact_with_llm(user_prompt, instructions=None, model="gpt-4o-mini"):
    messages = []

    if instructions:
        messages.append({
            "role": "system",
            "content": instructions
        })

    messages.append({
        "role": "user",
        "content": user_prompt
    })

    response = openai_client.responses.create(
        model=model,
        input=messages
    )

    return response.output_text

In [74]:
def ask_evidently(query):
    search_results = search(query)
    user_prompt = build_prompt(query,search_results)
    print(user_prompt)
    response = interact_with_llm(user_prompt,instructions)
    return response

In [76]:
result = ask_evidently('how do I make money with AI?')
print(result)

<QUESTION>
how do I make money with AI?
</QUESTION>

<CONTEXT>
[{"start": 224, "content": "unch.ai/){:target=\"_blank\"}\n* ", "episode": 7, "guests": ["saraelateif"], "ids": {"anchor": "atatalksclub/episodes/Make-an-Impact-Through-Volunteering-Open-Source-Work---Sara-EL-ATEIF-e2g4dan", "youtube": "aHdaIwOEI8Q"}, "image": "images/podcast/s17e07-make-impact-through-volunteering-open-source-work.jpg", "links": {"anchor": "https://podcasters.spotify.com/pod/show/datatalksclub/episodes/Make-an-Impact-Through-Volunteering-Open-Source-Work---Sara-EL-ATEIF-e2g4dan", "apple": "https://podcasts.apple.com/us/podcast/make-an-impact-through-volunteering-open-source-work/id1541710331?i=1000646627892", "spotify": "https://open.spotify.com/episode/7tZSSgv1yAlnoMyB4ggQmb?si=AqDaME2QS26usoZjOEWNtQ", "youtube": "https://www.youtube.com/watch?v=aHdaIwOEI8Q"}, "season": 17, "short": "Make an Impact Through Volunteering Open Source Work", "title": "Make an Impact Through Volunteering Open Source Work", "tr