# Homework from week one

# Question 1: What would you like to work on? Describe your idea in a few sentences.


An AI agent that strengthens public accountability by analyzing open government data on procurement, budgets, and execution. It ingests official records associated with public contracts, summarizes key facts, and flags potential risks (e.g., unusual contract modifications or under-execution). The goal is to turn dispersed public data into transparent and verifiable insights.

# Question 2: What should your AI system do? How do you envision it?

The system automatically collects, normalizes, and analyzes public contract and budget data to monitor execution and detect anomalies (e.g., concentration by supplier, cumulative modifications, under-execution relative to plan). It provides evidence-based summaries with source citations and keeps an audit trail of every answer.
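
As a rough illustration of two of the checks named above, the sketch below computes each supplier's share of awarded value and flags contracts whose executed amount falls well below plan. The record fields (`supplier`, `awarded`, `planned`, `executed`), the sample figures, and both thresholds are hypothetical and are not taken from any real dataset or platform schema.

```python
from collections import defaultdict

# Hypothetical contract records; field names and amounts are illustrative only.
contracts = [
    {"id": "C-1", "supplier": "A", "awarded": 600_000, "planned": 600_000, "executed": 580_000},
    {"id": "C-2", "supplier": "A", "awarded": 250_000, "planned": 250_000, "executed": 100_000},
    {"id": "C-3", "supplier": "B", "awarded": 100_000, "planned": 100_000, "executed": 95_000},
    {"id": "C-4", "supplier": "C", "awarded": 50_000, "planned": 50_000, "executed": 50_000},
]

def supplier_shares(records):
    """Return each supplier's share of total awarded value."""
    totals = defaultdict(float)
    for r in records:
        totals[r["supplier"]] += r["awarded"]
    grand_total = sum(totals.values())
    if grand_total == 0:
        return {}
    return {s: v / grand_total for s, v in totals.items()}

def under_executed(records, min_ratio=0.5):
    """Return ids of contracts whose executed/planned ratio is below min_ratio."""
    return [r["id"] for r in records
            if r["planned"] > 0 and r["executed"] / r["planned"] < min_ratio]

shares = supplier_shares(contracts)
# Assumed threshold: flag any supplier holding more than half of awarded value.
concentrated = [s for s, share in shares.items() if share > 0.5]
flagged = under_executed(contracts)
```

In a real system each flag would carry a pointer to the source record so the finding stays traceable.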

The interface combines an interactive AI chatbot for natural-language queries with downloadable reports, ensuring findings are transparent, traceable, and reproducible.

# Question 3: Does it need any external data? If you answer "no", are you sure? Maybe it'll benefit from it?

Yes. Primary sources include the Public Contracting Platform (contract notices, awards, modifications). Complementary sources, such as official gazettes, parliamentary or hearing transcripts, and audit or fiscal-oversight reports, provide context and cross-verification.


# Question 4: How many records do we have in the podcast data?



In [2]:

import io
from typing import Iterable, Callable, List, Dict, Any
import frontmatter
import zipfile
import traceback
from dataclasses import dataclass
import requests


# Get files from GitHub repo -----------------------------------------------

@dataclass
class RawRepositoryFile:
    filename: str
    content: str


class GithubRepositoryDataReader:
    """
    Downloads and parses markdown and code files from a GitHub repository.
    """

    def __init__(self,
                repo_owner: str,
                repo_name: str,
                allowed_extensions: Iterable[str] | None = None,
                filename_filter: Callable[[str], bool] | None = None
        ):
        """
        Initialize the GitHub repository data reader.
        
        Args:
            repo_owner: The owner/organization of the GitHub repository
            repo_name: The name of the GitHub repository
            allowed_extensions: Optional set of file extensions to include
                    (e.g., {"md", "py"}). If not provided, all file types are included
            filename_filter: Optional callable to filter files by their path
        """
        prefix = "https://codeload.github.com"
        self.url = (
            f"{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/main"
        )

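        # Always define the attribute so _should_skip_file can rely on it;
        # None means "no extension filter".
        if allowed_extensions is not None:
            self.allowed_extensions = {ext.lower() for ext in allowed_extensions}
        else:
            self.allowed_extensions = None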

        if filename_filter is None:
            self.filename_filter = lambda filepath: True
        else:
            self.filename_filter = filename_filter

    def read(self) -> list[RawRepositoryFile]:
        """
        Download and extract files from the GitHub repository.
        
        Returns:
            List of RawRepositoryFile objects for each processed file
            
        Raises:
            Exception: If the repository download fails
        """
        # A timeout keeps the call from hanging indefinitely on a stalled connection
        resp = requests.get(self.url, timeout=60)
        if resp.status_code != 200:
            raise Exception(f"Failed to download repository: {resp.status_code}")

        # The context manager closes the archive even if extraction raises
        with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
            repository_data = self._extract_files(zf)

        return repository_data

    def _extract_files(self, zf: zipfile.ZipFile) -> list[RawRepositoryFile]:
        """
        Extract and process files from the zip archive.
        
        Args:
            zf: ZipFile object containing the repository data

        Returns:
            List of RawRepositoryFile objects for each processed file
        """
        data = []

        for file_info in zf.infolist():
            filepath = self._normalize_filepath(file_info.filename)

            if self._should_skip_file(filepath):
                continue

            try:
                with zf.open(file_info) as f_in:
                    content = f_in.read().decode("utf-8", errors="ignore")
                    if content is not None:
                        content = content.strip()

                    file = RawRepositoryFile(
                        filename=filepath,
                        content=content
                    )
                    data.append(file)

            except Exception as e:
                print(f"Error processing {file_info.filename}: {e}")
                traceback.print_exc()
                continue

        return data

    def _should_skip_file(self, filepath: str) -> bool:
        """
        Determine whether a file should be skipped during processing.
        
        Args:
            filepath: The file path to check
            
        Returns:
            True if the file should be skipped, False otherwise
        """
        filepath = filepath.lower()

        # directory
        if filepath.endswith("/"):
            return True

        # hidden file
        filename = filepath.split("/")[-1]
        if filename.startswith("."):
            return True

        if self.allowed_extensions:
            ext = self._get_extension(filepath)
            if ext not in self.allowed_extensions:
                return True

        if not self.filename_filter(filepath):
            return True

        return False

    def _get_extension(self, filepath: str) -> str:
        """
        Extract the file extension from a filepath.
        
        Args:
            filepath: The file path to extract extension from
            
        Returns:
            The file extension (without dot) or empty string if no extension
        """
        filename = filepath.lower().split("/")[-1]
        if "." in filename:
            return filename.rsplit(".", maxsplit=1)[-1]
        else:
            return ""

    def _normalize_filepath(self, filepath: str) -> str:
        """
        Removes the top-level directory from the file path inside the zip archive.
        'repo-main/path/to/file.py' -> 'path/to/file.py'
        
        Args:
            filepath: The original filepath from the zip archive
            
        Returns:
            The normalized filepath with top-level directory removed
        """
        parts = filepath.split("/", maxsplit=1)
        if len(parts) > 1:
            return parts[1]
        else:
            return parts[0]
        

# Adapt read_github_data to accept repo_owner and repo_name

def read_github_data(repo_owner: str, repo_name: str, folder: str = "") -> List["RawRepositoryFile"]:
    """
    Reads files from a GitHub repository, optionally filtering by folder.

    Args:
        repo_owner: GitHub repository owner
        repo_name: GitHub repository name
        folder: Optional folder path to filter files (e.g., "_podcast")

    Returns:
        List of RawRepositoryFile objects
    """
    allowed_extensions = {"md", "mdx"}

    def folder_filter(filepath: str) -> bool:
        if folder:
            # Check if the filepath contains the folder
            return filepath.startswith(f"{folder}/") or f"/{folder}/" in filepath
        return True  # No folder filter, include all files

    reader = GithubRepositoryDataReader(
        repo_owner=repo_owner,
        repo_name=repo_name,
        allowed_extensions=allowed_extensions,
        filename_filter=folder_filter,
    )
    
    return reader.read()



def parse_data(data_raw: List["RawRepositoryFile"]) -> List[Dict[str, Any]]:
    """
    Parses a list of RawRepositoryFile objects extracting frontmatter.

    Args:
        data_raw: List of RawRepositoryFile objects from GitHub

    Returns:
        List of dictionaries containing frontmatter data and filename
    """
    data_parsed: List[Dict[str, Any]] = []

    for f in data_raw:
        try:
            post = frontmatter.loads(f.content)
            data = post.to_dict()
        except Exception:
            # If no frontmatter or parse error, store empty dict
            data = {}
        
        data['filename'] = f.filename
        data_parsed.append(data)

    return data_parsed
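
To make the path handling concrete, here is a small standard-library sketch that builds a zip archive in memory with one top-level `repo-main/` folder (the layout described in the `_normalize_filepath` docstring) and applies a simplified version of the same normalize-then-filter idea. The archive contents and the helper names `normalize` and `keep` are made up for illustration, and `keep` only checks the folder as a path prefix.

```python
import io
import zipfile

# Build a tiny archive in memory; file names and contents are invented.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("repo-main/_podcast/s01e01.md", "---\ntitle: Demo\n---\nBody")
    zf.writestr("repo-main/_podcast/.hidden.md", "ignored")
    zf.writestr("repo-main/README.md", "# Readme")
    zf.writestr("repo-main/src/app.py", "print('hi')")

def normalize(path: str) -> str:
    """Drop the top-level 'repo-main/' directory from an archive path."""
    parts = path.split("/", maxsplit=1)
    return parts[1] if len(parts) > 1 else parts[0]

def keep(path: str, folder: str, extensions: set[str]) -> bool:
    """Keep visible files under `folder` whose extension is allowed."""
    name = path.split("/")[-1]
    if path.endswith("/") or name.startswith("."):
        return False
    ext = name.rsplit(".", maxsplit=1)[-1].lower() if "." in name else ""
    return ext in extensions and path.startswith(f"{folder}/")

with zipfile.ZipFile(io.BytesIO(buffer.getvalue())) as zf:
    kept = [
        normalize(info.filename)
        for info in zf.infolist()
        if keep(normalize(info.filename), "_podcast", {"md", "mdx"})
    ]
```

Only `_podcast/s01e01.md` survives: the hidden file, the README outside the folder, and the `.py` file are all filtered out.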

In [None]:
# Get the podcast data from DataTalksClub repo

podcast_data = read_github_data("DataTalksClub", "datatalksclub.github.io", '_podcast')
display(podcast_data[:2])
display(f'There are {len(podcast_data)} records in the podcast data.')

[RawRepositoryFile(filename='_podcast/_s12e08.md', content='---\nepisode: 8\nguests:\n- jekaterinakokatjuhha\nids:\n  anchor: The-Journey-of-a-Data-Generalist-From-Bioinformatics-to-Freelancing---Jekaterina-Kokatjuhha-e1upvim\n  youtube: FRi0SUtxdMw\nimage: images/podcast/s12e08-journey-of-data-generalist-from-bioinformatics-to-freelancing.jpg\nlinks:\n  anchor: https://anchor.fm/datatalksclub/episodes/The-Journey-of-a-Data-Generalist-From-Bioinformatics-to-Freelancing---Jekaterina-Kokatjuhha-e1upvim\n  apple: https://podcasts.apple.com/us/podcast/the-journey-of-a-data-generalist-from/id1541710331?i=1000599125044\n  spotify: https://open.spotify.com/episode/5fB185hGlGYQmdk0kbIsPv?si=YtnsaYNzTc-fl7emZ2IjEA\n  youtube: https://www.youtube.com/watch?v=FRi0SUtxdMw\nseason: 12\nshort: \'The Journey of a Data Generalist: From Bioinformatics to Freelancing\'\ntitle: \'The Journey of a Data Generalist: From Bioinformatics to Freelancing\'\ntranscript:\n- line: This week we\'ll talk about being

'There are 185 records in the podcast data.'

In [None]:
# Parse the data

podcast_parsed = parse_data(podcast_data)

In [5]:
podcast_parsed[:1]

[{'episode': 8,
  'guests': ['jekaterinakokatjuhha'],
  'ids': {'anchor': 'The-Journey-of-a-Data-Generalist-From-Bioinformatics-to-Freelancing---Jekaterina-Kokatjuhha-e1upvim',
   'youtube': 'FRi0SUtxdMw'},
  'image': 'images/podcast/s12e08-journey-of-data-generalist-from-bioinformatics-to-freelancing.jpg',
  'links': {'anchor': 'https://anchor.fm/datatalksclub/episodes/The-Journey-of-a-Data-Generalist-From-Bioinformatics-to-Freelancing---Jekaterina-Kokatjuhha-e1upvim',
   'apple': 'https://podcasts.apple.com/us/podcast/the-journey-of-a-data-generalist-from/id1541710331?i=1000599125044',
   'spotify': 'https://open.spotify.com/episode/5fB185hGlGYQmdk0kbIsPv?si=YtnsaYNzTc-fl7emZ2IjEA',
   'youtube': 'https://www.youtube.com/watch?v=FRi0SUtxdMw'},
  'season': 12,
  'short': 'The Journey of a Data Generalist: From Bioinformatics to Freelancing',
  'title': 'The Journey of a Data Generalist: From Bioinformatics to Freelancing',
  'transcript': [{'line': "This week we'll talk about being a 

In [6]:
lines_lst = []

for episode in podcast_parsed:
    for entry in episode.get('transcript', []):
        if 'line' in entry:  # skip headers
            line = entry['line']
            lines_lst.append(line)

In [7]:
len(lines_lst)

23665

# Question 5: Let's do chunk size 30 and overlap 15. How many chunks do you have in the result?

### Index the data

- Chunk and then index the data

In [65]:
def sliding_window(seq, size, step):
    """Create overlapping chunks using sliding window approach."""
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    n = len(seq)
    result = []
    for i in range(0, n, step):
        batch = seq[i:i+size]
        result.append(batch)
        if i + size >= n:
            break

    return result
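
A small worked example may help when checking the chunk count by hand. The snippet repeats the function so it runs on its own and adds a closed-form count, `expected_chunks`, which is a helper introduced here for illustration and assumes `step <= size` (as with size 30 and step 15).

```python
import math

def sliding_window(seq, size, step):
    """Same logic as the function above, repeated so this snippet runs alone."""
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")
    result = []
    for i in range(0, len(seq), step):
        result.append(seq[i:i + size])
        if i + size >= len(seq):
            break
    return result

def expected_chunks(n, size, step):
    """Closed-form chunk count for a sequence of length n (assumes step <= size)."""
    if n == 0:
        return 0
    if n <= size:
        return 1
    return math.ceil((n - size) / step) + 1

windows = sliding_window(list(range(10)), size=4, step=2)
# windows == [[0, 1, 2, 3], [2, 3, 4, 5], [4, 5, 6, 7], [6, 7, 8, 9]]
```

Because chunking is applied per episode, the total is the sum of the per-episode counts and cannot be derived from the overall number of transcript lines alone.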

In [None]:

# Get the chunks with other metadata
chunks = []
for ep in podcast_parsed:
    # get all text lines
    lines = [t["line"] for t in ep.get("transcript", []) if "line" in t]
    # make text chunks (30 lines with 15 overlap)
    for i, chunk in enumerate(sliding_window(lines, size=30, step=15)):
        # Add metadata
        chunks.append({
            # Episode numbers repeat across seasons, so this id is only unique within a season
            "id": f"{ep['episode']}-{i}",
            "episode": ep["episode"],
            "season": ep["season"],
            "title": ep["title"],
            "content": " ".join(chunk)
        })

In [83]:


print(f"Total chunks: {len(chunks)}")

Total chunks: 1487


In [69]:
from minsearch import Index

index = Index(
    text_fields=["content", "title"],
    keyword_fields=["episode", "season"]
)
index.fit(chunks)

<minsearch.minsearch.Index at 0x176ba4850>

In [None]:
# Define a search function

def search(question: str, num_results: int = 15) -> List[Dict[str, Any]]:
    """
    Search the podcast index for relevant chunks based on the question.

    Args:
        question: The search query
        num_results: Number of top results to return

    Returns:
        List of dictionaries containing search results
    """
    results = index.search(
        question, 
        boost_dict={"title": 1.5, "content": 1.5},
        num_results=num_results
    )
    return results

search(question="how do I make money with AI?", num_results=15)


[{'id': '7-0',
  'episode': 7,
  'season': 17,
  'title': 'Make an Impact Through Volunteering Open Source Work',
  'content': "This week, we'll talk about volunteering and open source work. We have a special guest today, Sara. Sara is a Google Developer expert in machine learning, a Google PhD fellow, and also a co-founder of AI Wonder Girls. She likes to demystify AI to empower individuals with tools and mindsets that require building solutions that matter to the community and humanity. We met with Sara in October, I think, at a conference in Porto. It was an amazing conference. We had a very nice chat. Sara was talking about what she does and I thought “She would be an amazing guest.” And here we are 3, 4, 5 months after that, finally. [chuckles] So, welcome to the interview. Thank you. The questions for today's interview were prepared by Johanna Bayer. As always, thanks, Johanna, for your help. Before we start – before we go into our main topic of open source work and volunteering 
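
One point about `boost_dict` is worth illustrating. The sketch below assumes the final score is a boost-weighted sum of per-field similarity scores; that is an assumption about how boosting works in general, not a statement about minsearch internals. Under that assumption, giving every field the same boost (here 1.5 for both `title` and `content`) rescales all scores by the same factor and leaves the ranking unchanged, while unequal boosts can reorder results. The document ids and scores are invented.

```python
# Hypothetical per-field similarity scores for three documents.
field_scores = {
    "doc-a": {"title": 0.10, "content": 0.60},
    "doc-b": {"title": 0.30, "content": 0.30},
    "doc-c": {"title": 0.05, "content": 0.20},
}

def rank(scores, boosts):
    """Order document ids by boost-weighted sum of field scores, best first."""
    def total(doc_id):
        return sum(boosts.get(field, 1.0) * value
                   for field, value in scores[doc_id].items())
    return sorted(scores, key=total, reverse=True)

no_boost = rank(field_scores, {})
equal_boost = rank(field_scores, {"title": 1.5, "content": 1.5})
title_boost = rank(field_scores, {"title": 3.0, "content": 1.0})
```

Here `no_boost` and `equal_boost` give the same order, while `title_boost` moves `doc-b` to the top.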

### Create the RAG

The pipeline combines `search`, `build_prompt`, and `llm`.

In [97]:
import json

instructions = """
You're an assistant that helps users explore podcast documentation.
Answer the QUESTION based on the CONTEXT from the podcast search engine results.

Use only the information contained in the CONTEXT when answering the QUESTION.

When providing the answer, include the reference to the source file using the filename field.
The repository URL is: https://github.com/DataTalksClub/datatalksclub.github.io/tree/main/_podcast

If the search results contain multiple relevant episodes, list all of them in order.
If code snippets or examples are mentioned in the episode description, include them.

Don't use markdown or any special formatting in your response.
""".strip()

prompt_template = """
<QUESTION>
{question}
</QUESTION>

<CONTEXT>
{context}
</CONTEXT>
""".strip()

def build_prompt(question, search_results):
    context = json.dumps(search_results, ensure_ascii=False, indent=2)
    prompt = prompt_template.format(
        question=question,
        context=context
    ).strip()
    return prompt
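
To see what the model actually receives, the snippet below renders the template for a single invented search hit. It repeats the template and function so it runs on its own; the sample record is hypothetical and only mirrors the keys of the chunks built earlier.

```python
import json

prompt_template = """
<QUESTION>
{question}
</QUESTION>

<CONTEXT>
{context}
</CONTEXT>
""".strip()

def build_prompt(question, search_results):
    """Same logic as above, repeated so the snippet runs on its own."""
    context = json.dumps(search_results, ensure_ascii=False, indent=2)
    return prompt_template.format(question=question, context=context).strip()

# Invented search hit with the same keys as the chunks built earlier.
sample_results = [
    {"id": "1-0", "episode": 1, "season": 1,
     "title": "Demo episode", "content": "Hello “world”"}
]

prompt = build_prompt("What is this episode about?", sample_results)
print(prompt)
```

With `ensure_ascii=False`, non-ASCII characters such as curly quotes in transcripts are kept as-is instead of being escaped to `\uXXXX` sequences, which keeps the context readable and shorter.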

In [91]:
from openai import OpenAI

openai_client = OpenAI()

def llm(user_prompt, instructions=None, model="gpt-4o-mini"):
    messages = []

    if instructions:
        messages.append({
            "role": "system",
            "content": instructions
        })

    messages.append({
        "role": "user",
        "content": user_prompt
    })

    response = openai_client.responses.create(
        model=model,
        input=messages
    )

    return response.output_text

In [92]:
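def rag(query):
    search_results = search(query)
    prompt = build_prompt(query, search_results)
    # Note: `instructions` defined above is not passed to llm() here, so the
    # model runs without the system prompt; call
    # llm(prompt, instructions=instructions) to apply it.
    response = llm(prompt)
    return response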

In [99]:
result = rag("What's the first episode in the results for 'how do I make money with AI?'")
print(result)

The first episode in the results for "how do I make money with AI?" is **Episode 4**, titled "The Good, the Bad and the Ugly of GPT."
