Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ SMALL_MODEL_NAME="openai:gpt-4.1-mini"

GEODINI_API="https://geodini.k8s.labs.ds.io"

STAC_CATALOG_NAME="planetarycomputer"
STAC_CATALOG_URL="https://planetarycomputer.microsoft.com/api/stac/v1"
74 changes: 59 additions & 15 deletions frontend/streamlit_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,57 @@
"""
)

# Create input field for the query
query = st.text_input(
"Enter your query",
placeholder="Find imagery over Paris from 2017",
help="Describe what kind of satellite imagery you're looking for",
)
# Create two columns for query and catalog URL
col1, col2 = st.columns([3, 1])

# Add a search button
search_button = st.button("Search")
with col1:
# Create input field for the query
query = st.text_input(
"Enter your query",
placeholder="Find imagery over Paris from 2017",
help="Describe what kind of satellite imagery you're looking for",
)
# Add a search button
search_button = st.button("Search")

with col2:
# Define catalog options
catalog_options = {
"Planetary Computer": "https://planetarycomputer.microsoft.com/api/stac/v1",
"VEDA": "https://openveda.cloud/api/stac",
"E84 Earth Search": "https://earth-search.aws.element84.com/v1",
"DevSeed EOAPI.dev": "https://stac.eoapi.dev",
"Custom URL": "custom",
}

# Function to run the search asynchronously
async def run_search(query):
response = requests.post(
f"{API_URL}/items/search", json={"query": query, "limit": 10}
# Create dropdown for catalog selection
selected_catalog = st.selectbox(
"Select STAC Catalog",
options=list(catalog_options.keys()),
index=0, # Default to Planetary Computer
help="Choose a predefined STAC catalog or select 'Custom URL' to enter your own.",
)

# Handle custom URL input
if selected_catalog == "Custom URL":
catalog_url = st.text_input(
"Enter Custom Catalog URL",
placeholder="https://your-catalog.com/stac/v1",
help="Enter the URL of your custom STAC catalog.",
)
else:
catalog_url = catalog_options[selected_catalog]
# Show the selected URL as read-only info
st.info(f"Using: {catalog_url}")


# Function to run the search asynchronously
async def run_search(query, catalog_url=None):
payload = {"query": query, "limit": 10}
if catalog_url:
payload["catalog_url"] = catalog_url.strip()

response = requests.post(f"{API_URL}/items/search", json=payload)
return response.json()["results"]


Expand All @@ -60,7 +95,7 @@ async def run_search(query):
# Run the async search
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
results = loop.run_until_complete(run_search(query))
results = loop.run_until_complete(run_search(query, catalog_url))
items = results["items"]
aoi = results["aoi"]
explanation = results["explanation"]
Expand Down Expand Up @@ -212,10 +247,19 @@ async def run_search(query):
"""
Search for satellite imagery using natural language.

**Examples queries:**
**Available STAC Catalogs:**
- **Planetary Computer**: Microsoft's global dataset catalog
- **VEDA**: NASA's Earth science data catalog
- **E84 Earth Search**: Element 84's STAC catalog for Earth observation data on AWS Open Data
- **DevSeed EOAPI.dev**: DevSeed's example STAC catalog
- **Custom URL**: Enter any STAC-compliant catalog URL

The system will automatically index new catalogs on first use.

**Example queries:**
- imagery of Paris from 2017
- Cloud-free satellite data of Georgia the country from 2022
- relatively cloud-free images in 2024 that have RGB visual bands over Longmont, Colorado that can be downloaded via HTTP
- relatively cloud-free images in 2024 over Longmont, Colorado
- images in 2024 over Odisha with cloud cover between 50 to 60%
- NAIP imagery over the state of Washington
- Burn scar imagery of from 2024 over the state of California
Expand Down
1 change: 0 additions & 1 deletion helm-chart/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ The init container uses the same STAC catalog configuration as the API:
api:
env:
STAC_CATALOG_URL: "https://planetarycomputer.microsoft.com/api/stac/v1"
STAC_CATALOG_NAME: "planetarycomputer"

