# Run Large-scale Geospatial Processing Routines using an Amazon SageMaker Processing Job

__Prerequisites:__ We recommend running this notebook on an `m5.xlarge`. Running this notebook requires several geospatial libraries. You can either pip-install these, or set up a custom geospatial Kernel (see [this](https://aws.amazon.com/blogs/machine-learning/create-custom-images-for-geospatial-analysis-with-amazon-sagemaker-distribution-in-amazon-sagemaker-studio/) blog post for details). 

You also need to have built and registered two custom Docker images on the Amazon Elastic Container Registry (ECR) and have set appropriate permission on this Notebook's execution role for it to access the image for each of the processing jobs contained that form the pipeline. Please see the `/prerequisites` folder for the Docker files and notebooks to build and register the Docker images.

This pipeline has four steps:
1. __Step 1 - Pre-process Satellite Tile__ divide tiles into nxn pixel-sized chips, save each chip as a netcdf datacube and a rgb .png thumbnail along with metatdata as .parquet. The metatdata files contains all geospatial information for each chip as well as the file paths. 
2. __Step 2: Generate Embeddings__ generate patch and class embeddings using an off-the-shelve geospatial embedding model
4. __Step 3 - Process Embeddings:__ run dimensionality reduction and similarity computations on the embeddings
3. __Step 4 - Consolidate Metadata and Load into Vector DB:__ run dimensionality reduction and similarity computations on the embeddings

In [None]:
!pip install -q geopandas pystac_client leafmap

In [None]:
#geo libraries
import geopandas as gpd
import pandas as pd
import pystac_client
import leafmap
import leafmap.foliumap as leafmap
#other libraries
import os
import pandas as pd
import json
import boto3
from botocore.exceptions import ClientError
import warnings
warnings.filterwarnings('ignore') # Ignore all warnings
#sagemaker
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.parameters import (
    ParameterString,
    ParameterInteger)

In [None]:
#instantiate sessions and clients
session = boto3.Session()
sagemaker_session = sagemaker.Session()
execution_role = sagemaker.get_execution_role()
s3_client = boto3.client('s3')
region = session.region_name

# Get the account number
sts_client = boto3.client('sts')
account_number = sts_client.get_caller_identity()["Account"]

# instantiate data bucket, create if it does not exist
env_name= "dev" #<-- align with settings in CDK stack 
bucket_name = f"aws-geofm-data-bucket-{account_number}-{region}-{env_name}"
print(bucket_name)

try:
    s3_client.head_bucket(Bucket=bucket_name)
except ClientError:
    s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={
                'LocationConstraint': region
            }
        )
    s3_client.put_public_access_block(
        Bucket=bucket_name,
        PublicAccessBlockConfiguration={
            'BlockPublicAcls': True,
            'IgnorePublicAcls': True,
            'BlockPublicPolicy': True,
            'RestrictPublicBuckets': True
        }
    )

#custom images
geo_processing_cpu_img = f"{account_number}.dkr.ecr.{region}.amazonaws.com/geo-img-cpu" #<--- verify URI of the geo processing image
geofm_embedding_gpu_img = f"{account_number}.dkr.ecr.{region}.amazonaws.com/clay-gpu-container-new" #<--- verify URI of the geofm embedding generation image

#create folder for processing scripts
os.makedirs("./scripts/", exist_ok=True)

___

## Set up Manifest Files

In [None]:
aoi_name="brazil-mato-grosso" #<-- pick an descriptive name for your area of observation, do not use underscores!

__Define an exemplary Geometry__

You can use geojson.io to define a bounding box, then paste it below.

In [None]:
# brazil-mato-grosso Bounding Box
geo={
  "type": "FeatureCollection",
  "features": [
    {
      "type": "Feature",
      "properties": {},
      "geometry": {
        "coordinates": [[[-61.092623344875769, -10.253101623520225], 
                          [-61.092623344875769, -10.90655037748293], 
                          [-61.757376655124226, -10.90655037748293], 
                          [-61.757376655124226, -10.253101623520225], 
                          [-61.092623344875769, -10.253101623520225]]],
        "type": "Polygon"
      }
    }
  ]
}

In [None]:
# Convert to GeoDataFrame
aoi_gdf = gpd.GeoDataFrame.from_features(geo['features'])
aoi_gdf.crs='epsg:4326'
# Check the geometry type and CRS
print("\nGeometry Type:", aoi_gdf.geometry.geom_type)
print("Coordinate Reference System (CRS):", aoi_gdf.crs)


__Visualize the Geometry__

In [None]:
Map = leafmap.Map(center=aoi_gdf.geometry.centroid[0].coords[0][::-1], zoom=10)
Map.add_gdf(aoi_gdf, layer_name="test", style={"color": "yellow", "fillOpacity": 0.3, "clickable": True,})
Map

__Search Sentinel-2 Satellite Data for given Geometry__

In [None]:
def search_sentinel2_collection(start_date, end_date, aoi_geometry,max_cloud=100):
    """
    Search Sentinel 2 data collection for target_date
    and collect results including meta data in a dictionary.
    This function uses the PySTAC client
    """
    client = pystac_client.Client.open("https://earth-search.aws.element84.com/v1")
    collection = "sentinel-2-l2a"
    
    search = client.search(
        collections=[collection],
        query = {"eo:cloud_cover":{"lt":max_cloud}},
        intersects=aoi_geometry.to_crs("EPSG:4326").geometry[0].__geo_interface__, 
        datetime=f"{start_date}/{end_date}"
    )
    
    s2_items = []
    for item in search.items_as_dicts():
        s2_items.append(item)
        
    return s2_items

In [None]:
#search parameters
analysis_start_date=pd.to_datetime("2023-01-01T00:00:00Z").date()
analysis_end_date=pd.to_datetime("2025-02-28T23:59:59Z").date()
aoi_geometry = aoi_gdf.geometry

In [None]:
sentinel2_items = search_sentinel2_collection(
    start_date=analysis_start_date,
    end_date=analysis_end_date,
    aoi_geometry = aoi_geometry,
    max_cloud=15
)

In [None]:
print("Total Sentinel-2 items found:", len(sentinel2_items))

__Create the Input Files for the Processing Job__

We will create 2 types of files here:
1. One file per selected Sentinel-2 scene with all metadata. Seperating scenes into individual files makes sharding for distributed training easy using the `ShardedByS3Key` distribution strategy.
2. A single file with the geometry of the MGRS grid in scope

In [None]:
bucket_prefix_sentinel2_meta = f"processing/{aoi_name}/input/sentinel2_scenes"
bucket_prefix_aoi = f"processing/{aoi_name}/input/aoi"

__Generate and upload Sentinel-2 Tile Metadata Files to S3__

In [None]:
#let's review tile ids and their prevalence
s2_search_results = pd.DataFrame(sentinel2_items)
s2_search_results["tile_id"] = s2_search_results["id"].apply(lambda x: x.split("_")[1])
s2_search_results.groupby(["tile_id"]).count()["id"]

In [None]:
#limit to one tile ID only
tile_id="20LPP" # Use one tile_id from previous cell

manifest_file = [i for i in sentinel2_items if tile_id in i["id"]]
#limit to one processing level only
processing_level="_0_"
manifest_file = [i for i in manifest_file if processing_level in i["id"]]
print("Scenes in scope:",len(manifest_file))

In [None]:
#save manifest files to S3
for s in manifest_file:
    file_name = "{}_metadata.json".format(s["id"])
    response = s3_client.put_object(Body=(bytes(json.dumps(s, default=str).encode('UTF-8'))),
                                    Bucket=bucket_name, Key=f"{bucket_prefix_sentinel2_meta}/{file_name}")

In [None]:
#save aoi geojson to S3
s3_destination_aoi = f"s3://{bucket_name}/{bucket_prefix_aoi}/aoi.geojson"
aoi_gdf.to_file(s3_destination_aoi,driver='GeoJSON')

## Step 1: Pre-Process Satellite Imagery

- Divides satellite tiles into n×n pixel chips
- Generates NetCDF datacubes (`.netcdf`) and PNG thumbnails (`.png` ) per each chip
- Creates metadata in Parquet format (`.parquet`)
- Distribution strategy: by Sentinel-2 tile id (e.g., `S2A_20LPP_20220606_0_L2A`)

##### Input structure

    processing/
    └── <aoi_name>/
        └── input/
        ├── sentinel2_scenes/
            └── <s2_scene_id>_metadata.json
            └── <s2_scene_id>_metadata.json
            └── ...

##### Output structure

    output/
    └── <aoi_name>/
        ├── meta/
        │   ├── <s2_scene_id>_chip_meta.parquet
        │   └── ...
        ├── raw-chips/
        │   └── <s2_product_level>/
        │       └── <MGRS_grid>/
        │           └── <year>/
        │               └── <month>/
        │                   └── <s2_scene_id>_<chip_size>_chipped_<x_dim>_<y_dim>.nc
        └── rgb-imgs/
        └── <s2_product_level>/
                └── <MGRS_grid>/
                    └── <year>/
                    └── <month>/
                            └── <s2_scene_id>_<chip_size>_rgb_thumbnail_<x_dim>_<y_dim>.png

__Define Processing Job and Pipeline Step__

In [None]:
#arguments
s3_destination_chips = f"s3://{bucket_name}/output/{aoi_name}/raw-chips/"
s3_destination_imgs = f"s3://{bucket_name}/output/{aoi_name}/rgb-imgs/"
s3_destination_meta = f"s3://{bucket_name}/output/{aoi_name}/meta/"
s3_destination_embeddings = f"s3://{bucket_name}/output/{aoi_name}/embeddings/"
s3_destination_chip_ids = f"s3://{bucket_name}/output/{aoi_name}/unique-chip-ids/{tile_id}/"
s3_destination_embeddings_processed = f"s3://{bucket_name}/output/{aoi_name}/processed-embeddings/{tile_id}/"
s3_destination_consolidated_output = f's3://{bucket_name}/output/{aoi_name}/consolidated-output/{tile_id}/'
s3_destination_lance_db = f's3://{bucket_name}/output/{aoi_name}/lance-db/'

In [None]:
bucket_name_param = ParameterString(name="BucketName", default_value=str(bucket_name))
env_name_param = ParameterString(name="EnvName", default_value=str(env_name))
s2_processing_lvl_param = ParameterString(name="S2ProcessingLevel", default_value="l2a")
aoi_name_param = ParameterString(name="AoiName", default_value=str(aoi_name))
s2_grid_id_param = ParameterString(name="GridId", default_value=str(tile_id))
chip_size_param = ParameterString(name="ChipSize", default_value=str(256))
geo_proc_image_uri_param = ParameterString(name="GeoProcImageURI", default_value=str(geo_processing_cpu_img))
geofm_emb_gen_image_uri_param = ParameterString(name="GeoFMEmbeddingImageURI", default_value=str(geofm_embedding_gpu_img))
raw_processing_instance_count_param = ParameterInteger(name="RawProcessingInstanceCount",default_value=10)
emb_generation_instance_count_param = ParameterInteger(name="EmbeddingGenerationInstanceCount",default_value=5)
emb_processing_instance_count_param = ParameterInteger(name="EmbeddingProcessingInstanceCount",default_value=10)

In [None]:
role = get_execution_role()
cache_config = CacheConfig(
    enable_caching=True,
    expire_after="30d"  # Cache expires after 30 days
)

In [None]:
preprocess_s2_tile = ScriptProcessor(
    command=['python3'],
    image_uri=geo_proc_image_uri_param, # geo_processing_cpu_img,
    role=role,
    instance_count=raw_processing_instance_count_param,
    instance_type='ml.m5.4xlarge',
    volume_size_in_gb=50
)

In [None]:
pipeline_step_1 = ProcessingStep(
    name="GeoPreprocessingStep",
    processor=preprocess_s2_tile,
    inputs=[       
        ProcessingInput(
            source=f"s3://{bucket_name}/{bucket_prefix_sentinel2_meta}/",
            destination='/opt/ml/processing/input/sentinel2_meta/',
            s3_data_distribution_type="ShardedByS3Key"
        ),
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output/chips/',
            destination=s3_destination_chips
        ),
        ProcessingOutput(
            source='/opt/ml/processing/output/imgs/',
            destination=s3_destination_imgs
        ),
        ProcessingOutput(
            source='/opt/ml/processing/output/meta/',
            destination=s3_destination_meta
        )
    ],
    job_arguments=[
        "--S2_PROCESSING_LEVEL", s2_processing_lvl_param,
        "--AOI_NAME", aoi_name_param,
        "--CHIP_SIZE", chip_size_param,
        "--S3_DESTINATION_PATH_CHIPS", s3_destination_chips,
        "--S3_DESTINATION_PATH_IMGS", s3_destination_imgs,
        "--S3_DESTINATION_PATH_META", s3_destination_meta],
    code='scripts/preprocess_s2_tile.py',
    cache_config=cache_config
)

