In [1]:
%pip install python-dotenv

Note: you may need to restart the kernel to use updated packages.


In [2]:
from dotenv import load_dotenv

In [3]:
load_dotenv()

True

In [4]:
from openai import OpenAI

openai_client = OpenAI()

In [5]:
import io
from typing import Iterable, Callable
import zipfile
import traceback
from dataclasses import dataclass

import requests


@dataclass
class RawRepositoryFile:
    filename: str
    content: str


class GithubRepositoryDataReader:
    """
    Downloads and parses markdown and code files from a GitHub repository.
    """

    def __init__(self,
                repo_owner: str,
                repo_name: str,
                allowed_extensions: Iterable[str] | None = None,
                filename_filter: Callable[[str], bool] | None = None
        ):
        """
        Initialize the GitHub repository data reader.
        
        Args:
            repo_owner: The owner/organization of the GitHub repository
            repo_name: The name of the GitHub repository
            allowed_extensions: Optional set of file extensions to include
                    (e.g., {"md", "py"}). If not provided, all file types are included
            filename_filter: Optional callable to filter files by their path
        """
        prefix = "https://codeload.github.com"
        self.url = (
            f"{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/main"
        )

        if allowed_extensions is not None:
            self.allowed_extensions = {ext.lower() for ext in allowed_extensions}

        if filename_filter is None:
            self.filename_filter = lambda filepath: True
        else:
            self.filename_filter = filename_filter

    def read(self) -> list[RawRepositoryFile]:
        """
        Download and extract files from the GitHub repository.
        
        Returns:
            List of RawRepositoryFile objects for each processed file
            
        Raises:
            Exception: If the repository download fails
        """
        resp = requests.get(self.url)
        if resp.status_code != 200:
            raise Exception(f"Failed to download repository: {resp.status_code}")

        zf = zipfile.ZipFile(io.BytesIO(resp.content))
        repository_data = self._extract_files(zf)
        zf.close()

        return repository_data

    def _extract_files(self, zf: zipfile.ZipFile) -> list[RawRepositoryFile]:
        """
        Extract and process files from the zip archive.
        
        Args:
            zf: ZipFile object containing the repository data

        Returns:
            List of RawRepositoryFile objects for each processed file
        """
        data = []

        for file_info in zf.infolist():
            filepath = self._normalize_filepath(file_info.filename)

            if self._should_skip_file(filepath):
                continue

            try:
                with zf.open(file_info) as f_in:
                    content = f_in.read().decode("utf-8", errors="ignore")
                    if content is not None:
                        content = content.strip()

                    file = RawRepositoryFile(
                        filename=filepath,
                        content=content
                    )
                    data.append(file)

            except Exception as e:
                print(f"Error processing {file_info.filename}: {e}")
                traceback.print_exc()
                continue

        return data

    def _should_skip_file(self, filepath: str) -> bool:
        """
        Determine whether a file should be skipped during processing.
        
        Args:
            filepath: The file path to check
            
        Returns:
            True if the file should be skipped, False otherwise
        """
        filepath = filepath.lower()

        # directory
        if filepath.endswith("/"):
            return True

        # hidden file
        filename = filepath.split("/")[-1]
        if filename.startswith("."):
            return True

        if self.allowed_extensions:
            ext = self._get_extension(filepath)
            if ext not in self.allowed_extensions:
                return True

        if not self.filename_filter(filepath):
            return True

        return False

    def _get_extension(self, filepath: str) -> str:
        """
        Extract the file extension from a filepath.
        
        Args:
            filepath: The file path to extract extension from
            
        Returns:
            The file extension (without dot) or empty string if no extension
        """
        filename = filepath.lower().split("/")[-1]
        if "." in filename:
            return filename.rsplit(".", maxsplit=1)[-1]
        else:
            return ""

    def _normalize_filepath(self, filepath: str) -> str:
        """
        Removes the top-level directory from the file path inside the zip archive.
        'repo-main/path/to/file.py' -> 'path/to/file.py'
        
        Args:
            filepath: The original filepath from the zip archive
            
        Returns:
            The normalized filepath with top-level directory removed
        """
        parts = filepath.split("/", maxsplit=1)
        if len(parts) > 1:
            return parts[1]
        else:
            return parts[0]

In [24]:
def read_github_data():

    repo_owner = 'datatalksclub'
    repo_name = 'datatalksclub.github.io'
    

    allowed_extensions = {"md", "mdx"}

    def podcast_filter(filename: str) -> bool:
        return '_podcast' in filename
    reader = GithubRepositoryDataReader(
        repo_owner,
        repo_name,
        allowed_extensions=allowed_extensions,
        filename_filter=podcast_filter
    )
    
    return reader.read()

