In [1]:
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
import os
import re
import json
import importlib.util
from typing import List
from pydantic import BaseModel, ValidationError
from openai import OpenAI

### Applied LLM Systems - Lab Exercise 5: Dynamic Schema Generation

## 🎯 Objective

Build a system that learns the structure of data by analyzing examples, then applies that learned structure to extract information from new text. This demonstrates adaptive schema generation based on content analysis.

## 📋 Task Description

You are given three text files:

* `example1.txt` - First example text about cities
* `example2.txt` - Second example text about cities
* `target.txt` - New text to extract information from

Your system should:

1. Analyze the two example texts to identify what information they commonly contain
2. Generate a Pydantic schema based on discovered common fields
3. Use that schema to extract structured data from the target text

## Step 1: Schema Discovery

Ask your LLM to analyze example1.txt and example2.txt to identify:

* What fields/attributes are present in both texts
* What data types each field should have
* Which fields should be required vs. optional

The LLM should output a description or actual Pydantic model definition.

## Step 2: Generate Pydantic Model

Either:

* Have the LLM generate the actual Python code for the Pydantic model, or
* Take the LLM's field descriptions and write the model yourself

Save this as `schema.py`

## Step 3: Extract from Target

Use the generated schema with structured output to extract information from `target.txt`.

Save the result as `extracted_data.json`


## Example inputs:

### example1.txt:
```Barcelona is located in Spain on the Mediterranean coast. It has a population of 1.6 million and is known for Sagrada Familia and Park Güell. The official languages are Spanish and Catalan. Summer temperatures average 28°C.```

### example2.txt:
```Amsterdam is the capital of the Netherlands with 872,000 inhabitants. Famous landmarks include the Anne Frank House and Van Gogh Museum. Dutch is the official language, and the average summer temperature is around 22°C.```

### target.txt:
```Lisbon is Portugal's coastal capital with a population of 505,000 people. Portuguese is the official language. The city is famous for Belém Tower and Jerónimos Monastery. Summer temperatures typically reach 27°C.```
# 🔧 Your Workflow

## Step 1: Schema Discovery

Ask your LLM to analyze example1.txt and example2.txt to identify:

* What fields/attributes are present in both texts
* What data types each field should have
* Which fields should be required vs. optional

The LLM should output a description or actual Pydantic model definition.

## Step 2: Generate Pydantic Model

Either:

* Have the LLM generate the actual Python code for the Pydantic model, or
* Take the LLM's field descriptions and write the model yourself

Save this as `schema.py`

## Step 3: Extract from Target

Use the generated schema with structured output to extract information from `target.txt`.

Save the result as `extracted_data.json`

In [3]:
# OpenAI client for this cell; the key is read from the process environment.
openai_api_key = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=openai_api_key)

# Task 1 folders. The trailing slashes matter: the file paths below are built
# by plain string concatenation.
task1_path = "assets/task1/"
input_path = task1_path + "in/"
output_path = task1_path + "out/"

# Inputs: two example texts plus the target text to extract from.
example1_path = input_path + "example1.txt"
example2_path = input_path + "example2.txt"
target_path = input_path + "target.txt"
# Outputs: the generated Pydantic schema module and the extracted JSON.
py_schema_path = output_path + "schema.py"
output_json_path = output_path + "extracted_data.json"


def read_text(path):
    """Return the full contents of the UTF-8 text file at *path*.

    Raises:
        FileNotFoundError: If nothing exists at *path*.
    """
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as handle:
            contents = handle.read()
        return contents
    raise FileNotFoundError(f"Missing file: {path}")

def extract_code_block(text, language_hint="python"):
    """Return the contents of the first fenced code block in *text*.

    A fence tagged with *language_hint* (matched case-insensitively) is
    preferred. If none is found, the first fence of any kind is used and a
    language tag on its opening line (for example ``py``) is dropped.

    Args:
        text: Text that may contain a triple-backtick fenced block.
        language_hint: Language tag expected right after the opening fence.

    Returns:
        The block body with surrounding whitespace stripped, or ``None`` when
        *text* is empty or contains no complete fence.
    """
    if not text:
        return None

    # re.escape keeps hints such as "c++" from being interpreted as regex syntax.
    tagged = re.search(
        rf"```{re.escape(language_hint)}\s*(.*?)```",
        text,
        re.DOTALL | re.IGNORECASE,
    )
    if tagged:
        return tagged.group(1).strip()

    # Fallback: any fence. The optional group consumes a tag-only opening line
    # so that "```py\ncode```" yields "code" rather than "py\ncode".
    untagged = re.search(
        r"```(?:[\w+#.-]*[ \t]*\r?\n)?(.*?)```",
        text,
        re.DOTALL,
    )
    if untagged:
        return untagged.group(1).strip()

    return None