## Step 2: Generate Embeddings

- Generates patch and class embeddings per each chip
- Uses a pre-trained geospatial embedding models (Clay_v1) hosted on HuggingFace
- Stores embeddings as NumPy arrays (`.npy`)
- Tracks file paths of generated embedding vectors by updating the metadata `.parquet` on S3
- Generates a simple text file (`.txt`) per each unique step. These files are used for distribution in later pipeline steps.
- Distribution strategy: by Sentinel-2 tile id (e.g., `S2A_20LPP_20220606_0_L2A`)

##### Output structure


      output/
      └── <aoi_name>/
         ├── meta/
         │   └── <s2_scene_id>_chip_meta.parquet
         ├── embeddings/
         │   └── <s2_product_level>/
         │       └── <MGRS_grid>/
         │           └── <year>/
         │               └── <month>/
         │                   ├── <s2_scene_id>_<chip_size>_chip_cls_embeddings_<bands>_<x_dim>_<y_dim>.npy
         │                   └── <s2_scene_id>_<chip_size>_chip_patch_embeddings_<bands>_<x_dim>_<y_dim>.npy
         └── unique-chip-ids/
            └── <MGRS_grid>/
                  └── <x_dim>_<y_dim>.txt


In [None]:
batch_size_param = ParameterString(name="BatchSize", default_value=str(32))
s2_bands_param = ParameterString(name="S2Bands", default_value=str("red_green_blue_nir")) #bands to use for emebedding generation; ,nir08,rededge1,rededge2,rededge3,swir16,swir22