In [25]:
!uv add python-frontmatter

[2mResolved [1m153 packages[0m [2min 3ms[0m[0m
[2mAudited [1m133 packages[0m [2min 91ms[0m[0m


In [26]:
github_data = read_github_data()

In [27]:
github_data[40]

RawRepositoryFile(filename='_podcast/s05e02-data-engineering-acronyms.md', content='---\ntitle: "Making Sense of Data Engineering Acronyms and Buzzwords"\nshort: "Making Sense of Data Engineering Acronyms and Buzzwords"\nguests: [nataliekwong]\n\nimage: images/podcast/s05e02-data-engineering-acronyms.jpg\n\nseason: 5\nepisode: 2\n\nids:\n  youtube: t9Z1S3OYnJU\n  anchor: Making-Sense-of-Data-Engineering-Acronyms-and-Buzzwords---Natalie-Kwong-e177303\n\nlinks:\n  youtube: https://www.youtube.com/watch?v=t9Z1S3OYnJU\n  anchor: https://anchor.fm/datatalksclub/episodes/Making-Sense-of-Data-Engineering-Acronyms-and-Buzzwords---Natalie-Kwong-e177303\n  spotify: https://open.spotify.com/episode/1AvtwdcAXGGjdJ7fl0Hsuw\n  apple: https://podcasts.apple.com/us/podcast/making-sense-of-data-engineering-acronyms-and/id1541710331?i=1000534990760\n\ntranscript:\n- line: This week we\'ll try to make sense of common engineering acronyms and buzzwords\n    with the help of our special guest today, Natali

In [34]:
import frontmatter

def parse_data(data_raw):
    data_parsed = []
    errors = []
    
    for f in data_raw:
        try:
            post = frontmatter.loads(f.content)
            data = post.to_dict()
            data['filename'] = f.filename
            data_parsed.append(data)
        except Exception as e:
            # Log the error with filename for debugging
            errors.append({
                'filename': f.filename,
                'error': str(e),
                'error_type': type(e).__name__
            })
            print(f"Error parsing {f.filename}: {type(e).__name__}: {e}")
            continue
    
    if errors:
        print(f"\n{len(errors)} file(s) failed to parse out of {len(data_raw)} total files")
        print("\nFirst few errors:")
        for err in errors[:5]:
            print(f"  - {err['filename']}")
            print(f"    {err['error_type']}: {err['error']}")
    
    print(f"\nSuccessfully parsed {len(data_parsed)} files")
    return data_parsed

In [35]:
parsed_data = parse_data(github_data)

Error parsing _podcast/_template.md: ConstructorError: while constructing a mapping
  in "<unicode string>", line 6, column 8
found unhashable key
  in "<unicode string>", line 6, column 9

1 file(s) failed to parse out of 185 total files

First few errors:
  - _podcast/_template.md
    ConstructorError: while constructing a mapping
  in "<unicode string>", line 6, column 8
found unhashable key
  in "<unicode string>", line 6, column 9

Successfully parsed 184 files


In [38]:
num_podcasts = len(parsed_data)
print(f"Number of podcast documents: {num_podcasts}")

Number of podcast documents: 184


In [39]:
from typing import Any, Dict, Iterable, List


def sliding_window(
        seq: Iterable[Any],
        size: int,
        step: int
    ) -> List[Dict[str, Any]]:
    """
    Create overlapping chunks from a sequence using a sliding window approach.

    Args:
        seq: The input sequence (string or list) to be chunked.
        size (int): The size of each chunk/window.
        step (int): The step size between consecutive windows.

    Returns:
        list: A list of dictionaries, each containing:
            - 'start': The starting position of the chunk in the original sequence
            - 'content': The chunk content

    Raises:
        ValueError: If size or step are not positive integers.

    Example:
        >>> sliding_window("hello world", size=5, step=3)
        [{'start': 0, 'content': 'hello'}, {'start': 3, 'content': 'lo wo'}]
    """
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    n = len(seq)
    result = []
    for i in range(0, n, step):
        batch = seq[i:i+size]
        result.append({'start': i, 'content': batch})
        if i + size > n:
            break

    return result


def chunk_documents(
        documents: Iterable[Dict[str, str]],
        size: int = 2000,
        step: int = 1000,
        content_field_name: str = 'content'
) -> List[Dict[str, str]]:
    """
    Split a collection of documents into smaller chunks using sliding windows.

    Takes documents and breaks their content into overlapping chunks while preserving
    all other document metadata (filename, etc.) in each chunk.

    Args:
        documents: An iterable of document dictionaries. Each document must have a content field.
        size (int, optional): The maximum size of each chunk. Defaults to 2000.
        step (int, optional): The step size between chunks. Defaults to 1000.
        content_field_name (str, optional): The name of the field containing document content.
                                          Defaults to 'content'.

    Returns:
        list: A list of chunk dictionaries. Each chunk contains:
            - All original document fields except the content field
            - 'start': Starting position of the chunk in original content
            - 'content': The chunk content

    Example:
        >>> documents = [{'content': 'long text...', 'filename': 'doc.txt'}]
        >>> chunks = chunk_documents(documents, size=100, step=50)
        >>> # Or with custom content field:
        >>> documents = [{'text': 'long text...', 'filename': 'doc.txt'}]
        >>> chunks = chunk_documents(documents, content_field_name='text')
    """
    results = []

    for doc in documents:
        doc_copy = doc.copy()
        doc_content = doc_copy.pop(content_field_name)
        chunks = sliding_window(doc_content, size=size, step=step)
        for chunk in chunks:
            chunk.update(doc_copy)
        results.extend(chunks)

    return results

In [40]:
chunks = chunk_documents(parsed_data)

In [41]:
from minsearch import Index

In [42]:
index = Index(
        text_fields=["content", "filename", "title", "description"],
)
index.fit(chunks)

<minsearch.minsearch.Index at 0x255a3e070e0>

In [59]:
num_chunks = len(chunks)

In [60]:
print(f"Number of chunks: {num_chunks}")

Number of chunks: 162


In [43]:
search_results = index.search('how do I use this to find a job')

In [44]:
def search(query):
    return index.search(
        query=query,
        num_results=15
    )

In [46]:
question = 'how do I use this to find a job'

In [47]:
instructions = """
You're an assistant that helps with the documentation.
Answer the QUESTION based on the CONTEXT from the search engine of our documentation.

Use only the facts from the CONTEXT when answering the QUESTION.

When answering the question, provide the reference to the file with the source.
Use the filename field for that. The repo url is: https://github.com/evidentlyai/docs/
Include code examples when relevant. 
If the question is discussed in multiple documents, cite all of them.

Don't use markdown or any formatting in the output.
""".strip()

prompt_template = """
<QUESTION>
{question}
</QUESTION>

<CONTEXT>
{context}
</CONTEXT>
""".strip()


In [48]:
import json

In [49]:
def build_prompt(question, search_results):
    context = json.dumps(search_results)

    prompt = prompt_template.format(
        question=question,
        context=context
    ).strip()

    return prompt

In [50]:
from openai import OpenAI

openai_client = OpenAI()

def llm(user_prompt, instructions=None, model="gpt-4o-mini"):
    messages = []

    if instructions:
        messages.append({
            "role": "system",
            "content": instructions
        })

    messages.append({
        "role": "user",
        "content": user_prompt
    })

    response = openai_client.responses.create(
        model=model,
        input=messages
    )

    return response.output_text

In [53]:
def rag(query, top_k=3):  
    search_results = search(query)
    search_results = search_results[:top_k]  
    prompt = build_prompt(query, search_results)
    response = llm(prompt)
    return response

In [54]:
result = rag('how do I use this to find a job')

In [55]:
print(result)

To find a job using the insights from the provided content, follow these steps:

### 1. **Leverage Professional Networking:**
   - **Create or Update Your LinkedIn Profile:** Ensure it's professional and showcases your skills and experiences.
   - **Connect with Relevant People:** Reach out to individuals in your desired field. Use personalized messages to initiate conversations.
   - **Informational Interviews:** Schedule brief chats with industry professionals to gain insights into their roles and companies.

### 2. **Identify Your Career Goals:**
   - **Self-Assessment:** Reflect on what you want from your career. Consider your interests, strengths, and values.
   - **Career Research:** Investigate potential job roles and the skills required. Online tools or assessments like the O*Net Interest Profiler can help clarify your interests.

### 3. **Tailor Your Job Search:**
   - **Research Job Descriptions:** Look for roles that align with your skills and preferences. Pay attention to c

In [61]:
query = 'how do I use this to find a job'
search_results = search(query)

In [62]:
top_result = search_results[0]
print(f"Top result title: {top_result.get('title', 'N/A')}")
print(f"Filename: {top_result.get('filename', 'N/A')}")
print(f"Score: {top_result.get('score', 'N/A')}")

Top result title: Accelerating The Job Hunt for The Perfect Job in Tech
Filename: _podcast/s17e06-accelerating-job-hunt-for-perfect-job-in-tech.md
Score: N/A
