# Tutorial 2: Use your Custom Geospatial Image to Run Large-scale Geospatial Processing Routines using an Amazon SageMaker Processing Job

__Prerequisites:__ We recommend running this notebook on an `m5.xlarge`. When launching the JupyterLab Space in SageMaker Studio, make sure to select the custom geospatial image that was created and attached to the SageMaker domain in the previous steps.

In [None]:
#geo libraries
import geopandas as gpd
import pandas as pd
import pystac_client
import shapely
from shapely import geometry, wkt
import leafmap
import rasterio
from rasterio.plot import show
import rioxarray
#other libraries
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
import json
import uuid
from datetime import datetime
import boto3
import sagemaker
from IPython.display import JSON

In [None]:
#instantiate sessions and clients
session = boto3.Session()
sagemaker_session = sagemaker.Session()
execution_role = sagemaker.get_execution_role()
s3_client = boto3.client('s3')

___

__Define an exemplary Geometry__

In [None]:
coords = [[-102.00723310488662,40.596123257503024],[-102.00723310488662,40.58168585757733],[-101.9882214495914,40.58168585757733],[-101.9882214495914,40.596123257503024],[-102.00723310488662,40.596123257503024]]
polgyon = shapely.Polygon(coords)
aoi_gdf = gpd.GeoDataFrame(index=[0], crs='epsg:4326', geometry=[polgyon])

__Visualize the Geometry__

In [None]:
Map = leafmap.Map(center=[40.596123257503024, -102.00723310488662], zoom=13)
Map.add_basemap("USGS NAIP Imagery")
Map.add_gdf(aoi_gdf, layer_name="test", style={"color": "yellow", "fillOpacity": 0.3, "clickable": True,})
Map

__Search Sentinel-2 Satellite Data for given Geometry__

In [None]:
def search_sentinel2_collection(start_date, end_date, aoi_geometry,max_cloud=100):
    """
    Search Sentinel 2 data collection for target_date
    and collect results including meta data in a dictionary.
    This function uses the PySTAC client
    """
    client = pystac_client.Client.open("https://earth-search.aws.element84.com/v1")
    collection = "sentinel-2-l2a"
    
    search = client.search(
        collections=[collection],
        query = {"eo:cloud_cover":{"lt":max_cloud}},
        intersects=aoi_geometry.to_crs("EPSG:4326").geometry[0].__geo_interface__, 
        datetime=f"{start_date}/{end_date}"
    )
    
    s2_items = []
    for item in search.items_as_dicts():
        s2_items.append(item)
        
    return s2_items

In [None]:
#search parameters
# let's load satellite data for the AOI
analysis_start_date=pd.to_datetime("2023-01-01T00:00:00Z").date()
analysis_end_date=pd.to_datetime("2023-12-31T23:59:59Z").date()
aoi_geometry = aoi_gdf.geometry

In [None]:
sentinel2_items = search_sentinel2_collection(
    start_date=analysis_start_date,
    end_date=analysis_end_date,
    aoi_geometry = aoi_geometry,
)

In [None]:
print("Total Sentinel-2 items found:",len(sentinel2_items))

In [None]:
JSON(sentinel2_items)

__Create the Input Files for the Processing Job__

We will create 2 types of files here:
1. One file per selected Sentinel-2 scene with all metadata. Seperating scenes into individual files makes sharding for distributed training easy using the `ShardedByS3Key` distribution strategy.
2. A single file with the geometry of the AOI of interest

In [None]:
def generate_execution_id():
    current_date = datetime.now()
    return current_date.strftime('%Y%m%d_%H%M%S') + "_" + str(uuid.uuid4()).split("-")[0]

execution_id = generate_execution_id()
execution_id

In [None]:
# set bucket names
bucket_name = sagemaker_session.default_bucket()
bucket_prefix_sentinel2_meta = f"processing/geospatial-data-cube/{execution_id}/input/sentinel2_scenes"
bucket_prefix_aoi_meta = f"processing/geospatial-data-cube/{execution_id}/input/aoi"

