In [1]:
import os
import psycopg2
from pystac_client import Client
import pystac
from pystac.extensions.projection import ProjectionExtension
from pystac.extensions.raster import RasterExtension
import stackstac

# Resolve ints stored as floats

Goal of this notebook is to test and resolve the integers being stored as floats in the MAAP VEDA pgSTAC database.

## 1. Determinte what extensions are being used to identify which schemas to check for integer fields which need to be updated.

What extensions are being used?
in veda data pipelines, we use rio_stac.create_stac_item to build stac metadata. 
https://github.com/MAAP-Project/veda-data-pipelines/blob/main/lambdas/build-stac/utils/stac.py#L56

I couldn't know what version of rio-stac is being used, but downloading the build-stac lambda docker image from our MAAP VEDA TEst stack in UAH returned 0.4.2

```
root@2b637b648fe7:/app# pip show rio-stac
Name: rio-stac
Version: 0.4.2
```

We are not using the [extensions](https://github.com/developmentseed/rio-stac/blob/0.4.2/rio_stac/stac.py#L211) argument that you can pass but we are using [`with_proj` and `with_raster`](https://github.com/developmentseed/rio-stac/blob/0.4.2/rio_stac/stac.py#L221-L222) arguments, which then use the proj and raster extensions.

https://stac-extensions.github.io/projection/v1.0.0/schema.json

* "type": "integer"
* proj:epsg and proj:shape

https://stac-extensions.github.io/raster/v1.1.0/schema.json

* "type": "integer"
* bands-> items -> properties -> bits_per_sample
    * Appears bits_per_sample is missing so we don't need to be concerned with it https://github.com/developmentseed/rio-stac/blob/0.4.2/rio_stac/stac.py#L136
* bands-> items -> properties -> histogram -> properties -> buckets

## 2. Define update functions

In [10]:
def connect_and_execute(database, user, password, host, port, sql):
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=database,
            user=user,
            password=password,
            host=host,
            port=port
        )
        cursor = conn.cursor()
        print(cursor.execute(sql))
        conn.commit()
        cursor.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(f"Error: {error}")
    finally:
        if conn is not None:
            conn.close()

def fix_epsg_sql(collection_id):
    return f"""
        EXPLAIN ANALYZE
        WITH collection_items AS (
            SELECT id
            FROM items
            WHERE collection = '{collection_id}'
            AND (content->'properties'->>'proj:shape')::text ~ '(\d+)\.0+\b'
        )
        UPDATE items
        SET content = jsonb_set(
            content,
            '{{proj:shape}}',
            regexp_replace(
                (content->'properties'->>'proj:shape')::text,
                '(\d+)\.0+\b',
                '\1',
                'g'
            )::jsonb
        )
        FROM collection_items
        WHERE items.id = collection_items.id;
        """

def fix_shape_sql(collection_id):
    return f"""
        UPDATE items
        SET content = jsonb_set(
            content,
            '{{properties, proj:shape}}',
            (
                SELECT jsonb_agg(CAST(FLOOR(CAST(value AS NUMERIC)) AS INTEGER))
                FROM jsonb_array_elements_text(content #> '{{properties, proj:shape}}')
            )
        )
        WHERE collection = '{collection_id}';
        """

def fix_buckets_sql(collection_id):
    return f"""
        CREATE OR REPLACE FUNCTION update_buckets_to_integers()
        RETURNS VOID AS $$
        DECLARE
            item_id TEXT;
            bands JSONB;
            updated_bands JSONB;
            band JSONB;
            index INTEGER := 0;
        BEGIN
            FOR item_id, bands IN
                SELECT id, content #> '{{assets, cog_default, raster:bands}}' FROM items
                WHERE collection = '{collection_id}'
            LOOP
                updated_bands := '[]'::JSONB;

                FOR band IN SELECT * FROM jsonb_array_elements(bands)
                LOOP
                    band := jsonb_set(
                        band,
                        '{{histogram, buckets}}',
                        (SELECT jsonb_agg(CAST(FLOOR(value::TEXT::NUMERIC) AS INTEGER))
                         FROM jsonb_array_elements(band #> '{{histogram, buckets}}'))
                    );
                    updated_bands := updated_bands || band;
                END LOOP;

                UPDATE items
                SET content = jsonb_set(content, '{{assets, cog_default, raster:bands}}', updated_bands)
                WHERE id = item_id;
            END LOOP;
        END;
        $$ LANGUAGE plpgsql;

        /* run the FUNCTION */
        SELECT dashboard.update_buckets_to_integers();
        """

