## step 1: building the pipeline

In [55]:
import os
import requests
import json
from superpipe.steps import LLMStructuredStep, CustomStep, SERPEnrichmentStep
from superpipe import models
from pydantic import BaseModel, Field

# Check that Serper accepts the SERPER API key before building the pipeline.
# Serper's search API is called with POST https://google.serper.dev/search, a JSON
# body, and the key in an `X-API-KEY` header. The earlier request (a Bearer token
# sent with GET to api.serper.dev) does not match that API, so it could not
# validate the key.
api_key = os.getenv("SERPER_API_KEY")
if not api_key:
    print("Warning: SERPER_API_KEY is not set in the environment.")
headers = {"X-API-KEY": api_key or ""}

query = {"q": "test search"}
# `json=` serialises the body and sets the JSON Content-Type header; the timeout
# keeps a stalled connection from hanging the notebook.
response = requests.post("https://google.serper.dev/search", headers=headers, json=query, timeout=30)

if response.status_code == 200:
    print("API key is working correctly.")
else:
    print(f"Error: API request failed with status code {response.status_code}")
    # You might want to exit the script or raise an exception here,
    # as the pipeline steps won't work without a valid API key.
    # For example, you can use:
    # exit(1)
    # or
    # raise RuntimeError(f"API request failed with status code {response.status_code}")

# Step 1: use Superpipe's built-in SERP enrichment step to search for the person's Wikipedia page
# Include a unique "name" for the step that will be used to reference this step's output in future steps
# (the next step's prompt reads this step's result as row['search']).
# `prompt` builds the search query text from the input row's 'name' field.

search_step = SERPEnrichmentStep(
  prompt= lambda row: f"{row['name']} wikipedia",
  name="search"
)

# Step 2: Use an LLM to extract the wikipedia URL from the search results
# First, define a Pydantic model that specifies the structured output we want from the LLM

# Output schema handed to the URL-extraction LLM step via `out_schema`.
# Documented with `#` comments rather than a docstring so the model definition
# itself stays exactly as written.
class ParseSearchResult(BaseModel):
  # Single field: the Wikipedia URL the LLM is asked to pull out of the search results.
  wikipedia_url: str = Field(description="The URL of the Wikipedia page for the person")

# Then we use the built-in LLMStructuredStep and specify a model and a prompt
# The prompt is a function that has access to all the fields in the input as well as the outputs of previous steps

# The prompt combines the input field row['name'] with row['search'], the output of
# the step named "search"; `out_schema` asks for a reply shaped like ParseSearchResult.
parse_search_step = LLMStructuredStep(
  model=models.gpt35,
  prompt= lambda row: f"Extract the Wikipedia URL for {row['name']} from the following search results: \n\n {row['search']}",
  out_schema=ParseSearchResult,
  name="parse_search"
)


In [56]:
import os
import requests
from dotenv import load_dotenv
from superpipe.steps import LLMStructuredStep, CustomStep, SERPEnrichmentStep
from superpipe import models
from pydantic import BaseModel, Field

# Pull any variables declared in a local .env file into the process environment
load_dotenv()

# Read the serper.dev API key from the environment (None when the variable is unset)
SERPER_API_KEY = os.environ.get('SERPER_API_KEY')

# Step 1: use Superpipe's built-in SERP enrichment step to search for the person's Wikipedia page
# Include a unique "name" for the step that will be used to reference this step's output in future steps