__Generate and upload Sentinel-2 Tile Metadata Files to S3__

In [None]:
#let's review tile ids and their prevalence
s2_search_results = pd.DataFrame(sentinel2_items)
s2_search_results["tile_id"] = s2_search_results["id"].apply(lambda x: x.split("_")[1])
s2_search_results.groupby(["tile_id"]).count()["id"]

In [None]:
#limit to one tile ID only
tile_id="13TGE"
manifest_file = [i for i in sentinel2_items if tile_id in i["id"]]
print("Scenes in scope:",len(manifest_file))

In [None]:
#save files to S3
for s in manifest_file:
    file_name = "{}_metadata.json".format(s["id"])
    response = s3_client.put_object(Body=(bytes(json.dumps(s, default=str).encode('UTF-8'))),
                                    Bucket=bucket_name, Key=f"{bucket_prefix_sentinel2_meta}/{file_name}")

__Generate and upload the AOI Geometry Metadata File to S3__

In [None]:
# upload aoi metadata to S3
aoi_dict = {"name":"us_field","coords":coords,"crs":'epsg:4326'}
file_name_aoi = "aoi_metadata.json"
response = s3_client.put_object(Body=(bytes(json.dumps(aoi_dict, default=str).encode('UTF-8'))),
                                Bucket=bucket_name, Key=f"{bucket_prefix_aoi_meta}/{file_name_aoi}")

__Define the Processing Routine__

Here we will generate a routine that generates a data cube of different bands across the 3 years of observation for the AOI in scope.
Specifically, for each Sentinel-2 scene will:
1. Load the required bands
2. Perform re-sampling to 10m resolution where required (for bands lower than 10m resolution)
3. Merge all bands into a single xarray
4. Intersect the xarray with a given geometry
5. Filter out scenes with cloud cover above a certain threshold within the AOI (not the S2 tile)
6. Save the clipped xarray data cube as a NetCDF file

In [None]:
os.makedirs("./scripts/",exist_ok=True)

In [None]:
%%writefile scripts/generate_aoi_data_cube.py
import os
import pickle
import sys
import subprocess
import warnings
import json
import time
import geopandas
import pandas as pd
import numpy as np
import geopandas as gpd
import shapely
from shapely.geometry import shape
from shapely.geometry import Polygon
import xarray as xr
import rioxarray
from rioxarray.exceptions import NoDataInBounds
import gc
import logging
import datetime

#Parameters
MAX_CLOUD_COVER_AOI = 0.3 #maximum 30% cloud coverage WTHIN area of interest (not the full tile)

#HELPERS
def get_logger(log_level):    
    logger = logging.getLogger("processing")

    console_handler = logging.StreamHandler(sys.stdout)
    # include %(name)s to also include logger name
    console_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
    console_handler.setLevel(log_level)

    logger.addHandler(console_handler)
    logger.setLevel(log_level)
    return logger

def s2_scene_id_to_cog_path(scene_id):
    parts = scene_id.split("_")
    s2_qualifier = "{}/{}/{}/{}/{}/{}".format(
        parts[1][0:2],
        parts[1][2],
        parts[1][3:5],
        parts[2][0:4],
        str(int(parts[2][4:6])),
        "_".join(parts)
    )
    return f"https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs/{s2_qualifier}/"

def scene_id_to_datetime(scene_id):
    dt = pd.to_datetime(scene_id.split("_")[-3])
    return dt