def write_schema_py(code):
    """Write *code* to ``py_schema_path`` as UTF-8 and print a confirmation.

    The parent directory is created first when it does not exist, so a fresh
    checkout without the output folder does not fail on ``open``.

    Args:
        code: Python source text for the schema module.
    """
    out_dir = os.path.dirname(py_schema_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(py_schema_path, "w", encoding="utf-8") as f:
        f.write(code)
    print(f"Wrote {py_schema_path}")

def import_pydantic_model(module_path):
    """Load the Python file at *module_path* and return its Pydantic model class.

    A class named ``CityInfo`` is preferred. Otherwise the first ``BaseModel``
    subclass that is defined in the loaded file itself is returned; subclasses
    the file merely imported are skipped.

    Args:
        module_path: Path to the Python source file to load.

    Returns:
        A ``BaseModel`` subclass.

    Raises:
        RuntimeError: If the file cannot be loaded as a module or it defines
            no ``BaseModel`` subclass.
    """
    spec = importlib.util.spec_from_file_location("schema", module_path)
    if not spec or not spec.loader:
        raise RuntimeError("Could not load schema.py")
    module = importlib.util.module_from_spec(spec)
    # SECURITY: exec_module runs the file's top-level code. In this notebook the
    # file is written from an LLM reply, so review it before trusting the result.
    spec.loader.exec_module(module)

    preferred = getattr(module, "CityInfo", None)
    if isinstance(preferred, type) and issubclass(preferred, BaseModel):
        return preferred

    for candidate in vars(module).values():
        if not isinstance(candidate, type):
            continue
        if candidate is BaseModel or not issubclass(candidate, BaseModel):
            continue
        # Only accept classes declared in the loaded file, not imported ones.
        if getattr(candidate, "__module__", None) == module.__name__:
            return candidate

    raise RuntimeError("No Pydantic BaseModel found in schema.py")

def get_json_schema_from_model(model_cls):
    """Return the JSON schema dict for a Pydantic model class.

    Uses ``model_json_schema()`` when the class provides it and falls back to
    ``schema()`` otherwise. Errors raised while building the schema propagate
    to the caller instead of being hidden behind the fallback.

    Args:
        model_cls: Model class exposing ``model_json_schema`` or ``schema``.

    Returns:
        Whatever the selected schema method returns.
    """
    build_schema = getattr(model_cls, "model_json_schema", None)
    if callable(build_schema):
        return build_schema()
    return model_cls.schema()

def discover_schema_from_examples(ex1, ex2):
    """Ask the chat model for a ``CityInfo`` Pydantic model covering both examples.

    Args:
        ex1: Text of the first example.
        ex2: Text of the second example.

    Returns:
        The source text found in the first fenced code block of the reply.

    Raises:
        RuntimeError: If the reply contains no fenced code block.
    """
    system_prompt = "".join(
        [
            "You write clean Pydantic v2 models. ",
            "Produce one BaseModel named CityInfo that captures only fields present in both example texts. ",
            "Use simple types such as str, int, float, list[str]. ",
            "Mark a field required only if it appears clearly in both examples. ",
            "Return a single python fenced block with only the model code.",
        ]
    )

    user_prompt = f"""
Two city example texts follow.

Example A:
{ex1}

Example B:
{ex2}

Task:
1) Identify fields present in both texts.
2) Propose types for each field.
3) Output a Pydantic v2 model named CityInfo with those fields and short docstrings.
Return the model inside one python fenced block. No extra talk.
"""

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=conversation,
        temperature=0.2,
    )

    reply = completion.choices[0].message.content or ""
    schema_source = extract_code_block(reply, "python")
    if schema_source:
        return schema_source
    raise RuntimeError("The model did not return a python code block with the schema")