In [None]:
role = get_execution_role()

embedding_gen_processor = ScriptProcessor(
    command=['python3'],
    image_uri=geofm_emb_gen_image_uri_param,
    role=role,
    instance_count=emb_generation_instance_count_param,
    instance_type='ml.g5.xlarge',
    volume_size_in_gb=50
)

In [None]:
pipeline_step_2 = ProcessingStep(
    name="EmbeddingGenerationStep",
    processor=embedding_gen_processor,
    inputs=[
         ProcessingInput( 
             source=s3_destination_meta,
             destination='/opt/ml/processing/input/meta',
             s3_data_distribution_type="ShardedByS3Key",
         )
    ],
    outputs=[
            ProcessingOutput(
                source='/opt/ml/processing/output/meta',
                destination=s3_destination_meta
            ),
            ProcessingOutput(
                        source='/opt/ml/processing/output/chip_ids',
                        destination=s3_destination_chip_ids
                    )],
    job_arguments=['--BUCKET_NAME', bucket_name_param, 
               '--AOI_NAME',aoi_name_param,
               '--CHIP_SIZE',chip_size_param,
               '--BATCH_SIZE',batch_size_param,
               '--S2_BANDS',s2_bands_param,                     
               '--MAX_CLOUD_COVER_PERC','1.0', #retain, even if there are clouds! 
              ],
    code='scripts/generate_embeddings.py',
    depends_on=[pipeline_step_1],
    cache_config=cache_config
)