# Define a function to perform SERP enrichment with custom headers
def custom_serp_enrichment(row):
    """Query serper.dev for "<name> wikipedia" and return the decoded JSON response.

    Args:
        row: Mapping with a 'name' key holding the person's name.

    Returns:
        The JSON body of the Serper response, as decoded by `response.json()`.

    Raises:
        RuntimeError: If SERPER_API_KEY is empty or unset.
        requests.HTTPError: If Serper answers with an error status
            (for example 403 when the key is rejected).
    """
    if not SERPER_API_KEY:
        raise RuntimeError("SERPER_API_KEY is not set; add it to the environment or the .env file")

    # Serper's search API is POST https://google.serper.dev/search with the key in an
    # X-API-KEY header and the query in a JSON body (not GET api.serper.dev/serp).
    url = "https://google.serper.dev/search"
    headers = {'X-API-KEY': SERPER_API_KEY}
    payload = {'q': f"{row['name']} wikipedia"}
    # The timeout keeps a stalled connection from hanging the notebook.
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    # Fail loudly instead of handing an error payload downstream as if it were search results.
    response.raise_for_status()
    return response.json()

# `prompt` must return the search query text for a row. `custom_serp_enrichment`
# performs an HTTP request and returns the response body, so it cannot serve as the
# prompt; the built-in step issues the search itself.
# NOTE(review): presumably the step reads SERPER_API_KEY from the environment
# (populated by load_dotenv above) - confirm against the Superpipe docs.
search_step = SERPEnrichmentStep(
    prompt=lambda row: f"{row['name']} wikipedia",
    name="search"
)

# Run the step on its own so the raw search result can be inspected. `search_output`
# used to be printed without ever being assigned, which is a NameError on a fresh kernel.
search_output = search_step.run({"name": "Jean-Paul Sartre"})
print("Search Output:", search_output)


# Step 2: Use an LLM to extract the Wikipedia URL from the search results
# First, define a Pydantic model that specifies the structured output we want from the LLM

# Output schema handed to the URL-extraction LLM step via `out_schema`; the fetch
# step later reads the extracted value as row['wikipedia_url'].
class ParseSearchResult(BaseModel):
    # Single field: the Wikipedia URL the LLM is asked to pull out of the search results.
    wikipedia_url: str = Field(description="The URL of the Wikipedia page for the person")

# LLM step that turns the raw search output into a structured Wikipedia URL.
# The prompt combines the input field row['name'] with row['search'], the output of
# the step named "search"; `out_schema` asks for a reply shaped like ParseSearchResult.
parse_search_step = LLMStructuredStep(
    model=models.gpt35,
    prompt=lambda row: f"Extract the Wikipedia URL for {row['name']} from the following search results: \n\n {row['search']}",
    out_schema=ParseSearchResult,
    name="parse_search"
)

# Step 3: Fetch Wikipedia content using the URL obtained from the search results
# A step's output is referenced by its name in later steps, and the extraction prompt
# reads the page as row['wikipedia']. The step is therefore named "wikipedia"; under
# the previous name "fetch_wikipedia" that lookup had no matching key.

fetch_wikipedia_step = CustomStep(
    # Downloads the page at row['wikipedia_url'] and returns the response body as text.
    transform=lambda row: requests.get(row['wikipedia_url']).text,
    name="wikipedia"
)

# Step 4: Extract relevant data from the Wikipedia content

# Output schema for the final extraction step. The evaluation code compares these
# three fields (date_of_birth, alive, cause_of_death) against the labelled values.
class ExtractedData(BaseModel):
    # Date of birth as a string; the description asks the LLM for YYYY-MM-DD.
    date_of_birth: str = Field(description="The date of birth of the person in the format YYYY-MM-DD")
    # True when the person is still alive.
    alive: bool = Field(description="Whether the person is still alive")
    # Free-text cause of death; the description asks for the sentinel 'N/A' for living people.
    cause_of_death: str = Field(description="The cause of death of the person. If the person is alive, return 'N/A'")

# LLM step that extracts date of birth, living status and cause of death.
# The prompt reads the page text from row['wikipedia'], so an earlier step must
# populate that key. The trailing backslash joins the two source lines of the
# f-string, leaving the second line's leading spaces inside the prompt text.
extract_step = LLMStructuredStep(
    model=models.gpt4,
    prompt=lambda row: f"""Extract the date of birth for {row['name']}, whether they're still alive \
    and if not, their cause of death from the following Wikipedia content: \n\n {row['wikipedia']}""",
    out_schema=ExtractedData,
    name="extract_data"
)