def extract_with_schema(model_cls, target_text):
    """Extract data from *target_text* and validate it against *model_cls*.

    The model's JSON schema, the text and a few formatting hints are sent to
    the chat model in JSON-object mode; the reply is parsed and used to
    construct a *model_cls* instance.

    Args:
        model_cls: Pydantic model class describing the expected fields.
        target_text: Text to extract from.

    Returns:
        The validated instance as a plain dict (``model_dump()``).

    Raises:
        SystemExit: If the reply does not validate against *model_cls*.
    """
    instructions = "".join(
        [
            "You extract structured data in JSON. ",
            "You must produce JSON that validates against the provided JSON schema. ",
            "Only output the JSON.",
        ]
    )

    request_body = json.dumps(
        {
            "task": "Extract data from the given text following the JSON schema",
            "text": target_text,
            "json_schema": get_json_schema_from_model(model_cls),
            "hints": [
                "Population is a number of people. Use int if the text gives a whole number, else use float.",
                "If languages are multiple, return a list of strings.",
                "Temperatures are in Celsius as numbers where possible.",
                "Landmarks should be a list of strings when present.",
            ],
        },
        ensure_ascii=False,
    )

    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": instructions},
            {"role": "user", "content": request_body},
        ],
        temperature=0.0,
        response_format={"type": "json_object"},
    )

    # An empty reply is treated as an empty JSON object.
    raw_json = completion.choices[0].message.content or "{}"
    fields = json.loads(raw_json)

    try:
        validated = model_cls(**fields)
    except ValidationError as err:
        raise SystemExit(f"Validation failed.\n{err}")
    return validated.model_dump()

# Task 1 driver: discover a schema from the two examples, persist it, load it
# back as a model class, then extract from the target text.
ex1 = read_text(example1_path)
ex2 = read_text(example2_path)

# Step 1 + 2: the LLM proposes the model source, which is saved as schema.py.
code = discover_schema_from_examples(ex1, ex2)
write_schema_py(code)

# Import the file that was just written to obtain the model class.
model_cls = import_pydantic_model(py_schema_path)

# Step 3: extract from the target text and validate against the model.
target_text = read_text(target_path)
result = extract_with_schema(model_cls, target_text)

# ensure_ascii=False keeps non-ASCII characters readable in the output file.
with open(output_json_path, "w", encoding="utf-8") as f:
    json.dump(result, f, indent=2, ensure_ascii=False)

print(f"Wrote {output_json_path}")

Wrote assets/task1/out/schema.py
Wrote assets/task1/out/extracted_data.json


# 📝 The Task: Analyzing a GitHub Repository

Your goal is to create a function, `investigate_github_repo(user_query: str)`, that can take a natural language query like "Tell me about the repository for the Python library 'requests'" and provide a detailed summary.

## Step 1: Define the Pydantic Tool Schema

Students must define a Pydantic model that represents the required arguments for the GitHub API call.
```python
GitHubRepoQuery(BaseModel):
    owner (str): The username of the repository owner (e.g., "psf").
    repo (str): The name of the repository (e.g., "requests").
```

## Step 2: Implement the Tool Generation Logic

Create a function, `get_api_args(user_query: str)`, which does the following:

* Uses a strong System Prompt to instruct the LLM to act as a GitHub Argument Generator.
* Uses the `GitHubRepoQuery` model with the OpenAI API's structured output feature to generate the correct `owner` and `repo` names from the `user_query`.
* The function should return a populated `GitHubRepoQuery` Pydantic object (e.g., `GitHubRepoQuery(owner="psf", repo="requests")`).

## Step 3: Implement the External Tool (The API Call)

Create a function, `call_github_api(query_args: GitHubRepoQuery)`, which simulates the external tool execution:

* Constructs the correct GitHub REST API URL: `https://api.github.com/repos/{owner}/{repo}`.
* Uses the `requests` library to make a `GET` request.
* Crucially: Filters the massive JSON response from GitHub to extract only the most relevant fields that the LLM will need to answer the final question (e.g., `name`, `description`, `stargazers_count`, `forks_count`, `language`, `open_issues_count`, `html_url`).
* Returns the filtered data (a smaller Python dictionary/JSON object).

## Step 4: Define the Final Answer Schema & Interpretation