def get_aoi_cloud_free_ratio(SCL_raster, aoi_gdf):
    #reproject to EPSG:4326
    kwargs = {"nodata": np.nan}
    SCL_raster = SCL_raster.rio.reproject("EPSG:4326", **kwargs)
    #clip to AOI
    SCL_raster_clipped = SCL.rio.clip(aoi_gdf.geometry.values, aoi_gdf.crs, drop=False, invert=True)
    #get cloud-free ratio
    SCL_mask_pixel_count = SCL_raster_clipped.SCL.data.size - np.count_nonzero(np.isnan(SCL_raster_clipped.SCL.data)) # get size of SCL mask in num pixels (excl. any nans)
    SCL_classes_cloud_free = [4,5,6] # see here: https://custom-scripts.sentinel-hub.com/custom-scripts/sentinel-2/scene-classification/
    SCL_cloud_free_pixel_count = np.isin(SCL_raster_clipped.SCL.data,SCL_classes_cloud_free).sum() #count pixels that are non-cloud class
    cloud_free_ratio = SCL_cloud_free_pixel_count/SCL_mask_pixel_count
    
    return cloud_free_ratio

#MAIN
if __name__ == "__main__":
    logger = get_logger(logging.DEBUG) #INFO
    
    logger.info("Starting processing")
    logger.debug(f"Argument List: {str(sys.argv)}")

    #set permissions on output path
    output_path = "/opt/ml/processing/output/"
    subprocess.check_call(["sudo","chown","-R","sagemaker-user", output_path])
    
    #load geometry and construct gdf
    aoi_path = '/opt/ml/processing/input/aoi_meta/aoi_metadata.json'
    with open(aoi_path) as file:
        aoi_metadata = json.load(file)
    polgyon = Polygon(aoi_metadata["coords"])
    aoi_gdf = gpd.GeoDataFrame(index=[0], crs=aoi_metadata["crs"], geometry=[polgyon])
                
    #load all the sentinel-2 metadata files
    s2_data_path = '/opt/ml/processing/input/sentinel2_meta/'
    s2_items = []
    for current_path, sub_dirs, files in os.walk(s2_data_path):
        for file in files:
            if file.endswith(".json"):
                full_file_path = os.path.join(s2_data_path, current_path, file)
                with open(full_file_path, 'r') as f:
                    s2_items.append(json.load(f))
    
    item_count_total = len(s2_items)
    item_count_current = 0
    elapsed_time_batch = 0
    logger.info("Received {} scenes to process".format(item_count_total))

    for item in s2_items:        
        if item_count_current > 0 and item_count_current % 5 == 0:
            logger.info("Processed {}/{} scenes ({}s per scene)".format(
                item_count_current, 
                item_count_total,
                round(elapsed_time_batch / item_count_current, 2)
            ))
        item_count_current += 1

        start = time.time()
        s2_scene_id = item["id"]
        logger.debug("Processing scene: {}".format(s2_scene_id))
        
        s2_cog_prefix = s2_scene_id_to_cog_path(s2_scene_id)
        grid_id = s2_scene_id.split("_")[1]
        #time/date
        date = scene_id_to_datetime(s2_scene_id)
        #10m bands
        blue_band_url = f"{s2_cog_prefix}/B02.tif"
        green_band_url = f"{s2_cog_prefix}/B03.tif"
        red_band_url = f"{s2_cog_prefix}/B04.tif"
        nir1_band_url = f"{s2_cog_prefix}/B08.tif"
        #20m bands
        nir2_band_url = f"{s2_cog_prefix}/B8A.tif"
        swir1_band_url = f"{s2_cog_prefix}/B11.tif"
        swir2_band_url = f"{s2_cog_prefix}/B12.tif"
        scl_mask_url = f"{s2_cog_prefix}/SCL.tif"
        
        #read from S3
        #10m bands
        B02 = rioxarray.open_rasterio(blue_band_url, masked=True,band_as_variable=True)
        B02 = B02.rename(name_dict={"band_1":"B02"})
        B03 = rioxarray.open_rasterio(green_band_url, masked=True,band_as_variable=True)
        B03 = B03.rename(name_dict={"band_1":"B03"})
        B04 = rioxarray.open_rasterio(red_band_url, masked=True,band_as_variable=True)
        B04 = B04.rename(name_dict={"band_1":"B04"})
        B08 = rioxarray.open_rasterio(nir1_band_url, masked=True,band_as_variable=True)
        B08 = B08.rename(name_dict={"band_1":"B08"})
        #20m bands/masks
        B8A = rioxarray.open_rasterio(nir2_band_url, masked=True,band_as_variable=True)
        B8A = B8A.rename(name_dict={"band_1":"B8A"})
        B11 = rioxarray.open_rasterio(swir1_band_url, masked=True,band_as_variable=True)
        B11 = B11.rename(name_dict={"band_1":"B11"})
        B12 = rioxarray.open_rasterio(swir2_band_url, masked=True,band_as_variable=True)
        B12 = B12.rename(name_dict={"band_1":"B12"})
        SCL = rioxarray.open_rasterio(scl_mask_url, masked=True,band_as_variable=True)
        SCL = SCL.rename(name_dict={"band_1":"SCL"})
        
        #resample to 10m where needed (relatively compute intensive!)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            B8A = B8A.interp(x=B02["x"], y=B02["y"])
            B11 = B11.interp(x=B02["x"], y=B02["y"])
            B12 = B12.interp(x=B02["x"], y=B02["y"])
            SCL = SCL.interp(x=B02["x"], y=B02["y"])

        #merge bands
        band_arrays = [B02,B03,B04,B08,B8A,B11,B12,SCL]
        s2_cube=xr.merge(objects=band_arrays)
        del B02
        del B03
        del B04
        del B08
        del B8A
        del B11
        del B12
        del band_arrays
        gc.collect()
        
        #assign time dimension
        s2_cube = s2_cube.assign_coords(time=date) #call this 'time'
        #reproject to EPSG:4326
        kwargs = {"nodata": np.nan}
        s2_cube = s2_cube.rio.reproject("EPSG:4326", **kwargs) 
        
        #check cloud-free ratio at aoi level
        cloud_free_ratio = get_aoi_cloud_free_ratio(SCL_raster=SCL, aoi_gdf=aoi_gdf)
        if (1-float(cloud_free_ratio)) > MAX_CLOUD_COVER_AOI: #skip if too cloudy for given geom
            logger.debug(f"AOI cloud cover ratio too high ({round(1-cloud_free_ratio,3)}), skipping scene {s2_scene_id}...")
            del cloud_free_ratio
        else: #continue if not too cloudy for given geom
            logger.debug(f"AOI cloud cover ratio below threshold ({round(1-cloud_free_ratio,3)}), processing scene {s2_scene_id}...")     
            try:
                clipped = s2_cube.rio.clip(aoi_gdf.geometry.values, aoi_gdf.crs)
            except NoDataInBounds as e:
                logger.warn("Skipping {}: no data in bounds".format(s2_scene_id))
                continue
                
            #save to file
            file_name = "{}-{}.nc".format(aoi_metadata["name"],s2_scene_id)
            output_file_path = f"{output_path}{file_name}"

            clipped.to_netcdf(output_file_path)

            logger.debug(f"Written output:{output_file_path}")

            del clipped
            del cloud_free_ratio
            gc.collect()
            
        # explicit dereference to keep memory usage low
        del s2_cube
        del SCL
        gc.collect()
        
        elapsed_time = time.time() - start
        elapsed_time_batch += elapsed_time
        
        logger.debug("Processed scene {}: {}s".format(s2_scene_id, elapsed_time))