# Define and run the pipeline
# `Pipeline` is used below but is not imported anywhere earlier in the notebook, and
# `json` is only imported by a previous cell. Both are imported here so that this
# cell runs on a fresh kernel.
import json
from superpipe.pipeline import Pipeline

# Steps run in list order; each step can read the outputs of the steps before it.
pipeline = Pipeline([
    search_step,
    parse_search_step,
    fetch_wikipedia_step,
    extract_step
])

# Run the pipeline for a specific person
output = pipeline.run({"name": "Jean-Paul Sartre"})
print(json.dumps(output, indent=2))

Search Output: {'name': 'Jean-Paul Sartre', 'search': '{"message":"Unauthorized. Sign up for a free account.","statusCode":403}'}


KeyError: 'wikipedia_url'

## step 2: evaluating the pipeline [TBD]

Evaluation is broken into the following parts:

1. **a dataset with labels** - in this case we need a list of famous people and the true date of birth, living status and cause of death of each person
2. **evaluation function** - a function that defines what "correct" is. We'll use simple comparison for date of birth and living status, and an LLM call to evaluate the correctness of cause of death

In [None]:
import pandas as pd

# Labelled evaluation set, one tuple per person:
# (name, date of birth as YYYY-MM-DD, still alive?, cause of death or "N/A")
data = [
  ("Ruth Bader Ginsburg", "1933-03-15", False, "Pancreatic cancer"),
  ("Bill Gates", "1955-10-28", True, "N/A"),
  ("Steph Curry", "1988-03-14", True, "N/A"),
  ("Scott Belsky", "1980-04-18", True, "N/A"),
  ("Steve Jobs", "1955-02-24", False, "Pancreatic tumor/cancer"),
  ("Paris Hilton", "1981-02-17", True, "N/A"),
  ("Kurt Vonnegut", "1922-11-11", False, "Brain injuries"),
  ("Snoop Dogg", "1971-10-20", True, "N/A"),
  ("Kobe Bryant", "1978-08-23", False, "Helicopter crash"),
  ("Aaron Swartz", "1986-11-08", False, "Suicide")
]

# The tuple positions map onto the input column and the three label columns
df = pd.DataFrame(
  data,
  columns=["name", "dob_label", "alive_label", "cause_label"],
)

# Output schema for the LLM judge; eval_fn reads its `result` field.
class EvalResult(BaseModel):
  # True when the judged answer is considered correct.
  result: bool = Field(description="Is the answer correct or not?")

# LLM judge for cause of death: the prompt shows the labelled cause (row['cause_label'])
# next to the extracted one (row['cause_of_death']) and asks whether they agree.
# eval_fn only calls it when the label is not "N/A".
cause_evaluator = LLMStructuredStep(
  model=models.gpt4,
  prompt=lambda row: f"This is the correct cause of death: {row['cause_label']}. Is this provided cause of death accurate? The phrasing might be slightly different. Use your judgement: \n{row['cause_of_death']}",
  out_schema=EvalResult,
  name="cause_evaluator")

def eval_fn(row):
  """Score one pipeline output row against its labels.

  Weights: 0.25 for a matching date of birth, 0.25 for a matching living status
  and 0.5 for the cause of death. When the labelled cause is "N/A" the extracted
  cause must be exactly "N/A"; otherwise the LLM judge `cause_evaluator` decides.

  Args:
    row: Mapping with the extracted fields ('date_of_birth', 'alive',
      'cause_of_death') and the label fields ('dob_label', 'alive_label',
      'cause_label').

  Returns:
    The sum of the weights earned (0 when nothing matches).
  """
  dob_matches = row['date_of_birth'] == row['dob_label']
  alive_matches = row['alive'] == row['alive_label']

  if row['cause_label'] != "N/A":
    # A real cause was labelled: phrasing may differ, so defer to the LLM judge.
    cause_matches = cause_evaluator.run(row)['result']
  else:
    # Living person: only the exact sentinel counts, no LLM call needed.
    cause_matches = row['cause_of_death'] == "N/A"

  total = 0
  for matched, weight in ((dob_matches, 0.25), (alive_matches, 0.25), (cause_matches, 0.5)):
    if matched:
      total += weight
  return total

