# Parallel Processing for Street View Analysis

## Overview
This notebook implements parallel processing for analyzing thousands of Street View images using LangGraph with multiple concurrent nodes.

## Problem Statement
- **Scale**: Thousands of locations × multiple orientations (horizon/ground/sky) = 10k+ images
- **Bottleneck**: Sequential AI model calls would take hours
- **Solution**: Parallel processing with bounded concurrency

## Architecture
- **Input**: CSV with Street View URLs (from `google_apis.ipynb`)
- **Processing**: LangGraph parallel nodes for vision analysis
- **Output**: Graded/analyzed results per image

## Concurrency Strategy
- **Measure**: Single-call latency L (seconds) of the vision model
- **Calculate**: k ≈ T × L, where T is the target throughput (requests/second) and k is the number of requests in flight (Little's law)
- **Start**: k = 16 concurrent requests; scale up within provider limits
- **Monitor**: 429 (rate-limit) errors, latency percentiles, success rate
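The sizing rule above is Little's law: requests in flight ≈ throughput × latency. The sketch below assumes T is in requests per second and L in seconds. The helper names and the numbers in the usage lines are hypothetical illustrations, not measurements.

```python
import math


def required_concurrency(target_rps: float, latency_s: float) -> int:
    """Little's law: requests in flight k = T * L, rounded up to a whole request."""
    if target_rps <= 0 or latency_s <= 0:
        raise ValueError("throughput and latency must be positive")
    return math.ceil(target_rps * latency_s)


def estimated_runtime_s(n_images: int, k: int, latency_s: float) -> float:
    """Wall-clock estimate when k requests are kept in flight at all times."""
    return n_images * latency_s / k


# Hypothetical numbers: 4 req/s at 4 s per call needs 16 requests in flight.
k = required_concurrency(target_rps=4.0, latency_s=4.0)
print(k)                                    # 16
print(estimated_runtime_s(10_000, k, 4.0))  # 2500.0 seconds, about 42 minutes
print(estimated_runtime_s(10_000, 1, 4.0))  # 40000.0 seconds sequentially, about 11 hours
```

The same arithmetic also shows why the sequential baseline takes hours at this scale.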

## Key Considerations
- **Provider Limits**: API rate limits and concurrent request caps
- **Rate Limiting**: Client-side throttling to avoid 429s
- **Retry Logic**: Exponential backoff with jitter
- **Checkpointing**: Idempotent tasks, so a resumed run skips already-graded images instead of paying for them twice
- **Budget Guards**: Max requests per minute/hour
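The retry item can be sketched as exponential backoff with "full jitter": each wait is drawn uniformly from `[0, min(max_delay, base_delay * 2**attempt)]`, so many concurrent workers hitting a 429 do not retry in lockstep. This is a minimal sketch. `RateLimitError` is a hypothetical placeholder; in practice you would pass the provider SDK's own rate-limit exception type through `retry_on`.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical stand-in for a provider's HTTP 429 exception."""


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: tuple[type[BaseException], ...] = (RateLimitError,),
) -> T:
    """Await `call()`, retrying on `retry_on` errors with exponential backoff and full jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await call()
        except retry_on:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            # Full jitter: random sleep in [0, min(cap, base * 2**attempt)]
            delay = random.uniform(0, min(max_delay, base_delay * 2 ** attempt))
            await asyncio.sleep(delay)
    raise AssertionError("unreachable: the loop always returns or raises")


async def demo() -> tuple[str, int]:
    """A fake call that fails twice with a 429, then succeeds."""
    attempts = 0

    async def flaky() -> str:
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise RateLimitError("429 Too Many Requests")
        return "ok"

    result = await retry_with_backoff(flaky, base_delay=0.01)
    return result, attempts
```

In a notebook cell, run `await demo()` directly (a loop is already running there); in a script, use `asyncio.run(demo())`. Either way it returns `("ok", 3)`.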

## Implementation Plan
1. Load CSV and create work queue
2. Implement bounded concurrency with LangGraph
3. Add retry logic and rate limiting
4. Monitor metrics and scale accordingly
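Step 2 (bounded concurrency) can be sketched independently of LangGraph with an `asyncio.Semaphore` that caps the number of calls in flight at k. `run_bounded` and `fake_vision_call` are hypothetical names; in the notebook, the worker would wrap the real agent call.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def run_bounded(
    items: Iterable[T],
    worker: Callable[[T], Awaitable[R]],
    k: int = 16,
) -> list[R]:
    """Apply `worker` to every item with at most `k` calls in flight at once."""
    semaphore = asyncio.Semaphore(k)

    async def guarded(item: T) -> R:
        async with semaphore:  # waits while k calls are already running
            return await worker(item)

    # gather returns results in input order, regardless of completion order
    return await asyncio.gather(*(guarded(item) for item in items))


async def demo(n: int = 50, k: int = 16) -> tuple[list[int], int]:
    """Run a fake vision call over n items and report the peak concurrency observed."""
    active = 0
    peak = 0

    async def fake_vision_call(i: int) -> int:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stand-in for the model latency L
        active -= 1
        return i * 2

    results = await run_bounded(range(n), fake_vision_call, k=k)
    return results, peak
```

This version creates one coroutine per item up front. For a queue of 10k+ images, a fixed pool of k workers pulling from an `asyncio.Queue` keeps memory flat. LangChain runnables, including compiled LangGraph graphs, also accept a `max_concurrency` key in the run config for `batch`/`abatch`, which may be a simpler knob.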

In [1]:
import sys
from pathlib import Path

# Make the parent directory importable so cells can use clean absolute imports such as `from src.graph import ...`
sys.path.append(str(Path().absolute().parent))

Idea: make the state a dict keyed by `point_id`, whose value is a dict of image URLs per orientation: `{"horizon": [urls], "ground": [urls], "sky": [url]}` (a single sky URL).
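The sketch below builds that dict from the input CSV. It assumes one row per image with hypothetical `point_id`, `orientation` and `url` columns; the actual layout written by `google_apis.ipynb` may differ. In the notebook, the rows could come from `pd.read_csv(csv_path).to_dict("records")`.

```python
import csv
import io
from collections import defaultdict

ORIENTATIONS = ("horizon", "ground", "sky")


def group_streetviews(rows) -> dict[str, dict[str, list[str]]]:
    """Group flat rows into {point_id: {"horizon": [...], "ground": [...], "sky": [...]}}."""
    streetviews = defaultdict(lambda: {o: [] for o in ORIENTATIONS})
    for row in rows:
        orientation = row["orientation"]
        if orientation not in ORIENTATIONS:
            raise ValueError(f"unknown orientation: {orientation!r}")
        # str() so integer ids read by pandas match the string keys used in the state
        streetviews[str(row["point_id"])][orientation].append(row["url"])
    return dict(streetviews)


# Hypothetical sample rows in the assumed layout
sample = """point_id,orientation,url
p1,horizon,https://example.com/p1_h0.jpg
p1,horizon,https://example.com/p1_h90.jpg
p1,sky,https://example.com/p1_sky.jpg
p2,ground,https://example.com/p2_g.jpg
"""
streetviews = group_streetviews(csv.DictReader(io.StringIO(sample)))
```

Every point gets all three orientation keys, even when a list is empty, so downstream code can call `views["horizon"]` without a membership check.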

In [None]:
from langchain.agents import AgentState
from typing import Union
from typing_extensions import Annotated

def add_dict(
    left: Union[dict[str, dict], None],
    right: Union[dict[str, dict], None],
) -> dict[str, dict]:
    """
    Reducer that merges two dictionaries. Used for both the Street View URLs and the results.

    The Street View URLs dictionary has the form:
    {
        "point_id" : {
            "horizon" : [urls],
            "ground" : [urls],
            "sky" : [url]  # (single url)
        }
    }

    The results dictionary has the form:
    {
        "point_id" : {
            "grade" : float,  # mean of the per-image grades
            "description" : str
        }
    }

    The merge is shallow: if both sides contain the same point_id, the entry from `right` wins.
    """
    if left is None:  # no existing value yet
        left = {}

    if right is None:  # no update
        right = {}

    return {**left, **right}  # shallow merge; right overrides left on key collisions


class StreetViewState(AgentState):
    """
    State of the multimodal agent. Inherits the `messages` key from AgentState.
    AgentState is a TypedDict, so nodes read keys as state["streetviews"], not state.streetviews.
    """
    streetviews: Annotated[dict[str, dict[str, list[str]]], add_dict]
    results: Annotated[dict[str, dict[str, Union[float, str]]], add_dict]  # {point_id: {"grade": float, "description": str}}

Each point's views are then graded by the agent, one image per call, and the point's score is the mean of those per-image grades. This requires structured outputs, so that each grade comes back as a number rather than free text.
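The aggregation step can be sketched with the standard library alone. `Grade` is a hypothetical stand-in for the Pydantic structured-output model defined below, and `aggregate_point` is a hypothetical helper. The final line mirrors the shallow `{**left, **right}` merge that the `add_dict` reducer performs when two updates reach the state.

```python
from dataclasses import dataclass


@dataclass
class Grade:
    """Hypothetical stand-in for the structured output: one grade per image."""
    grade: int
    description: str


def aggregate_point(point_id: str, per_image: list[Grade]) -> dict[str, dict]:
    """Collapse one point's per-image grades into a single results entry."""
    if not per_image:  # no image could be graded for this point
        return {}
    grades = [g.grade for g in per_image]
    return {
        point_id: {
            "grade": sum(grades) / len(grades),  # mean grade, always a float
            "description": " | ".join(g.description for g in per_image),
        }
    }


# Two hypothetical updates, e.g. from two parallel branches
left = aggregate_point("p1", [Grade(7, "tree-lined street"), Grade(8, "clean sidewalk")])
right = aggregate_point("p2", [Grade(4, "empty parking lot")])
merged = {**left, **right}  # same shallow merge as add_dict
```

Here `merged["p1"]["grade"]` is `7.5` and `merged["p2"]["grade"]` is `4.0`. Because the merge is shallow, two updates for the same `point_id` do not combine: the right-hand one replaces the left.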

In [None]:
from typing import List
import base64
import requests
from langchain.messages import HumanMessage


def prepare_input(state: StreetViewState) -> List[HumanMessage]:
    """
    Build one HumanMessage per **horizon image** (prototype) in the pre-assigned state.
    Each message contains: prompt text + exactly one image.
    
    Returns:
        List[HumanMessage]
    """
    messages: List[HumanMessage] = []

    text = (
        "Analyze this Street View horizon image and grade the scene from 1 to 10. "
        "Consider urban quality, cleanliness, architectural appeal, and overall visual pleasantness."
    )

    # StreetViewState is a TypedDict, so read keys with [] rather than attribute access
    for views in state["streetviews"].values():
        for url in views.get("horizon", []):
            try:
                resp = requests.get(url, timeout=10)
                resp.raise_for_status()
                img_b64 = base64.b64encode(resp.content).decode("utf-8")
                mime_type = resp.headers.get("content-type", "image/jpeg")

                content_blocks = [
                    {"type": "text", "text": text},
                    {"type": "image", "base64": img_b64, "mime_type": mime_type},
                ]
                messages.append(HumanMessage(content=content_blocks))
            except requests.RequestException as e:
                print(f"Failed to download image from {url}: {e}")
                continue

    return messages


The agent should also return structured output: the 1 to 10 grade it assigned to the scene, plus a short description. The `point_id` could be part of the output too, but it is simpler to attach it afterwards from the state.

In [None]:
from pydantic import BaseModel, Field

class GradeOutput(BaseModel):
    """
    Grade given to the scene
    """
    grade : int = Field(description="The grade from 1 to 10 that the agent gave to the scene")
    description : str = Field(description="A description of the scene")

# Extract with `result = await multimodal_agent.ainvoke(...)`; `result["structured_response"]`
# is then a GradeOutput object, e.g. GradeOutput(grade=5, description="...")


In [None]:
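import asyncio

import pandas as pd
from langchain.agents import create_agent
from langgraph.graph import StateGraph, START, END

from src.prompts import multimodal_prompt
from src.models import get_multimodal_model

multimodal_model = get_multimodal_model()
multimodal_agent = create_agent(
    model=multimodal_model,
    tools=[],
    system_prompt=multimodal_prompt,
    response_format=GradeOutput,
)


async def multimodal_node(state: StreetViewState):
    """
    Invoke the agent once per horizon image, then take the mean of the grades manually for each point.
    """
    results = {}
    for point_id, views in state["streetviews"].items():
        # one HumanMessage (prompt + exactly one image) per horizon image of this point
        messages = prepare_input({"streetviews": {point_id: views}})
        if not messages:  # every image download failed for this point
            continue

        # one agent call per image; the calls for a single point run concurrently
        outputs = await asyncio.gather(
            *(multimodal_agent.ainvoke({"messages": [msg]}) for msg in messages)
        )
        grades = [out["structured_response"].grade for out in outputs]
        descriptions = [out["structured_response"].description for out in outputs]

        results[point_id] = {
            "grade": sum(grades) / len(grades),  # mean of the per-image grades
            "description": " | ".join(descriptions),
        }

    # a node must return a dict update (not a tuple) so the add_dict reducer can apply it
    return {"results": results}


def save_results(state: StreetViewState):
    """
    Join the per-point results onto the input CSV and write them to a new CSV file.
    """
    csv_path = Path("./streetview_samples.csv")
    output_path = Path("./streetview_results.csv")  # separate file, so the input CSV is not overwritten

    df = pd.read_csv(csv_path)
    # {point_id: {"grade": ..., "description": ...}} -> one row per point_id
    results_df = pd.DataFrame.from_dict(state["results"], orient="index")
    # assumes the CSV has a `point_id` column matching the keys of state["results"]
    df["point_id"] = df["point_id"].astype(str)
    df = df.merge(results_df, left_on="point_id", right_index=True, how="left")
    df.to_csv(output_path, index=False)

    print(f"Results saved to {output_path}")
    return {}  # nothing to add to the state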


Using OPENAI model


In [None]:
def get_graph(checkpointer, save_display=False):
    """
    Build and compile the graph: START -> multimodal_agent -> save_results -> END.
    """
    builder = StateGraph(StreetViewState)
    # nodes (prepare_input is a helper called inside multimodal_node, not a node:
    # it returns messages rather than a state update)
    builder.add_node("multimodal_agent", multimodal_node)
    builder.add_node("save_results", save_results)
    # edges
    builder.add_edge(START, "multimodal_agent")
    builder.add_edge("multimodal_agent", "save_results")
    builder.add_edge("save_results", END)

    graph = builder.compile(checkpointer=checkpointer)

    if save_display:
        # render the graph as a Mermaid PNG (returned as bytes) and save it to file
        img = graph.get_graph().draw_mermaid_png()
        with open("./graph.png", "wb") as f:
            f.write(img)
        print("Graph display saved to ./graph.png")

    return graph

## Main

In [None]:
csv_path = Path("./streetview_samples.csv")