# Deploying a Vessel Detection Pipeline with Batch Compute and Catalog Events
__________________
#### _Objective:_
Demonstrate how the Platform enables low-latency, event-driven image analysis that can be deployed easily at scale. Here we deploy a sample near-real-time pipeline that analyzes open-access [Sentinel-1 backscatter data](https://www.mdpi.com/2072-4292/14/15/3524) to identify vessel locations. The pipeline is deployed over [Kaohsiung Port](https://kh.twport.com.tw/en/), the largest port in [Taiwan and the 15th largest in the world](https://www.worldshipping.org/top-50-ports), which lies in a subtropical region where optical imagery is often obscured by clouds.

#### _What will we cover?_
* Searching [`Catalog`](https://docs.descarteslabs.com/descarteslabs/catalog/readme.html) to retrieve imagery, then masking, thresholding, and vectorizing the vertical-vertical polarized band (**vv**) from [Sentinel-1](https://sentiwiki.copernicus.eu/web/s1-mission) to highlight vessels on open water
* Creating a [`Function`](https://docs.descarteslabs.com/descarteslabs/compute/readme.html#descarteslabs.compute.Function) that responds to each new image upload, vectorizes detected vessels, and writes the results to a [`Table`](https://docs.descarteslabs.com/descarteslabs/vector/readme.html)
* Defining an [`EventSubscription`](https://docs.descarteslabs.com/descarteslabs/catalog/docs/event_subscription.html#descarteslabs.catalog.EventSubscription) that invokes the function on each new image upload over our input geometry

In [None]:
import descarteslabs as dl
from descarteslabs.catalog import (
    EventSubscription,
    EventSubscriptionComputeTarget,
    EventType,
    Image,
    Placeholder,
    Product, 
    properties as p
)
from descarteslabs.compute import Function
from descarteslabs.vector import Table

In [None]:
from rasterio.features import shapes
from rasterio.transform import Affine
from shapely.geometry import shape

In [None]:
import json
import sys
import numpy as np
import geopandas as gpd
import matplotlib.pyplot as plt

_**Note:** For brevity, two helper functions are imported from a local module. For their implementation, please see [utils.py](utils.py)_

In [None]:
from utils import create_table, create_product

Setting global variables:

In [None]:
# Python version of this environment, used below to select a matching Batch Compute image
major = sys.version_info.major
minor = sys.version_info.minor

In [None]:
auth = dl.auth.Auth.get_default_auth()
org = auth.payload['org']
user_hash = auth.namespace

Two IDs are set:
* A surrogate Sentinel-1 [`Product`](https://docs.descarteslabs.com/descarteslabs/catalog/docs/product.html#descarteslabs.catalog.Product) ID, since we will write new Images to this product later in the notebook to simulate real-time uploads
* An ID for an empty Vector [`Table`](https://docs.descarteslabs.com/descarteslabs/vector/readme.html#descarteslabs.vector.Table) that will store the results

In [None]:
pid = f"{org or user_hash}:sample-sigma0v-product:{user_hash}"
tid = f"vessel-detections-demo-table:{user_hash}"

_**Note:** The two helper functions imported earlier check whether the table or product already exists, delete it if necessary, and create a fresh one: the table receives the detections, and the product acts as the real-time input._

In [None]:
tid = create_table(tid)
pid = create_product(pid, 'Demo SAR Data')

Reading in our input AOI as a [geopandas `GeoDataFrame`](https://geopandas.org/en/stable/docs/reference/api/geopandas.GeoDataFrame.html) and creating an [`AOI`](https://docs.descarteslabs.com/descarteslabs/geo/readme.html#descarteslabs.geo.AOI) object:

In [None]:
gdf = gpd.read_file("Kaohsiung.geojson")
aoi = dl.geo.AOI(gdf.iloc[0]['geometry'], resolution=10., crs='EPSG:3857')

## Methodology
Below, we iterate on the methodology before deploying it:
* Search Sentinel-1 Imagery
* Mask out land using a global surface water product
* Threshold the **vv** band to identify vessels in open water
* Vectorize the thresholded results
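The masking and thresholding steps above can be sketched on a tiny synthetic array with plain NumPy. This is a minimal illustration under stated assumptions, not the deployed pipeline: the array values are made up, the helper name `detect_bright_water_pixels` is hypothetical, and the cut-off of 2000 simply mirrors the threshold used later in this notebook. It shows why masking comes first: bright land pixels (for example, buildings along the quay) would otherwise pass the threshold.

```python
import numpy as np

def detect_bright_water_pixels(vv, water_extent, threshold=2000):
    """Return a boolean array marking bright (candidate vessel) pixels on water.

    vv:           2-D array of backscatter values (stand-in for the VV band)
    water_extent: 2-D array where 0 means "not water" (stand-in for the water mask)
    threshold:    brightness cut-off; 2000 mirrors this notebook and may vary by AOI
    """
    vv = np.where(water_extent == 0, 0, vv)  # zero out land pixels
    return vv > threshold

# Synthetic 4x4 scene: bright land in the left column, two bright targets on water
vv = np.array([
    [3000,  100,  120, 2500],
    [3000,  110, 2600,  130],
    [3000,  105,  115,  125],
    [3000,  100,  100,  100],
])
water = np.array([
    [0, 1, 1, 1],
    [0, 1, 1, 1],
    [0, 1, 1, 1],
    [0, 1, 1, 1],
])

hits = detect_bright_water_pixels(vv, water)
print(hits.sum())  # → 2
```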

Searching our surrogate Sentinel-1 product for imagery over the first date:

In [None]:
prod = Product.get(pid)
ic = (prod
      .images()
      .intersects(aoi)
      .filter("2025-02-03"<p.acquired<"2025-02-05")
      .collect()
     )
ic

Retrieving a [global surface water product](https://www.nature.com/articles/nature20584) to use as a water mask:

In [None]:
water_prod = Product.get("jrc:global-surface-water:v0")
water_ic = (water_prod
            .images()
            .intersects(aoi)
            .filter("2020-01-10"<p.acquired<"2025-01-12")
            .collect()
           )

Rasterizing the **vv** band as a [numpy `ndarray`](https://numpy.org/doc/2.1/reference/generated/numpy.ndarray.html):

In [None]:
vv, info = ic[0].ndarray(["vv"], geocontext=aoi, raster_info=True)
vv.shape

Mosaicking the water extent and masking the **vv** band to open water:

In [None]:
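# Mosaic the water extent layer; 0 marks pixels where no surface water was observed
water_ndarr = water_ic.mosaic(['extent'], data_type='UInt16')
# Zero out land pixels, then zero out (and unmask) any no-data pixels
vv[water_ndarr==0]=0
vv[vv.mask]=0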
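Assuming the **vv** array is a NumPy masked array (which the use of `vv.mask` implies), `vv[vv.mask]=0` does two things under NumPy's default soft mask: it writes 0 into the no-data pixels and clears their mask, so they become ordinary zeros that can never pass the brightness threshold. A toy one-row example with made-up values:

```python
import numpy as np

# A toy VV row whose middle pixel is no-data (masked)
vv = np.ma.array([2500, 9999, 150], mask=[False, True, False])

# Same operation as in the cell above: overwrite masked pixels with 0.
# With a soft mask (NumPy's default), assigning a value also unmasks those pixels.
vv[vv.mask] = 0

print(vv.data.tolist())      # → [2500, 0, 150]
print(vv.mask.tolist())      # → [False, False, False]
print((vv > 2000).tolist())  # → [True, False, False]
```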

Plotting the results:

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(20,10))
ax[0].imshow(vv[0])
ax[0].set_title("VV Band masked to Water Extent")
ax[1].imshow(vv[0]>2000) # This number may vary by AOI
ax[1].set_title("VV Thresholded")

Here, we vectorize these thresholded arrays as a list of [shapely `Polygon`s](https://shapely.readthedocs.io/en/2.0.6/reference/shapely.Polygon.html):

In [None]:
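# Build the pixel-to-map transform from the GDAL-style geotransform
trans = Affine.from_gdal(*info['geoTransform'])
# shapes() yields (geometry, pixel value) pairs for pixels above the threshold
polys = list(
    shapes(vv, mask=(vv > 2000), transform=trans))
poly_list = [shape(geom) for geom, _value in polys]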
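`Affine.from_gdal` builds the pixel-to-map transform from GDAL's six-coefficient geotransform `(x_origin, pixel_width, row_rotation, y_origin, column_rotation, pixel_height)`. The same mapping written out by hand is sketched below with a hypothetical north-up 10 m grid; the origin values are illustrative, not taken from the notebook's image, and `pixel_to_map` is a hypothetical helper.

```python
def pixel_to_map(geo_transform, col, row):
    """Map a pixel corner (col, row) to map coordinates with a GDAL geotransform.

    geo_transform = (x_origin, pixel_width, row_rotation,
                     y_origin, column_rotation, pixel_height)
    """
    x0, dx, rx, y0, ry, dy = geo_transform
    x = x0 + col * dx + row * rx
    y = y0 + col * ry + row * dy
    return x, y

# Hypothetical north-up 10 m grid in EPSG:3857 (pixel_height is negative)
gt = (13_390_000.0, 10.0, 0.0, 2_580_000.0, 0.0, -10.0)
print(pixel_to_map(gt, 0, 0))  # → (13390000.0, 2580000.0)
print(pixel_to_map(gt, 5, 3))  # → (13390050.0, 2579970.0)
```

For the same tuple, `Affine.from_gdal(*gt) * (5, 3)` should give the same point, since rasterio's `Affine` computes `x = a*col + b*row + c` and `y = d*col + e*row + f`.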

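Because `shapes` groups connected pixels that share the same value, a single bright object usually comes back as many small polygons. We dissolve them with `union_all()`, split the result back into individual objects with `explode()`, and store them in a geodataframe for visualization: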

In [None]:
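# One row per polygon returned by shapes()
vector_gdf = gpd.GeoDataFrame(
    {"geometry": poly_list},
    crs=aoi.crs,
)
# Dissolve touching polygons into one geometry, then explode it into one row per object
vector_gdf = gpd.GeoDataFrame(
    {'geometry': [vector_gdf.union_all()]},
    crs=aoi.crs
).explode()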
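Binarizing first and then grouping connected pixels gives roughly the same objects as the `shapes` + `union_all()` + `explode()` sequence above. The sketch below is a swapped-in technique for illustration only (a pure-Python 4-connected labeling with a hypothetical `label_regions` helper); the notebook itself relies on rasterio and shapely. Note how two adjacent bright pixels with different values form a single object once the array is thresholded.

```python
from collections import deque

import numpy as np

def label_regions(binary):
    """Label 4-connected True regions in a 2-D boolean array.

    Returns (labels, count): labels is an int array where 0 is background
    and 1..count identify each connected region.
    """
    binary = np.asarray(binary, dtype=bool)
    labels = np.zeros(binary.shape, dtype=int)
    rows, cols = binary.shape
    count = 0
    for r in range(rows):
        for c in range(cols):
            if binary[r, c] and labels[r, c] == 0:
                count += 1  # start a new region and flood-fill it
                labels[r, c] = count
                queue = deque([(r, c)])
                while queue:
                    y, x = queue.popleft()
                    for dy, dx in ((1, 0), (-1, 0), (0, 1), (0, -1)):
                        ny, nx = y + dy, x + dx
                        if (0 <= ny < rows and 0 <= nx < cols
                                and binary[ny, nx] and labels[ny, nx] == 0):
                            labels[ny, nx] = count
                            queue.append((ny, nx))
    return labels, count

# Made-up values: the top-left pair differs in value but is one object;
# the two pixels touching only diagonally stay separate under 4-connectivity
vv = np.array([
    [2100, 2900,    0,    0],
    [   0,    0,    0, 2500],
    [   0,    0, 2200,    0],
])
labels, n = label_regions(vv > 2000)
print(n)  # → 3
```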

In [None]:
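fig, ax = plt.subplots(figsize=(10,10))
# Red: all dissolved detections
vector_gdf.plot(ax=ax, color='red')
# Blue: detections smaller than 100,000 square EPSG:3857 units (the size filter used in the pipeline below)
vector_gdf[vector_gdf.area<100000].plot(ax=ax, color='blue')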
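The `area < 100000` filter is expressed in EPSG:3857 units. With the 10-unit AOI resolution used here, that is about 1,000 pixels. Web Mercator also inflates areas by roughly 1/cos²(latitude), so at Kaohsiung (about 22.6°N, an approximate latitude assumed here) the cut-off corresponds to roughly 85,000 m² on the ground. The arithmetic, as a short sketch:

```python
import math

RESOLUTION = 10.0        # AOI resolution used above (EPSG:3857 units per pixel)
AREA_CUTOFF = 100_000.0  # size filter used in the plot above and in vessel_detector

pixel_area = RESOLUTION ** 2           # 100 square units per pixel
max_pixels = AREA_CUTOFF / pixel_area  # objects up to ~1,000 pixels pass the filter

# Web Mercator scales lengths by 1/cos(lat) in both directions, so areas by 1/cos^2(lat).
# 22.6 N is an approximate latitude assumed for Kaohsiung.
lat_rad = math.radians(22.6)
true_area_m2 = AREA_CUTOFF * math.cos(lat_rad) ** 2

print(max_pixels)                # → 1000.0
print(round(true_area_m2, -3))   # → 85000.0
```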

## Scaling with Batch Compute
Here we define a local function to send to the compute service. It:
* Accepts an [`Image`](https://docs.descarteslabs.com/descarteslabs/catalog/docs/image.html#descarteslabs.catalog.Image) ID as its input argument
* Pulls down the image data and applies the vessel detection methodology
* Writes the output geodataframe as [`Feature`](https://docs.descarteslabs.com/descarteslabs/vector/readme.html#descarteslabs.vector.Feature)s to a table
* Returns the detections as a GeoJSON string

In [None]:
def vessel_detector(img_id, write_to_vector=True):
    import descarteslabs as dl
    from descarteslabs.catalog import Image, Product, properties as p
    from descarteslabs.vector import Table
    import numpy as np
    import geopandas as gpd
    from rasterio.features import shapes
    from rasterio.transform import Affine
    from shapely.geometry import shape
    from shapely import wkt
    print(f"Processing {img_id}")
    
    auth = dl.auth.Auth.get_default_auth()
    org = auth.payload['org']
    user_hash = auth.namespace
    # Note: the table ID is hard-coded here and must match the table created above
    vessel_table = Table.get(f"{org or user_hash}:vessel-detections-demo-table:{user_hash}")

    img = Image.get(img_id)
    
    img_aoi = img.geocontext

    water_prod = Product.get("jrc:global-surface-water:v0")
    water_ic = (water_prod
                .images()
                .intersects(img_aoi)
                .filter("2020-01-01"<p.acquired<"2025-01-01")
                .collect()
               )
    
    vv, info = img.ndarray(["vv"], raster_info=True)
    water_ndarr = water_ic.mosaic('extent', data_type='UInt16')
    print(f"Pulled data of shape {vv.shape}")
    
    vv[water_ndarr==0]=0
    vv[vv.mask]=0
    print("Masked to water")
    ### Insert more advanced methodology here! 
    trans = Affine.from_gdal(*info['geoTransform'])
    polys = list(
        shapes(vv, mask=(vv > 2000), transform=trans)
    )
    poly_list = [shape(poly[0]) for poly in polys]
    print("Vectorized")
    
    vector_gdf = gpd.GeoDataFrame(
        {"geometry": poly_list},
        crs=img_aoi.crs
    )
    vector_gdf = gpd.GeoDataFrame(
        {'geometry':[vector_gdf.union_all()]},
        crs=img_aoi.crs
    ).explode().to_crs(4326)
    
    vector_gdf['DATE']=img.acquired.strftime("%Y-%m-%d")
    vector_gdf['SOURCE_IMG_ID']=img.id
    
    # Quick way to remove large bogus geoms
    out_gdf=vector_gdf[vector_gdf.to_crs(3857).area<100000] 
    print(f"Simplified to {len(out_gdf)} rows")
    if write_to_vector:
        out_gdf = vessel_table.add(out_gdf)
        print("Added rows to table")
    return out_gdf.to_json()

Next, we'll test the methodology out locally (without writing the output rows to the table):

In [None]:
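# Run the detector in this session and plot the returned GeoJSON (EPSG:4326)
local_result = vessel_detector(ic[0].id, write_to_vector=False)
gpd.GeoDataFrame.from_features(json.loads(local_result), crs="EPSG:4326").plot()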
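Because `vessel_detector` returns its detections as a GeoJSON string (via `GeoDataFrame.to_json()`), the result can also be inspected with only the standard library. The sketch below counts detections per `DATE` property. The payload is hypothetical: it is shaped like `to_json()` output and reuses the acquisition dates of the images uploaded later in this notebook, but the geometries and `SOURCE_IMG_ID` values are made up, and `count_detections_by_date` is a hypothetical helper.

```python
import json
from collections import Counter

def count_detections_by_date(geojson_str):
    """Count features per DATE property in a GeoJSON FeatureCollection string."""
    collection = json.loads(geojson_str)
    return Counter(feature["properties"]["DATE"] for feature in collection["features"])

def _square(x, y, d=0.001):
    # Small closed lon/lat square standing in for a detection polygon
    return {"type": "Polygon",
            "coordinates": [[[x, y], [x + d, y], [x + d, y + d], [x, y + d], [x, y]]]}

# Hypothetical payload shaped like GeoDataFrame.to_json() output
sample = json.dumps({
    "type": "FeatureCollection",
    "features": [
        {"type": "Feature", "id": "0", "geometry": _square(120.25, 22.55),
         "properties": {"DATE": "2025-02-15", "SOURCE_IMG_ID": "hypothetical:image2"}},
        {"type": "Feature", "id": "1", "geometry": _square(120.27, 22.56),
         "properties": {"DATE": "2025-02-15", "SOURCE_IMG_ID": "hypothetical:image2"}},
        {"type": "Feature", "id": "2", "geometry": _square(120.26, 22.58),
         "properties": {"DATE": "2025-02-27", "SOURCE_IMG_ID": "hypothetical:image3"}},
    ],
})

counts = count_detections_by_date(sample)
print(dict(counts))  # → {'2025-02-15': 2, '2025-02-27': 1}
```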

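Lastly, we submit the compute function along with several scaling parameters, such as:
* Number of **CPUs** per job
* **Memory** allocated to each job
* **Maximum concurrency** of running jobs
* **Timeout**, the maximum time a job may run before it is stopped
* **Retry count**, the number of times a failed job is retried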

_Note: pip requirements or local modules could also be passed alongside this function._
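The concurrency and timeout settings bound how long a burst of events can take to clear. A rough sketch, assuming `timeout` is measured in seconds and ignoring queueing and container start-up overhead: with at most 20 jobs in flight and each job capped at 300 seconds, the worst case is one timeout per "wave" of 20 jobs. The helper name is hypothetical.

```python
import math

def worst_case_wall_time_s(n_jobs, max_concurrency=20, timeout_s=300):
    """Upper bound on wall-clock seconds to clear n_jobs if every job hits its timeout.

    Hypothetical helper. Assumes a new job starts as soon as a concurrency slot
    frees up and ignores queueing, image build, and container start-up time.
    """
    waves = math.ceil(n_jobs / max_concurrency)  # batches of at most max_concurrency jobs
    return waves * timeout_s

print(worst_case_wall_time_s(3))   # → 300 (the three jobs in this notebook fit in one wave)
print(worst_case_wall_time_s(45))  # → 900
```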

In [None]:
async_func = Function(
    vessel_detector,
    name="Vessel Detector Automated Pipeline",
    image=f"python{major}.{minor}:latest",
    cpus=0.25,
    memory=512,
    maximum_concurrency=20,
    timeout=300,
    retry_count=0,
)
async_func.save()
print(f"Created: {async_func.id}")

Here, we'll submit our first [`Job`](https://docs.descarteslabs.com/descarteslabs/compute/readme.html#descarteslabs.compute.Job) to get things started:

In [None]:
job1 = async_func(ic[0].id)
job1

### Tracking Running Functions
We recommend navigating to [app.descarteslabs.com/compute](https://app.descarteslabs.com/compute) to track the function's build progress and any running jobs. We will refer to this page for the remainder of the notebook.

## Event Listening 
Now that the function is defined, we can set up an [`EventSubscription`](https://docs.descarteslabs.com/descarteslabs/catalog/docs/event_subscription.html#descarteslabs.catalog.EventSubscription) that listens to our product for any new imagery satisfying user-configurable filter conditions. For more details on Events, please visit [Catalog Guides 07 Working with Events](../).

First, we'll clear any old subscriptions that may have the same name (if you've run this notebook in the past!):

In [None]:
for subscription in EventSubscription.search().filter(p.name=="vessel_detection_susbcription").collect():
    print(f"Deleting {subscription}")
    subscription.delete()

Next, we create the subscription with several important parameters:
* [`EventType`](https://docs.descarteslabs.com/descarteslabs/catalog/docs/types.html#descarteslabs.catalog.EventType), which here is a new image upload (`EventType.NEW_IMAGE`), but can also be other event types such as scheduled events or modifications
* [Event `Namespace`](https://docs.descarteslabs.com/descarteslabs/catalog/docs/event_subscription.html#descarteslabs.catalog.EventSubscription.namespace), the namespace in which to "listen" for new events (here, our product ID)
* Geometry over which to filter events
* [`EventSubscriptionTarget`](https://docs.descarteslabs.com/descarteslabs/catalog/docs/types.html#descarteslabs.catalog.EventSubscriptionTarget), which here is an [`EventSubscriptionComputeTarget`](https://docs.descarteslabs.com/descarteslabs/catalog/docs/types.html#descarteslabs.catalog.EventSubscriptionComputeTarget), but can be other destinations such as [`EventSubscriptionSqsTarget`](https://docs.descarteslabs.com/descarteslabs/catalog/docs/types.html#descarteslabs.catalog.EventSubscriptionSqsTarget) for AWS SQS
    * This target also takes a [`Placeholder`](https://docs.descarteslabs.com/descarteslabs/catalog/docs/types.html#descarteslabs.catalog.Placeholder), which stands in for the ID of each newly uploaded image when the function is invoked
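Conceptually, a `Placeholder` names a path into the event payload whose value is passed to the target. The sketch below only illustrates that idea with a hypothetical `resolve_placeholder` helper and a hypothetical event dictionary; it is not how the Catalog service performs the substitution, and apart from `detail.id` (implied by `Placeholder("event.detail.id")`) the payload shape and the example image ID are assumptions.

```python
def resolve_placeholder(path, context):
    """Follow a dotted path such as "event.detail.id" through nested dicts.

    Hypothetical helper: illustrates the idea behind a Placeholder only,
    not the Catalog service's actual substitution logic.
    """
    value = context
    for key in path.split("."):
        value = value[key]  # raises KeyError if the payload lacks this key
    return value

# Hypothetical payload: only detail.id is implied by Placeholder("event.detail.id")
context = {
    "event": {
        "detail": {"id": "my-org:sample-sigma0v-product:abc123:image2"},
    }
}
img_id = resolve_placeholder("event.detail.id", context)
print(img_id)  # → my-org:sample-sigma0v-product:abc123:image2
```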

In [None]:
subscription = EventSubscription(
    name="vessel_detection_susbcription",
    event_type=[EventType.NEW_IMAGE],
    event_namespace=[pid],
    geometry=gdf.iloc[0]['geometry'].buffer(0.5),  # padded in the GeoJSON's coordinate units
    targets=[
        EventSubscriptionComputeTarget(
            async_func.id, 
            Placeholder("event.detail.id")
        )
    ]
)
subscription.save()
subscription
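Assuming `Kaohsiung.geojson` is stored in longitude/latitude (EPSG:4326, the GeoJSON default), `buffer(0.5)` pads the AOI by 0.5 degrees rather than 0.5 km, so scenes well outside the port can still trigger the function. A rough spherical conversion at the port's latitude (about 22.6°N, an approximate value) shows the reach; the helper name is hypothetical.

```python
import math

# Approximate km per degree on a sphere with Earth's equatorial radius (6378.137 km)
KM_PER_DEGREE = 2 * math.pi * 6378.137 / 360  # about 111.32 km

def degree_buffer_reach_km(buffer_deg, lat_deg):
    """Approximate north-south and east-west reach (km) of a buffer given in degrees."""
    north_south = buffer_deg * KM_PER_DEGREE
    # Meridians converge toward the poles, so a degree of longitude shrinks by cos(latitude)
    east_west = north_south * math.cos(math.radians(lat_deg))
    return north_south, east_west

# 0.5 degree buffer at an assumed latitude of ~22.6 N for Kaohsiung
ns_km, ew_km = degree_buffer_reach_km(0.5, 22.6)
print(f"~{ns_km:.0f} km north-south, ~{ew_km:.0f} km east-west")  # → ~56 km north-south, ~51 km east-west
```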

## Deploying the Event Listener
Now that the event subscription is saved, we can test it by adding new imagery to the surrogate Sentinel-1 product.

Here we upload two new images, each of which should trigger a new job:

In [None]:
img2 = Image(product=prod, id=f"{prod.id}:image2")
img2.acquired = "2025-02-15"
upload = img2.upload("data/s1_sample_2.tif", overwrite=True)
upload.wait_for_completion()
upload.status

In [None]:
img3 = Image(product=prod, id=f"{prod.id}:image3")
img3.acquired = "2025-02-27"
upload = img3.upload("data/s1_sample_3.tif", overwrite=True)
upload.wait_for_completion()
upload.status

These uploads now show up as new jobs in the [Compute Monitor](https://app.descarteslabs.com/compute), and we can also check them programmatically:

In [None]:
print(len(async_func.jobs.collect()))

_Note: this cell waits for all of the function's jobs to finish running._

In [None]:
async_func.wait_for_completion()

Once complete, we can collect the resulting table's data as a geodataframe for export:

In [None]:
res_gdf = Table.get(tid).collect()
res_gdf.plot(column='DATE', figsize=(10,10))

In [None]:
res_gdf.to_file('results.geojson')

### Cleaning Up
_Is always best practice!_

In [None]:
async_func.delete_jobs(delete_results=True)
async_func.delete()

In [None]:
subscription.delete()