To ensure the final output is also structured, define a final Pydantic model for the answer.
```python
RepoSummary(BaseModel):
    summary_title (str): A creative, descriptive title for the summary.
    key_stats (List[str]): A list of key facts about the repository (e.g., "It has 50k stars.").
    llm_analysis (str): A final paragraph summarizing the project's purpose and status based on the data.
    github_url (str): The direct link to the repository.
```
Create the final generation function, `get_final_summary(query: str, repo_data: dict)`:

* Uses a new strong System Prompt to instruct the LLM to act as an Expert Data Analyst.
* The User Prompt should be a combination of the original `user_query` and the retrieved `repo_data` (the filtered JSON from Step 3).
* Uses the `RepoSummary` model for the final structured output.

## Step 5: The Main Execution Flow

Combine all steps into the main `investigate_github_repo` function:
```python
def investigate_github_repo(user_query: str) -> RepoSummary:
    # 1. Generate API Arguments
    api_args = get_api_args(user_query)

    # 2. Call the External Tool (GitHub API)
    repo_data = call_github_api(api_args)

    # 3. Interpret the Results and Generate Final Answer
    final_summary = get_final_summary(user_query, repo_data)

    return final_summary
```

In [4]:
import requests
from pydantic import Field

In [5]:
# Client and model name for the GitHub task; openai_model is read by the
# functions defined below in this cell.
openai_api_key = os.getenv("OPENAI_API_KEY")
openai_model = "gpt-4o-mini"
client = OpenAI(api_key = openai_api_key)

# Arguments for the GitHub repository lookup; used as the structured-output
# format in get_api_args and read by call_github_api to build the URL.
class GitHubRepoQuery(BaseModel):
    # First path segment after /repos/ in the API URL.
    owner: str = Field(..., description = "Owner of the repo")
    # Second path segment after /repos/ in the API URL.
    repo: str  = Field(..., description = "The repository name")

# Final structured answer for the GitHub task; used as the structured-output
# format in get_final_summary.
class RepoSummary(BaseModel):
    # Title for the summary.
    summary_title: str
    # List of fact strings about the repository.
    key_stats: List[str]
    # Paragraph of analysis written by the model.
    llm_analysis: str
    # Link to the repository.
    github_url: str

def get_api_args(user_prompt: str) -> GitHubRepoQuery:
    """Turn a natural-language request into GitHub ``owner``/``repo`` arguments.

    The chat model is called with ``GitHubRepoQuery`` as the structured-output
    format.

    Args:
        user_prompt: The user's request describing a repository.

    Returns:
        The parsed ``GitHubRepoQuery``.

    Raises:
        RuntimeError: If the reply did not parse into a ``GitHubRepoQuery``.
    """
    system = (
        "You are a GitHub argument generator. "
        "Read the user query and infer the correct GitHub repository owner and repository name. "
        # The example must name the real owner; the earlier wording asked for
        # owner "requests", which contradicts the task's psf/requests example.
        "If the query says Python library requests, you should return owner psf and repo requests. "
        "Respond using the provided Pydantic schema only."
    )

    resp = client.chat.completions.parse(
        model = openai_model,
        messages = [
            {"role": "system", "content": system},
            {"role": "user", "content": user_prompt},
        ],
        response_format = GitHubRepoQuery,
        temperature = 0.0,
    )

    # parsed is checked by type so that a missing or unexpected result fails loudly.
    args_obj = resp.choices[0].message.parsed
    if not isinstance(args_obj, GitHubRepoQuery):
        raise RuntimeError("Failed to parse API arguments")
    return args_obj

