In [196]:
from typing import Dict, Any, List, Union, Optional
import async_retriever as ar
import pygeoutils as geoutils
import pandas as pd
import geopandas as gpd
from pygeoogc import ServiceError, InvalidInputValue

In [202]:
class SensorThings:
    """Small client for the ``Things`` collection of the USGS SensorThings API.

    The class offers a helper that assembles OData query options
    (:meth:`odata_helper`) and a method that sends them to the service and
    converts the response to a (Geo)DataFrame (:meth:`query_byodata`).
    """

    def __init__(self) -> None:
        # Every query is issued against the ``Things`` entity set.
        self.base_url = "https://labs.waterdata.usgs.gov/sta/v1.1/Things"

    @staticmethod
    def odata_helper(
        columns: Optional[List[str]] = None,
        conditionals: Optional[str] = None,
        expand: Optional[Dict[str, Dict[str, Any]]] = None,
        max_count: Optional[int] = None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Generate Odata filters for SensorThings API.

        Parameters
        ----------
        columns : list of str, optional
            Columns to be selected from the database, defaults to ``None``.
        conditionals : str, optional
            Conditionals to be applied to the database, defaults to ``None``.
            Note that the conditionals should have the form of
            ``cond1 operator 'value' and/or cond2 operator 'value'``.
            For example:
            ``properties/monitoringLocationType eq 'Stream' and ...``
        expand : dict of dict, optional
            Expand the properties of the selected columns, defaults to ``None``.
            Note that the expand should have the form of
            ``{Property: {func: value, ...}}``. For example:
            ``{"Locations":
                    {
                        "select": "location",
                        "filter": "ObservedProperty/@iot.id eq '00060'",
                    },
            }``
            A property mapped to an empty dict is expanded without options.
        max_count : int, optional
            Maximum number of items to be returned, defaults to ``None``.
        extra_params : dict, optional
            Extra parameters to be added to the Odata filter, defaults to ``None``.
            They are applied last, so they override the keys generated from
            the other arguments.

        Returns
        -------
        odata : dict
            Odata filter for the SensorThings API. Keys are the option names
            without the leading ``$``; ``max_count`` is stored as given
            under ``"top"``.
        """
        odata: Dict[str, Any] = {}

        # Truthiness (not ``is not None``) so that empty inputs do not produce
        # empty ``$select=``/``$filter=``/``$expand=`` options.
        if columns:
            odata["select"] = ",".join(columns)

        if conditionals:
            odata["filter"] = conditionals

        def _expand_item(name: str, options: Dict[str, Any]) -> str:
            """Render one ``$expand`` entry, e.g. ``Locations($select=location)``."""
            if not options:
                # No nested options: a bare name instead of ``Name()``.
                return name
            nested = ";".join(f"${k}={v}" for k, v in options.items())
            return f"{name}({nested})"

        if expand:
            odata["expand"] = ",".join(
                _expand_item(name, options) for name, options in expand.items()
            )

        if max_count is not None:
            odata["top"] = max_count

        if extra_params is not None:
            odata.update(extra_params)
        return odata

    @staticmethod
    def _check_response(resp: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``resp`` unchanged, raising if it carries a ``message`` key."""
        if "message" in resp:
            raise ServiceError(resp["message"])
        return resp

    @staticmethod
    def _item_limit(params: Dict[str, Any]) -> Optional[int]:
        """Return the non-negative integer under ``"top"``, or ``None``.

        A missing or non-integer value yields ``None`` so that the service,
        not this client, is the one to report a malformed ``$top``.
        """
        try:
            limit = int(params["top"])
        except (KeyError, TypeError, ValueError):
            return None
        return limit if limit >= 0 else None

    def query_byodata(
        self, odata: Dict[str, Any], format: str = "json"
    ) -> "Union[gpd.GeoDataFrame, pd.DataFrame]":
        """Query the SensorThings API by Odata filter.

        Parameters
        ----------
        odata : dict
            Odata filter for the SensorThings API, keyed by option name
            without the leading ``$`` (e.g., the output of
            :meth:`odata_helper`). The input is not modified.
        format : str, optional
            Format of the response, defaults to ``json``.
            Valid values are ``json`` and ``geojson``.

        Returns
        -------
        pandas.DataFrame or geopandas.GeoDataFrame
            Requested data. For ``json`` the pages linked through
            ``@iot.nextLink`` are followed and, when ``odata`` has a ``top``
            entry, at most that many records are returned.

        Raises
        ------
        InvalidInputValue
            If ``format`` is not one of the valid values.
        ServiceError
            If any response contains a ``message`` key.
        """
        valid_formats = ["json", "geojson"]
        if format not in valid_formats:
            raise InvalidInputValue("format", valid_formats)

        params = dict(odata)
        if format == "geojson":
            params["resultFormat"] = "GeoJSON"

        # The service expects every option name to be prefixed with ``$``.
        kwds = {"params": {f"${k}": v for k, v in params.items()}}
        resp = self._check_response(ar.retrieve_json([self.base_url], [kwds])[0])

        if format == "geojson":
            # NOTE(review): only the first response is converted here; confirm
            # whether GeoJSON results can be paged by the service.
            return self._to_geodf(resp)

        # A page may carry ``@iot.nextLink`` even when ``$top`` was requested
        # (see the ``$top`` example in the OGC SensorThings spec), so stop as
        # soon as the requested number of records is reached and trim the tail.
        limit = self._item_limit(params)
        data = list(resp.get("value", []))
        while "@iot.nextLink" in resp and (limit is None or len(data) < limit):
            resp = self._check_response(ar.retrieve_json([resp["@iot.nextLink"]])[0])
            data.extend(resp.get("value", []))
        return self._to_df(data if limit is None else data[:limit])

    @staticmethod
    def _to_geodf(response: Dict[str, Any]) -> "gpd.GeoDataFrame":
        """Convert the response to a GeoDataFrame."""
        return geoutils.json2geodf(response)

    @staticmethod
    def _to_df(response: List[Dict[str, Any]]) -> pd.DataFrame:
        """Convert a list of records to a DataFrame, flattening nested dicts."""
        return pd.json_normalize(response)

In [203]:
# Shared client instance; the query cells below all go through it.
sensor = SensorThings()

In [204]:
# Hand-written OData options: keys are the option names without the leading
# "$" (``query_byodata`` adds it). The filter keeps stream sites whose
# ``stateFIPS`` property is 'US:04'.
odata = {"filter": "properties/monitoringLocationType eq 'Stream' and properties/stateFIPS eq 'US:04'"}

# "json" is the default format; it is spelled out here to contrast with the
# "geojson" requests made further down. The result is a plain DataFrame.
df = sensor.query_byodata(odata, format="json")
df

Unnamed: 0,description,@iot.id,name,@iot.selfLink,TaskingCapabilities@iot.navigationLink,Locations@iot.navigationLink,HistoricalLocations@iot.navigationLink,Datastreams@iot.navigationLink,MultiDatastreams@iot.navigationLink,properties.state,...,properties.districtCode,properties.altitudeDatum,properties.altitudeMethod,properties.hydrologicUnit,properties.altitudeAccuracy,properties.monitoringLocationUrl,properties.monitoringLocationName,properties.monitoringLocationType,properties.monitoringLocationNumber,properties.monitoringLocationAltitudeLandSurface
0,Stream,USGS-09497700,USGS-09497700,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,Arizona,...,04,North American Vertical Datum of 1988,Interpolated from topographic map.,150601030102,20,https://waterdata.usgs.gov/monitoring-location...,"CIBECUE CREEK NEAR OVERGAARD, AZ",Stream,09497700,7200
1,Stream,USGS-09472050,USGS-09472050,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,Arizona,...,04,National Geodetic Vertical Datum of 1929,Interpolated from topographic map.,150502030503,10,https://waterdata.usgs.gov/monitoring-location...,"SAN PEDRO R AT REDINGTON BRIDGE NR REDINGTON, AZ",Stream,09472050,2820.
2,Stream,USGS-09424900,USGS-09424900,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,Arizona,...,04,National Geodetic Vertical Datum of 1929,Interpolated from topographic map.,150302030506,20,https://waterdata.usgs.gov/monitoring-location...,"SANTA MARIA RIVER NEAR BAGDAD, AZ",Stream,09424900,1360
3,Stream,USGS-09537200,USGS-09537200,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,Arizona,...,04,National Geodetic Vertical Datum of 1929,Interpolated from topographic map.,150803010307,20,https://waterdata.usgs.gov/monitoring-location...,"LESLIE CREEK NEAR MCNEAL, AZ.",Stream,09537200,4620.
4,Stream,USGS-09503700,USGS-09503700,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,Arizona,...,04,National Geodetic Vertical Datum of 1929,Interpolated from topographic map.,150602020401,20,https://waterdata.usgs.gov/monitoring-location...,"VERDE RIVER NEAR PAULDEN, AZ",Stream,09503700,4117.
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
176,Stream,USGS-09485450,USGS-09485450,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,Arizona,...,04,National Geodetic Vertical Datum of 1929,Level or other surveyed method.,150503020206,.1,https://waterdata.usgs.gov/monitoring-location...,"PANTANO WASH AT BROADWAY BLVD. AT TUCSON, AZ.",Stream,09485450,2568.83
177,Stream,USGS-09403850,USGS-09403850,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,Arizona,...,04,National Geodetic Vertical Datum of 1929,Interpolated from topographic map.,150100031004,20,https://waterdata.usgs.gov/monitoring-location...,"KANAB CREEK ABOVE THE MOUTH NEAR SUPAI, AZ",Stream,09403850,1920.
178,Stream,USGS-09379025,USGS-09379025,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,Arizona,...,04,North American Vertical Datum of 1988,Interpolated from topographic map.,140802040801,10,https://waterdata.usgs.gov/monitoring-location...,"CHINLE CREEK AT CHINLE, AZ",Stream,09379025,5550.
179,Stream,USGS-09426630,USGS-09426630,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,https://labs.waterdata.usgs.gov/sta/v1.1/Thing...,Arizona,...,04,North American Vertical Datum of 1988,Interpolated from Digital Elevation Model,150302040410,1.6,https://waterdata.usgs.gov/monitoring-location...,"BILL WILLIAMS RIVER AT LAKE HAVASU, ABV HWY-95...",Stream,09426630,467


In [205]:
# Build the query options with ``odata_helper`` instead of writing the
# dictionary by hand: a filter, an expansion of each Thing's Locations
# (keeping only the ``location`` field), and a cap on the number of items.
conditionals = "properties/monitoringLocationType eq 'Stream' and properties/stateFIPS eq 'US:04'"
max_count = 1000
expand = {
    "Locations": {
        "select": "location",
    },
}
odata = sensor.odata_helper(
    conditionals=conditionals,
    expand=expand,
    max_count=max_count,
)

# Request GeoJSON so the result is a GeoDataFrame, then show it on a map.
df = sensor.query_byodata(odata, format="geojson")
df.explore()

In [206]:
# Step 1 - inner expansion, applied to every Datastream: its ObservedProperty
# plus a single Observation, taken after ordering by ``phenomenonTime``
# descending. ``odata_helper`` is used only to render this dict into an
# ``$expand`` string that can be nested below.
expand = {
    "ObservedProperty": {"select": "name,description,@iot.id"},
    "Observations": {
        "select": "result,phenomenonTime,@iot.id",
        "orderby": "phenomenonTime desc",
        "top": 1,
    },
}
odata_inner = sensor.odata_helper(expand=expand)

# Step 2 - top-level options for the Things themselves.
max_count = 1000
columns = ["properties", "@iot.id"]
conditionals = " and ".join(
    (
        "Datastreams/ObservedProperty/@iot.id eq '00060' ",
        "properties/monitoringLocationType eq 'Stream' ",
        "startswith(properties/hydrologicUnit,'15')",
    )
)

# Step 3 - outer expansion: Locations, and the Datastreams whose observed
# property is '00060', each carrying the inner expansion rendered in step 1.
expand = {
    "Locations": {"select": "name,description,location,@iot.id"},
    "Datastreams": {
        "select": "name,unitOfMeasurement,@iot.id",
        "filter": "ObservedProperty/@iot.id eq '00060'",
        "expand": odata_inner["expand"],
    },
}
odata = sensor.odata_helper(
    expand=expand,
    conditionals=conditionals,
    columns=columns,
    max_count=max_count,
)

# Request GeoJSON so the result is a GeoDataFrame, then show it on a map.
df = sensor.query_byodata(odata, format="geojson")
df.explore()