# Connect to database
host = "veda-backend-test-postgres.coy9fgtxpaqn.us-west-2.rds.amazonaws.com"
database = "postgis"
user = "delta"
password = "xxx"
port = 5432

## 3. Fix the collections

Run the code in part 3 once to fix and again to check the fix.

In [29]:
# for each collection, check the proj:epsg, proj:shape are floats and 
# raster:bands
catalog_url = "https://stac.maap-project.org"
client = Client.open(catalog_url)

# Check if all items in the list are integers
def all_integers(l):
    return all(isinstance(d, int) for d in l)

def run_fix(sample_values, field, fix_function, collection_id):
    if not all_integers(sample_values):
        print(f"Fixing {field} for {collection_id} which is currently a float: {sample_values}.")
        if DRY_RUN:
            print("Dry run")
        else:
            connect_and_execute(database, user, password, host, port, fix_function(collection_id))
    else:
        print(f"{field} for {collection_id} is fixed: {sample_values}.")

def run_all_fixes():
    for collection in client.get_all_collections():
        collection_id = collection.id
        collection = client.get_collection(collection_id)
        items = collection.get_items()
        print(f"Checking proj:epsg, proj:shape and buckets for {collection_id}")
        try:
            first_item = next(iter(items), None)
        except Exception as e:
            print(f"Encountered error {e} for {collection_id}")

        if first_item == None:
            continue

        epsg = first_item.properties.get("proj:epsg")
        if epsg:
            run_fix([epsg], 'epsg code', fix_epsg_sql, collection_id)
        else:
            print(f"No epsg code for {collection_id}.")


        shape = first_item.properties.get("proj:shape")
        if shape:
            run_fix(shape, 'shape', fix_shape_sql, collection_id)
        else:
            print(f"No shape for {collection_id}.")

        first_asset = first_item.assets.get('cog_default')        
        if first_asset:
            buckets = first_asset.extra_fields['raster:bands'][0]['histogram']['buckets']
            run_fix(buckets, 'buckets', fix_buckets_sql, collection_id)
        else:
            print(f"No cog_default_asset for {collection_id}.")        

        print('\n')


In [30]:
DRY_RUN = True
run_all_fixes()

Checking proj:epsg, proj:shape and buckets for GlobCover_09
No epsg code for GlobCover_09.
No shape for GlobCover_09.
No cog_default_asset for GlobCover_09.


Checking proj:epsg, proj:shape and buckets for AfriSAR_UAVSAR_Coreg_SLC
No epsg code for AfriSAR_UAVSAR_Coreg_SLC.
No shape for AfriSAR_UAVSAR_Coreg_SLC.
No cog_default_asset for AfriSAR_UAVSAR_Coreg_SLC.


Checking proj:epsg, proj:shape and buckets for BIOSAR1
No epsg code for BIOSAR1.
No shape for BIOSAR1.
No cog_default_asset for BIOSAR1.


Checking proj:epsg, proj:shape and buckets for AfriSAR_AGB_Maps_1681
Fixing epsg code for AfriSAR_AGB_Maps_1681 which is currently a float: [32732.0].
Dry run
Fixing shape for AfriSAR_AGB_Maps_1681 which is currently a float: [101.0, 105.0].
Dry run
No cog_default_asset for AfriSAR_AGB_Maps_1681.


Checking proj:epsg, proj:shape and buckets for Landsat8_SurfaceReflectance
No epsg code for Landsat8_SurfaceReflectance.
No shape for Landsat8_SurfaceReflectance.
No cog_default_asset for Landsat