# Run the pipeline over the labelled dataframe, then score it with eval_fn.
pipeline.run(df)
print("Score: ", pipeline.evaluate(eval_fn))
# Last expression of the cell, so the notebook displays the dataframe.
# NOTE(review): presumably df now also carries the step outputs - confirm.
df

## step 3: optimizing the pipeline

This pipeline has an accuracy score of 100%, but perhaps there's room for improvement on cost and speed. First, let's view the cost and latency of each step to figure out which one is the bottleneck.

In [None]:
# Per-step report: name, total latency, and combined input + output cost
for step in pipeline.steps:
  print(
    f"Step {step.name}:",
    f"- Latency: {step.statistics.total_latency}",
    f"- Cost: {step.statistics.input_cost + step.statistics.output_cost}",
    sep="\n",
  )

Clearly the final step (`extract_data`) is the one responsible for the bulk of the cost and latency. This makes sense, because we're feeding in the entire wikipedia article to GPT-4, one of the most expensive models.

Let's find out if we can get away with a cheaper/faster model. Most models cannot handle the number of tokens needed to ingest a whole Wikipedia article, so we'll turn to two models that can and that are also cheaper than GPT-4: Claude 3 Sonnet and Claude 3 Haiku.

In [None]:
from superpipe.grid_search import GridSearch
from superpipe.models import claude3_haiku, claude3_sonnet
from superpipe.steps import LLMStructuredCompositeStep

# Claude does not support JSON mode or function calling out of the box, so this uses
# LLMStructuredCompositeStep, which relies on GPT3.5 for the structured JSON extraction.
# The prompt is reused from the original extract step; only the model changes.
new_extract_step = LLMStructuredCompositeStep(
  name="extract_data_new",
  model=claude3_haiku,
  out_schema=ExtractedData,
  prompt=extract_step.prompt,
)

# `Pipeline` is not imported anywhere earlier in the notebook, so it is imported here
# to keep this cell runnable on a fresh kernel.
from superpipe.pipeline import Pipeline

# Same first three steps as the original pipeline, with the Claude-based extraction
# step last. `evaluation_fn` attaches eval_fn so that runs of this pipeline can be scored.
new_pipeline = Pipeline([
  search_step,
  parse_search_step,
  fetch_wikipedia_step,
  new_extract_step
], evaluation_fn=eval_fn)

# Search space: keyed by step name, then by the step parameter to vary.
# Only the extraction step's model is varied here.
param_grid = {new_extract_step.name: dict(model=[claude3_haiku, claude3_sonnet])}

# Run the pipeline over the labelled dataframe for each candidate in the grid
grid_search = GridSearch(new_pipeline, param_grid)
grid_search.run(df)

Strangely, Claude 3 Haiku is both more accurate (100% vs. 45%) and cheaper and faster. This is surprising, but it is useful information that we wouldn't have found unless we had built and evaluated pipelines on _our specific data_ rather than on benchmark data.

In [None]:
# Apply the best parameters from the grid search and re-run on the labelled data
best_params = grid_search.best_params
new_pipeline.update_params(best_params)
new_pipeline.run(df)

# Overall score, then name, total latency and combined cost for each step
print("Score: ", new_pipeline.score)
for step in new_pipeline.steps:
  print(
    f"Step {step.name}:",
    f"- Latency: {step.statistics.total_latency}",
    f"- Cost: {step.statistics.input_cost + step.statistics.output_cost}",
    sep="\n",
  )