initContainer:
enabled: true # Set to false to disable data pre-loading
Expand Down
1 change: 0 additions & 1 deletion helm-chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ api:
PYTHONUNBUFFERED: "1"
HF_HOME: "/app/data/.cache/huggingface"
GEODINI_API: "https://geodini.k8s.labs.ds.io"
STAC_CATALOG_NAME: "planetarycomputer"
STAC_CATALOG_URL: "https://planetarycomputer.microsoft.com/api/stac/v1"
DEFAULT_TARGET_COLLECTIONS: "['landsat-8-c2-l2', 'sentinel-2-l2a']"

Expand Down
30 changes: 17 additions & 13 deletions stac_search/agents/collections_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
from pprint import pformat
from typing import List, Dict, Any

import chromadb
from pydantic_ai import Agent
from sentence_transformers import SentenceTransformer
from stac_search.catalog_manager import CatalogManager


logger = logging.getLogger(__name__)
Expand All @@ -20,7 +19,6 @@
MODEL_NAME = "all-MiniLM-L6-v2"
DATA_PATH = os.environ.get("DATA_PATH", "data/chromadb")

STAC_CATALOG_NAME = os.getenv("STAC_CATALOG_NAME", "planetarycomputer")
STAC_COLLECTIONS_URL = os.getenv(
"STAC_COLLECTIONS_URL", "https://planetarycomputer.microsoft.com/api/stac/v1"
)
Expand Down Expand Up @@ -65,7 +63,7 @@ async def collection_search(
top_k: int = 5,
model_name: str = MODEL_NAME,
data_path: str = DATA_PATH,
stac_catalog_name: str = STAC_CATALOG_NAME,
catalog_url: str = None,
) -> List[CollectionWithExplanation]:
"""
Search for collections and rerank results with explanations
Expand All @@ -75,25 +73,31 @@ async def collection_search(
top_k: Maximum number of results to return
model_name: Name of the sentence transformer model to use
data_path: Path to the vector database
stac_catalog_name: Name of the STAC catalog
stac_collections_url: URL of the STAC collections API
catalog_url: URL of the STAC catalog

Returns:
Ranked results with relevance explanations
"""
start_time = time.time()

# Initialize model and database connections
model = SentenceTransformer(model_name)
# Initialize catalog manager
catalog_manager = CatalogManager(data_path=data_path, model_name=model_name)

# If catalog_url is provided, ensure it's loaded
if catalog_url:
load_result = await catalog_manager.load_catalog(catalog_url)
if not load_result["success"]:
logger.error(f"Failed to load catalog: {load_result['error']}")
raise ValueError(f"Failed to load catalog: {load_result['error']}")

# Get the appropriate collection
collection = catalog_manager.get_catalog_collection(catalog_url)

load_model_time = time.time()
logger.info(f"Model loading time: {load_model_time - start_time:.4f} seconds")

client = chromadb.PersistentClient(path=data_path)
collection_name = f"{stac_catalog_name}_collections"
collection = client.get_collection(name=collection_name)

# Generate query embedding
query_embedding = model.encode([query])
query_embedding = catalog_manager.model.encode([query])