## Step 3: Process Embedding Vectors

- Runs dimensionality reduction on the patch or class embedding vector (if distributed processing is used these will be run on a subsample of the full dataset)
- Per each chip: computes cosine similarity over time versus a baseline date (the first date in the series)
- Distribution strategy: by chip_id (e.g., `12_25`)

##### Output structure

```
output/
└── <aoi_name>/
      └── processed-embeddings/
      └── <MGRS_grid>/
            └── <x_dim>_<y_dim>_processed_embeddings.parquet
```

In [None]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

region = sagemaker.Session().boto_region_name
role = get_execution_role()


embedding_proc_processor = ScriptProcessor(
    command=['python3'],
    image_uri=geo_proc_image_uri_param,
    role=role,
    instance_count=emb_processing_instance_count_param,
    instance_type='ml.m5.24xlarge',
    volume_size_in_gb=250
)

In [None]:
pipeline_step_3 = ProcessingStep(
    name="EmbeddingProcessingStep",
    processor=embedding_proc_processor,
    inputs=[
         ProcessingInput( 
             source=s3_destination_meta,
             destination='/opt/ml/processing/input/meta',
             s3_data_distribution_type="FullyReplicated",
         ),
        ProcessingInput( 
             source=s3_destination_chip_ids,
             destination='/opt/ml/processing/input/chip_ids',
             s3_data_distribution_type="ShardedByS3Key", #shard by chip_id
         )
    ],
    outputs=[
            ProcessingOutput(
                source='/opt/ml/processing/output/',
                destination=s3_destination_embeddings_processed
            )],
    job_arguments=["--BUCKET_NAME", bucket_name_param,
                   "--AOI_NAME", aoi_name_param,
                   "--EMBEDDING_TYPE",str("patch"),
                   "--CHIP_MAX_CLOUD",str(1.0),
                  ],
    code='scripts/run_chip_temporal_analysis.py',
    depends_on=[pipeline_step_2],
    cache_config=cache_config
)