__Execute Processing Job__

In [None]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

region = sagemaker.Session().boto_region_name
role = get_execution_role()

geospatial_image_uri = "<GEOSPATIAL-IMAGE-URI>" #<-- set to uri of the geospatial image you have created

processor_ndvi_clip = ScriptProcessor(
    command=['python3'],
    image_uri=geospatial_image_uri,
    role=role,
    instance_count=20,
    instance_type='ml.m5.2xlarge',
    base_job_name='aoi-data-cube'
)

processor_ndvi_clip.run(
    code='scripts/generate_aoi_data_cube.py',
    inputs=[
        ProcessingInput(
            source=f"s3://{bucket_name}/{bucket_prefix_aoi_meta}/",
            destination='/opt/ml/processing/input/aoi_meta/',
            s3_data_distribution_type="FullyReplicated"
        ),        
        ProcessingInput(
            source=f"s3://{bucket_name}/{bucket_prefix_sentinel2_meta}/",
            destination='/opt/ml/processing/input/sentinel2_meta/',
            s3_data_distribution_type="ShardedByS3Key"
        ),
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output/',
            destination=f"s3://{bucket_name}/processing/geospatial-data-cube/{execution_id}/output/"
        )
    ]
)

In [None]:
job_descriptor = processor_ndvi_clip.jobs[-1].describe()
job_duration = job_descriptor['ProcessingEndTime'] - job_descriptor['ProcessingStartTime']