# Search vector database
results = collection.query(
Expand Down
41 changes: 31 additions & 10 deletions stac_search/agents/items_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
@dataclass
class Context:
query: str
catalog_url: str | None = None
location: str | None = None
top_k: int = 5
return_search_params_only: bool = False
Expand Down Expand Up @@ -105,12 +106,16 @@ class CollectionSearchResult:
collections: List[CollectionWithExplanation]


async def search_collections(query: str) -> CollectionSearchResult | None:
async def search_collections(
query: str, catalog_url: str = None
) -> CollectionSearchResult | None:
logger.info("Searching for relevant collections ...")
collection_query = await collection_query_framing_agent.run(query)
logger.info(f"Framed collection query: {collection_query.data.query}")
if collection_query.data.is_specific:
collections = await collection_search(collection_query.data.query)
collections = await collection_search(
collection_query.data.query, catalog_url=catalog_url
)
return CollectionSearchResult(collections=collections)
else:
return None
Expand Down Expand Up @@ -278,24 +283,42 @@ async def item_search(ctx: Context) -> ItemSearchResult:
results = await search_items_agent.run(
f"Find items for the query: {ctx.query}", deps=ctx
)
catalog_url_to_use = ctx.catalog_url or STAC_CATALOG_URL

# determine the collections to search
target_collections = await search_collections(ctx.query) or []
target_collections = await search_collections(ctx.query, catalog_url_to_use) or []
logger.info(f"Target collections: {pformat(target_collections)}")
default_target_collections = DEFAULT_TARGET_COLLECTIONS

if not target_collections:
# If no specific collections were found, use the default target collections
default_target_collections = DEFAULT_TARGET_COLLECTIONS
# check that default_target_collections exist in the catalog
all_collection_ids = [
collection.id
for collection in Client.open(catalog_url_to_use).get_collections()
]
default_target_collections = [
collection_id
for collection_id in default_target_collections
if collection_id in all_collection_ids
]

if target_collections:
explanation = "Considering the following collections:"
for result in target_collections.collections:
explanation += f"\n- {result.collection_id}: {result.explanation}"
collections_to_search = [
collection.collection_id for collection in target_collections.collections
]
else:
elif default_target_collections:
explanation = f"Including the following common collections in the search: {', '.join(default_target_collections)}\n"
collections_to_search = default_target_collections
else:
explanation = "Searching all collections in the catalog."
collections_to_search = all_collection_ids

# Actually perform the search
client = Client.open(STAC_CATALOG_URL)
client = Client.open(catalog_url_to_use)
params = {
"max_items": 20,
"collections": collections_to_search,
Expand All @@ -310,11 +333,9 @@ async def item_search(ctx: Context) -> ItemSearchResult:
logger.info(f"Found polygon for {results.data.location}")
params["intersects"] = polygon
else:
explanation += f"\n\n No polygon found for {results.data.location}. "
return ItemSearchResult(
items=None,
search_params=params,
aoi=None,
explanation=f"No polygon found for {results.data.location}",
items=None, search_params=params, aoi=None, explanation=explanation
)

if ctx.return_search_params_only:
Expand Down
29 changes: 21 additions & 8 deletions stac_search/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
FastAPI server for STAC Natural Query
"""

from fastapi import FastAPI
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import uvicorn

from stac_search.agents.collections_search import collection_search
Expand All @@ -30,29 +31,41 @@
# Define request model
class QueryRequest(BaseModel):
query: str
catalog_url: Optional[str] = None


class STACItemsRequest(BaseModel):
query: str
catalog_url: Optional[str] = None
return_search_params_only: bool = False


# Define search endpoint
@app.post("/search")
async def search(request: QueryRequest):
"""Search for STAC collections using natural language"""
results = collection_search(request.query)
return {"results": results}
try:
results = await collection_search(
request.query, catalog_url=request.catalog_url
)
return {"results": results}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@app.post("/items/search")
async def search_items(request: STACItemsRequest):
"""Search for STAC items using natural language"""
ctx = ItemSearchContext(
query=request.query, return_search_params_only=request.return_search_params_only
)
results = await item_search(ctx)
return {"results": results}
try:
ctx = ItemSearchContext(
query=request.query,
catalog_url=request.catalog_url,
return_search_params_only=request.return_search_params_only,
)
results = await item_search(ctx)
return {"results": results}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


def start_server(host: str = "0.0.0.0", port: int = 8000):
Expand Down
Loading