## Step 4: Consolidate Outputs

- Consolidates processed embedding `.parquet` files into a single results file per Sentinel-2 grid ID (i.e., MGRS grid) and loads vectors and metadata into a [LanceDB](https://lancedb.github.io/lancedb/) vector database. Also generates `.geojson` files that contain chips to be used as an overlay in the frontend.
- Distribution strategy: none

##### Output structure

```
output/
└── <aoi_name>/
      └── consolidated-output/
      └── <MGRS_grid>/
            └── consolidated_output.parquet
            └── chip_grid_aoi.geojson
            └── chip_grid_full_s2_tile.geojson
            └── config_<aoi_name>.json
      └── lance-db/
      └── vector_db.lance
```

In [None]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

region = sagemaker.Session().boto_region_name
role = get_execution_role()

consolidation_processor = ScriptProcessor(
    command=['python3'],
    image_uri=geo_proc_image_uri_param,
    role=role,
    instance_count=1,
    instance_type='ml.m5.4xlarge',
    volume_size_in_gb=50
)

In [None]:
pipeline_step_4 = ProcessingStep(
    name="ResultsConsolidationandVectorDBLoadStep",
    processor=consolidation_processor,
    inputs=[
         ProcessingInput( 
             source=s3_destination_embeddings_processed,
             destination='/opt/ml/processing/input/embeddings',
             s3_data_distribution_type="FullyReplicated",
         ),
        ProcessingInput(
            source=s3_destination_meta,
            destination='/opt/ml/processing/input/meta',
            s3_data_distribution_type="FullyReplicated",),
        ProcessingInput(
            source=s3_destination_aoi,
            destination='/opt/ml/processing/input/aoi',
            s3_data_distribution_type="FullyReplicated",),
    ],
    outputs=[
            ProcessingOutput(
                source='/opt/ml/processing/output',
                destination=s3_destination_consolidated_output              
            )],    
    job_arguments=["--LANCEDB_URI", s3_destination_lance_db,
                   "--AOI_NAME", aoi_name_param,
                   "--S2_TILE_ID",s2_grid_id_param,
                   "--BUCKET_NAME",bucket_name_param,
                   "--ENV_NAME", env_name_param,
                   "--ACCOUNT_ID", account_number,
                   "--REGION", region,
                  ],
    code='scripts/consolidate.py',
    depends_on=[pipeline_step_3],
    cache_config=cache_config
)

## Define Pipeline

Initialize the pipeline

In [None]:
pipeline = Pipeline(
    name=f"EmbeddingPipeline-{aoi_name}",
    steps=[pipeline_step_1, pipeline_step_2, pipeline_step_3,pipeline_step_4],
    parameters=[bucket_name_param,
                env_name_param,
                aoi_name_param,
                s2_grid_id_param,
                s2_processing_lvl_param, 
                chip_size_param,
                batch_size_param,
                s2_bands_param,
                geo_proc_image_uri_param,
                geofm_emb_gen_image_uri_param,
                raw_processing_instance_count_param,
                emb_generation_instance_count_param, 
                emb_processing_instance_count_param,],
    sagemaker_session=sagemaker_session
)
pipeline.upsert(role_arn=role)

Set parameters

In [None]:
#define pipeline parameters
parameters=dict(
        RawProcessingInstanceCount=20,
        EmbeddingGenerationInstanceCount=5,
        EmbeddingProcessingInstanceCount=5,
    )

Execute the pipeline

In [None]:
execution = pipeline.start(parameters=parameters)

## Review Pipeline Output

For details check the Pipeline Status on SageMaker Studio. After successful run you can deploy the ui. 

Ensure to copy and paste the content of the config file 
- from `s3://aws-geofm-data-bucket-{YOUR_ACCOUNT_NUMBER}-{AWS_REGION}-dev/output/{AOI_NAME}/consolidated-output/{TILE_ID}/{AOI_NAME}.json` 
- into `demo_config.json` situated at `ui/geofm-demo-stack/solara-fe/demo_config.json`

**Get Metadata Files**

In [None]:
s3_destination_meta=f's3://{bucket_name}/output/{aoi_name}/meta/'
meta_files = gpd.read_parquet(s3_destination_meta)

__Check Data Cubes__

Retrieve & Combine Scenes for a single chip into Data Cube with Dimensions (time, x, y)

In [None]:
x_tile_num = len(meta_files["x_dim"].unique())
y_tile_num = len(meta_files["y_dim"].unique())
time_num = len(meta_files["date"].unique())
print(f"Tile has been split into {x_tile_num} by {y_tile_num} chips")

In [None]:
x=20
y=20
chips = meta_files[(meta_files["x_dim"]==x)&(meta_files["y_dim"]==y)]

In [None]:
print("Date range covered: {} to {}".format(chips["date"].min(),chips["date"].max()))

In [None]:
!pip install -q xarray

In [None]:
import fsspec
import xarray as xr
scenes=[]
for file in list(chips["s3_location_netcdf"]):
    with fsspec.open(file) as f:
        scene = xr.open_dataset(f,decode_coords="all")
    scenes.append(scene)
#generate cube
s2_chip_cube=xr.concat(objs=scenes, coords="minimal", dim="time",join='outer')
s2_chip_cube= s2_chip_cube.sortby("time")

In [None]:
print("Cube dims:", s2_chip_cube.sizes)
print("Cube crs:", s2_chip_cube.rio.crs)
print("Cube bounds:", s2_chip_cube.rio.bounds())

In [None]:
s2_chip_cube.sel(time=s2_chip_cube.time[-1]).B02.plot()

__Check Embeddings__

Load embeddings and check vector size

In [None]:
import numpy as np
from io import BytesIO
def readEmbeddingsFromS3(s3_bucket, s3_output_object):
    # Initialize a session using your credentials
    s3_client = boto3.client('s3')

    # Get the object from the S3 bucket
    response = s3_client.get_object(Bucket=s3_bucket, Key=s3_output_object)

    # Read the object's content into a NumPy array
    file_content = response['Body'].read()
    np_array = np.load(BytesIO(file_content), allow_pickle=True)

    return np_array

def load_embeddings(s3_paths):
    chip_embeddings = {}
    for f in s3_paths:
        try:
            bucket = f.split("/")[2]
            #print(bucket)
            obj_path = "/".join(f.split("/")[3:])
            emb = readEmbeddingsFromS3(bucket, obj_path)
            chip_embeddings[emb['s3_location_netcdf']] = emb
        except Exception as e:
            print(f"Exception: {e}")
            print(f"No embeddings for file {f}")
            
    return chip_embeddings

__End of Notebook__