def call_github_api(repo_info_query):
    """Fetch a repository from the GitHub REST API and keep a small field subset.

    Args:
        repo_info_query: Object with ``owner`` and ``repo`` string attributes.

    Returns:
        A dict holding only the selected fields of the API response.

    Raises:
        SystemExit: On a network error, a 404, or any other status >= 400.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    # owner/repo come from model output; percent-encode them so characters such
    # as "/" or "?" cannot change which URL is requested.
    owner = quote(repo_info_query.owner, safe="")
    repo = quote(repo_info_query.repo, safe="")

    url = f"https://api.github.com/repos/{owner}/{repo}"
    headers = {
        "Accept": "application/vnd.github+json",
        "User-Agent": "openai-structured-output-demo",
    }

    try:
        r = requests.get(url, headers = headers, timeout = 15)
    except requests.RequestException as ex:
        raise SystemExit(f"Network error when calling GitHub. Details: {ex}")

    if r.status_code == 404:
        raise SystemExit(f"Repository not found at {url}")
    if r.status_code >= 400:
        raise SystemExit(f"GitHub API error {r.status_code}. Body: {r.text}")

    data = r.json()

    # Fall back to watchers_count only when subscribers_count is absent; a
    # legitimate value of 0 must not be replaced by the fallback.
    subscribers = data.get("subscribers_count")
    watchers = subscribers if subscribers is not None else data.get("watchers_count")

    # "license" may be missing or null in the response.
    license_info = data.get("license") or {}

    filtered = {
        "name": data.get("name"),
        "full_name": data.get("full_name"),
        "description": data.get("description"),
        "stargazers_count": data.get("stargazers_count"),
        "forks_count": data.get("forks_count"),
        "language": data.get("language"),
        "open_issues_count": data.get("open_issues_count"),
        "watchers_count": watchers,
        "license": license_info.get("spdx_id"),
        "topics": data.get("topics", []),
        "html_url": data.get("html_url"),
        "default_branch": data.get("default_branch"),
        "archived": data.get("archived"),
        "disabled": data.get("disabled"),
        "created_at": data.get("created_at"),
        "updated_at": data.get("updated_at"),
        "pushed_at": data.get("pushed_at"),
    }

    return filtered

def get_final_summary(user_prompt, repo_data):
    """Have the chat model write a ``RepoSummary`` from the filtered repo data.

    Args:
        user_prompt: The user's original request.
        repo_data: JSON-serialisable dict of repository fields.

    Returns:
        The parsed ``RepoSummary``.

    Raises:
        RuntimeError: If the reply did not parse into a ``RepoSummary``.
    """
    analyst_prompt = "".join(
        [
            "You are an Expert Data Analyst. ",
            "Write a clear and helpful summary of a GitHub repository using the provided data. ",
            "The title should be short and descriptive. ",
            "key_stats should be a list of punchy facts drawn from the data. ",
            "llm_analysis should be a concise paragraph that explains what the project is, ",
            "who might use it, and what the activity level suggests. ",
            "Return only the structured fields defined by the schema.",
        ]
    )

    # The query and the data travel together as one JSON user message.
    payload_text = json.dumps(
        {"original_query": user_prompt, "repo_data": repo_data},
        ensure_ascii = False,
    )

    completion = client.chat.completions.parse(
        model = openai_model,
        messages = [
            {"role": "system", "content": analyst_prompt},
            {"role": "user", "content": payload_text},
        ],
        response_format = RepoSummary,
        temperature = 0.2,
    )

    parsed = completion.choices[0].message.parsed
    if isinstance(parsed, RepoSummary):
        return parsed
    raise RuntimeError("Failed to parse final summary")

def investigate_github_repo(user_query):
    """Run the full pipeline: derive API arguments, fetch the repo, summarise it.

    Args:
        user_query: Natural-language request about a GitHub repository.

    Returns:
        Whatever ``get_final_summary`` returns for the fetched data.
    """
    fetched = call_github_api(get_api_args(user_query))
    return get_final_summary(user_query, fetched)


# GitHub task driver. The interactive prompt is disabled in favour of a fixed
# query; the summary is printed as indented JSON.
# query = input("Describe the github repo you want to analyze:\n")
query = "Analyze my repo. My username is Otina12, the repo is called raft-go"
result = investigate_github_repo(query)
print(result.model_dump_json(indent = 2))

{
  "summary_title": "Raft Consensus Algorithm in Go",
  "key_stats": [
    "0 stars",
    "0 forks",
    "0 open issues",
    "Created on August 26, 2025",
    "Last updated on September 21, 2025"
  ],
  "llm_analysis": "The 'raft-go' project is an implementation of the Raft consensus algorithm written in Go. It is likely aimed at developers and engineers interested in distributed systems and consensus algorithms. The lack of stars, forks, and open issues suggests that the project is new and may still be in the early stages of development or has not yet gained traction in the community.",
  "github_url": "https://github.com/Otina12/raft-go"
}


## 📜 Scenario

You are tasked with building the backend system for a new Music Q&A Service. This service takes a natural language query from a user and must provide an accurate, synthesized answer based on factual data retrieved from the Spotify Web API.

The core difficulty is designing a system that can intelligently determine what kind of data is needed and which API call to execute before presenting a final, conversational answer.

## 🎵 Available Spotify API Endpoints

Your system is limited to using the following search capabilities. Your code must dynamically decide which type parameter to use based on the LLM's structured instruction.

| API Function | Search Type (type parameter) | Key Data Points in Response |
|--------------|------------------------------|----------------------------|
| Search for Artists | artist | ID (essential), Followers Count, Genres, Images, Popularity. |
| Search for Albums | album | ID (essential), Album Name, Release Date, Artists, Track Count. |

### API Access Requirements:

* **Base URL:** All search requests use the `/v1/search` endpoint.
* **Authentication:** Requests must include the Access Token in the header: `Authorization: Bearer <Your_Access_Token>`
* **Query Parameter:** The `q` parameter holds the search term (e.g., `q=Metallica` or `q=Master of Puppets`).

## 📈 Required System Workflow

Your application must implement a single main entry point that executes the following logical flow:

1. **Route Query:** Call the LLM using your Tool Request Schema to convert the user's query into structured API parameters.
2. **Execute API:** Use the parameters generated in Step 1 to construct and execute the correct Spotify search request (type=artist OR type=album).
3. **Retrieve Data:** Capture the complete JSON response from Spotify.
4. **Synthesize Answer:** Call the LLM a second time. Provide the original user query and the retrieved raw JSON data. Instruct the LLM to provide a final, conversational, and accurate answer to the user based only on the data you supply.

In [6]:
from typing import Literal, Optional

In [7]:
# Credentials for the Spotify task, both read from the process environment.
openai_api_key = os.getenv("OPENAI_API_KEY")
# Sent as the Bearer token by call_spotify_api; os.getenv returns None if unset.
spotify_access_token = os.getenv("SPOTIFY_ACCESS_TOKEN")

client = OpenAI(api_key=openai_api_key)

# Arguments for a Spotify search request; used as the structured-output format
# in get_api_args and read by call_spotify_api.
class SpotifySearchArgs(BaseModel):
    # Sent as the `type` query parameter.
    search_type: Literal["artist", "album"] = Field(...)
    # Sent as the `q` query parameter.
    query: str = Field(...)
    # Optional free-text field; not read by any code visible in this cell.
    reasoning: Optional[str] = Field(None)

# Final structured answer for the music task; used as the structured-output
# format in get_final_answer.
class MusicAnswer(BaseModel):
    # Title for the answer.
    summary_title: str
    # List of short fact strings.
    key_points: List[str]
    # The answer text shown to the user.
    answer_text: str

def get_api_args(user_query):
    """Route a music question to a Spotify search type and query string.

    NOTE: this definition rebinds the name ``get_api_args`` that the GitHub
    cell earlier in the notebook also defines; after this cell runs, the
    GitHub pipeline would call this version.

    Args:
        user_query: The user's natural-language music question.

    Returns:
        The parsed ``SpotifySearchArgs``.

    Raises:
        RuntimeError: If the reply did not parse into ``SpotifySearchArgs``.
    """
    router_prompt = "".join(
        [
            "You are a Spotify Argument Generator. ",
            "You do not know upfront which API to use. ",
            "Investigate the user request and decide whether the task needs artist data or album data. ",
            "Return structured arguments using the schema. ",
            "Pick search_type as artist or album. ",
            "Set query to the search string that Spotify would expect in the q parameter. ",
            "Examples. If the user asks about Metallica the band, choose artist. ",
            "If the user asks about Master of Puppets the album, choose album.",
        ]
    )

    conversation = [
        {"role": "system", "content": router_prompt},
        {"role": "user", "content": user_query},
    ]
    completion = client.chat.completions.parse(
        model="gpt-4o-mini",
        messages=conversation,
        response_format=SpotifySearchArgs,
        temperature=0.0,
    )

    parsed = completion.choices[0].message.parsed
    if isinstance(parsed, SpotifySearchArgs):
        return parsed
    raise RuntimeError("Could not parse Spotify arguments")

def call_spotify_api(query_args):
    """Run a Spotify ``/v1/search`` request and return the decoded JSON body.

    Args:
        query_args: Object with ``query`` and ``search_type`` attributes.

    Returns:
        The response body decoded from JSON.

    Raises:
        SystemExit: If the access token is not set, on a network error, on a
            401, or on any other status >= 400.
    """
    # Fail early with a clear message instead of sending "Bearer None".
    if not spotify_access_token:
        raise SystemExit("SPOTIFY_ACCESS_TOKEN is not set. Cannot call Spotify.")

    base_url = "https://api.spotify.com/v1/search"
    headers = {
        "Authorization": f"Bearer {spotify_access_token}",
        "Accept": "application/json",
    }
    params = {
        "q": query_args.query,
        "type": query_args.search_type,
        "limit": 5,
    }

    try:
        r = requests.get(base_url, headers=headers, params=params, timeout=15)
    except requests.RequestException as ex:
        raise SystemExit(f"Network error when calling Spotify. Details: {ex}")

    if r.status_code == 401:
        raise SystemExit("Spotify says unauthorized. Check SPOTIFY_ACCESS_TOKEN")
    if r.status_code >= 400:
        raise SystemExit(f"Spotify API error {r.status_code}. Body: {r.text}")

    return r.json()


def get_final_answer(user_query, spotify_json, search_type):
    """Have the chat model answer *user_query* from the Spotify response only.

    Args:
        user_query: The user's original question.
        spotify_json: JSON-serialisable Spotify search response.
        search_type: The search type that produced *spotify_json*
            ("artist" or "album").

    Returns:
        The parsed ``MusicAnswer``.

    Raises:
        RuntimeError: If the reply did not parse into a ``MusicAnswer``.
    """
    system = (
        "You are an Expert Data Analyst for music metadata. "
        "Produce a short and clear answer for the user based only on the provided Spotify JSON. "
        "Do not invent facts. "
        "If a field is missing, say that the data is not available. "
        "Keep claims grounded in the supplied JSON. "
        "Return only the fields required by the schema."
    )

    user_payload = {
        "original_query": user_query,
        "search_type": search_type,
        "spotify_raw_json": spotify_json,
        "instructions": [
            "If search_type is artist, focus on id, followers, genres, popularity, images, name, external urls.",
            "If search_type is album, focus on id, name, release date, artists, total tracks, external urls.",
            "If several items are returned, highlight the top one by Spotify popularity or the first item. Mention others briefly.",
        ],
    }

    resp = client.chat.completions.parse(
        # Same literal as this cell's get_api_args, so the function no longer
        # depends on a variable that only the GitHub cell defines.
        model = "gpt-4o-mini",
        messages = [
            {"role": "system", "content": system},
            {"role": "user", "content": json.dumps(user_payload, ensure_ascii=False)},
        ],
        response_format = MusicAnswer,
        temperature = 0.2,
    )

    answer_obj = resp.choices[0].message.parsed
    if not isinstance(answer_obj, MusicAnswer):
        raise RuntimeError("Could not parse final answer")
    return answer_obj


def investigate_music_query(user_query):
    """Answer a music question: route it, search Spotify, then synthesise.

    Args:
        user_query: The user's natural-language music question.

    Returns:
        Whatever ``get_final_answer`` returns for the search result.
    """
    routed = get_api_args(user_query)
    search_result = call_spotify_api(routed)
    return get_final_answer(user_query, search_result, routed.search_type)


# Music task driver: a blank answer at the prompt falls back to a sample question.
user_query = input("Enter your music question: ").strip() or "Tell me about the 'Is This It'"
result = investigate_music_query(user_query)
print(result.model_dump_json(indent=2))


{
  "summary_title": "Is This It by The Strokes",
  "key_points": [
    "Album Name: Is This It",
    "Release Date: July 30, 2001",
    "Total Tracks: 11",
    "Artist: The Strokes",
    "Spotify Link: https://open.spotify.com/album/2yNaksHgeMQM9Quse463b5"
  ],
  "answer_text": "The album \"Is This It\" by The Strokes was released on July 30, 2001. It contains 11 tracks and is available on Spotify [here](https://open.spotify.com/album/2yNaksHgeMQM9Quse463b5)."
}