print("Processing job output located at '{}'".format(job_descriptor["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]))
print("Processing job '{}' took {} to complete".format(job_descriptor['ProcessingJobName'], job_duration))

__Load processed Scenes from S3__

In [None]:
s3 = boto3.client('s3')
s3_output_uri = job_descriptor["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
output_bucket_name = s3_output_uri.split("/")[2]
output_bucket_prefix = "/".join(s3_output_uri.split("/")[3:])
response = s3.list_objects_v2(Bucket=output_bucket_name, Prefix=output_bucket_prefix)

In [None]:
# download each object to local
local_directory="data"
for obj in response.get('Contents', []):
    key = obj['Key']
    local_path = os.path.join(local_directory, key)

    # Create local directories if they don't exist
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    # Download the object
    s3.download_file(output_bucket_name, key, local_path)

__Combine Scenes into Data Cube with Dimensions (time, x, y)__

In [None]:
#get individual time-stamped scenes
import xarray as xr
files = os.listdir(f"{local_directory}/{output_bucket_prefix}")
scenes=[]
for f in files:
    scene = xr.open_dataset(f"{local_directory}/{output_bucket_prefix}{f}", decode_coords="all")
    scenes.append(scene)

#generate cube
s2_cube=xr.concat(objs=scenes, coords="minimal", dim="time",join='outer')
s2_cube = s2_cube.sortby("time")

In [None]:
s2_cube

__Perform some explorative Plotting__

In [None]:
#get earliest observation and plot band B02
s2_cube.B02.sel(time=s2_cube.time.min()).plot()

In [None]:
#get latest observation and plot band B8A
s2_cube.B8A.sel(time=s2_cube.time.max()).plot()

__Remove Pixels with Clouds__

In [None]:
s2_cube_clean = s2_cube.where(s2_cube.SCL.isin([4,5,7])) # only retain non-cloudy pixels using Sentinel scene classification: https://custom-scripts.sentinel-hub.com/custom-scripts/sentinel-2/scene-classification/

__Compute Spectral Indices like Normalized Difference Vegetation Index (NDVI) and Normalized Burn Ratio (NBR)__

In [None]:
s2_cube_clean['NDVI'] = (s2_cube_clean.B08 - s2_cube_clean.B04) / (s2_cube_clean.B08 + s2_cube_clean.B04)
s2_cube_clean['NBR'] = (s2_cube_clean.B08 - s2_cube_clean.B12) / (s2_cube_clean.B08 + s2_cube_clean.B12) #NBR = (B8 - B12) / (B8 + B12)

In [None]:
s2_cube_clean

__Perform Pixel-level Temporal Analyses__

In [None]:
#plot ndvi timeseries for midpoint of field
midpoint_x=s2_cube_clean["x"][round((len(s2_cube_clean["x"]))/2)]
midpoint_y=s2_cube_clean["y"][round((len(s2_cube_clean["y"]))/2)]
#plt.scs2_cube_clean.NDVI.sel(x=midpoint_x,y=midpoint_y).plot()
plt.scatter(x=s2_cube_clean.time,y=s2_cube_clean.NDVI.sel(x=midpoint_x,y=midpoint_y))

__End of Notebook__