In [None]:
from datetime import datetime
import pystac
from pystac.extensions.eo import EOExtension
from pystac.extensions.sat import SatExtension
import os
from minio import Minio
import tifffile
from pathlib import Path
import zarr
import numpy as np
import shutil
import re
from dateutil import parser
from PIL import Image
import xmltodict
from dotenv import load_dotenv
import json

In [2]:
load_dotenv()

True

In [None]:
# Function to recursively upload to minio server
def upload_to_minio(client, bucket_name, local_path, minio_path):
    """Upload a file or directory to MinIO server"""
    if os.path.isfile(local_path):
        client.fput_object(bucket_name, minio_path, local_path)
    elif os.path.isdir(local_path):
        for root, _, files in os.walk(local_path):
            for file in files:
                local_file_path = os.path.join(root, file)
                minio_file_path = os.path.join(minio_path, os.path.relpath(local_file_path, local_path))
                client.fput_object(bucket_name, minio_file_path, local_file_path)

In [None]:
# Minio client setup
minio_client = Minio(
    os.getenv("MINIO_ENDPOINT", "localhost:9000"),
    access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
    secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"),
    secure=False 
)

In [None]:
bucket_name = os.getenv('S3_BUCKET_NAME')
data_dir = Path(os.getenv('DATA_DIR'))

In [None]:
# Ensure the bucket exists
if not minio_client.bucket_exists(bucket_name):
    print(f"Creating bucket: {bucket_name}")
    minio_client.make_bucket(bucket_name)

In [None]:
scenes_dir = data_dir / "Scenes"
masks_dir = data_dir / "Masks"
metadata_dir = data_dir / "Metadata"

data = json.load((data_dir / "data.json").open())

In [8]:
print(data)

{'platform': 'sentinel-2', 'band_layout': 'disjoint', 'bands': {'ultra-blue': 'B01', 'blue': 'B02', 'green': 'B03', 'red': 'B04', 'vnir': 'B08', 'swir': 'B11'}, 'notes': 'Each band is a separate single-band GeoTIFF', 'metadata_available': True, 'cloud_cover_available': True}


In [None]:
# Tiling function to split images into tiles
def tile_image(image, tile_size=(384, 384)):
    h, w = image.shape[:2]
    tiles = []

    for y in range(0, h, tile_size[0]):
        for x in range(0, w, tile_size[1]):
            tile = image[y:y+tile_size[0], x:x+tile_size[1]]
            tiles.append(tile)

    return tiles 

# Function to pad tiles to deal with edges
def pad_to_shape(tile, shape=(384, 384)):
    padded = np.zeros(shape, dtype=tile.dtype)
    h, w = tile.shape
    padded[:h, :w] = tile
    return padded

# Function to calculate the percentage of white pixels in a mask
def white_percentage_batch(masks):
    masks_flat = masks.reshape(masks.shape[0], -1)
    white = np.sum(masks_flat == 255, axis=1)
    black = np.sum(masks_flat < 255, axis=1)
    total = white + black
    with np.errstate(divide='ignore', invalid='ignore'):
        percentages = np.where(total > 0, (white / total) * 100, 0.0)
    return percentages

In [None]:
# Function to upload Zarr data to MinIO
def upload_zarr(image,variable_name,bucket_name):
    # Name the tile and create a local Zarr store
    tile_name = f"{variable_name}"
    local_zarr_store_path = f"{tile_name}.zarr"
    local_zarr_store_path_str = str(local_zarr_store_path)
    root_group = zarr.open_group(local_zarr_store_path_str, mode='w')
    
    # Set dimensions and chunks
    if image.ndim == 2:
        dimension_names = ['y', 'x']
        zarr_array_chunks = (384,384)
    elif image.ndim == 3:
        dimension_names = ['y', 'x', 'band']
        zarr_array_chunks = (384,384, 1)
    else:
        raise ValueError(f"Unsupported image shape: {image.shape}")
    
    # Create the Zarr array with the specified dimensions and chunks
    z_array = root_group.create_dataset(
        name=tile_name,
        data=image,
        chunks=zarr_array_chunks,
        dtype=image.dtype,
        overwrite=True
    )
    
    # Set attributes for the Zarr array
    z_array.attrs['_ARRAY_DIMENSIONS'] = dimension_names
    zarr.consolidate_metadata(local_zarr_store_path_str)
    
    # Upload the Zarr store to MinIO and delete the local store
    minio_zarr_path_prefix = f"mask/{tile_name}.zarr"
    upload_to_minio(minio_client, bucket_name, local_zarr_store_path, minio_zarr_path_prefix)
    shutil.rmtree(local_zarr_store_path_str)
    
    print(f"Uploaded tiles for variable '{variable_name}' to MinIO bucket '{bucket_name}'.")

In [None]:
# To parse metadata files based on platform
def parse_mtl(mtl_path,data):
    if data["platform"] == "landsat-8":
        metadata = {}
        pattern = re.compile(r'(\w+)\s=\s"?(.*?)"?$')
        with open(mtl_path, 'r') as file:
            for line in file:
                match = pattern.search(line.strip())
                if match:
                    key, val = match.groups()
                    metadata[key] = val
    elif data["platform"] == 'sentinel-2':
        metadata = {}
        with open(mtl_path,'r') as f:
            metadata  = xmltodict.parse(f.read())
    return metadata

In [None]:
def make_stac_item(metadata, zarr_path, tile_id, cloud_cover, data):
    
    if data["platform"] == "landsat-8" and data["metadata_available"]:
        # Bounding box (UL, LR)
        ul_lat = float(metadata["CORNER_UL_LAT_PRODUCT"])
        ul_lon = float(metadata["CORNER_UL_LON_PRODUCT"])
        lr_lat = float(metadata["CORNER_LR_LAT_PRODUCT"])
        lr_lon = float(metadata["CORNER_LR_LON_PRODUCT"])
        bbox = [ul_lon, lr_lat, lr_lon, ul_lat]

        # Geometry as polygon (UL, UR, LR, LL, back to UL)
        geometry = {
            "type": "Polygon",
            "coordinates": [[
                [float(metadata["CORNER_UL_LON_PRODUCT"]), float(metadata["CORNER_UL_LAT_PRODUCT"])],
                [float(metadata["CORNER_UR_LON_PRODUCT"]), float(metadata["CORNER_UR_LAT_PRODUCT"])],
                [float(metadata["CORNER_LR_LON_PRODUCT"]), float(metadata["CORNER_LR_LAT_PRODUCT"])],
                [float(metadata["CORNER_LL_LON_PRODUCT"]), float(metadata["CORNER_LL_LAT_PRODUCT"])],
                [float(metadata["CORNER_UL_LON_PRODUCT"]), float(metadata["CORNER_UL_LAT_PRODUCT"])]
            ]]
        }


        # Acquisition time
        dt_str = metadata["DATE_ACQUIRED"] + "T" + metadata["SCENE_CENTER_TIME"]
        dt = parser.isoparse(dt_str)

        # Create item
        item = pystac.Item(
            id=metadata["LANDSAT_PRODUCT_ID"],
            bbox=bbox,
            geometry=geometry,
            datetime=dt,
            properties={
                "platform": metadata["SPACECRAFT_ID"].lower().replace("_", "-"),
                "instruments": [i.lower() for i in metadata["SENSOR_ID"].split("_")],
                "eo:cloud_cover": float(np.mean(cloud_cover)),
                "tilewise:cloud_cover": [float(c) for c in cloud_cover],
                "METADATA_AVAILABLE": data["metadata_available"],
                "CLOUD_COVER_AVAILABLE": data["cloud_cover_available"],
                "bands": data["bands"],
                "sat:off_nadir": 0.0,
                "sat:orbit_state": "descending",
                "gsd": 30
            }
        )
    elif data['platform'] == "GoogleEarthEngine" or data['platform'] == "wdcd":
        # Create a generic item for platforms without specific metadata
        item = pystac.Item(
            id=tile_id,
            bbox=[-180, -90, 180, 90],
            geometry={
                "type": "Polygon",
                "coordinates": [[
                    [-180, -90], [180, -90], 
                    [180, 90], [-180, 90], 
                    [-180, -90]
                ]]
            },
            datetime=datetime.now(),
            properties={
                "METADATA_AVAILABLE": data["metadata_available"],
                "CLOUD_COVER_AVAILABLE": data["cloud_cover_available"],
                "eo:cloud_cover": float(np.mean(cloud_cover)),
                "tilewise:cloud_cover": [float(c) for c in cloud_cover],
                "gsd":8
            }
        )
    elif data["platform"] == 'sentinel-2':
        
        tile = metadata['n1:Level-1C_User_Product']
        general_info = tile['n1:General_Info']['Product_Info']

        tile_id = general_info['PRODUCT_TYPE']
        sensing_time = general_info['GENERATION_TIME']
        
        geometric_info = tile['n1:Geometric_Info']

        # Convert sensing time to datetime
        dt = datetime.fromisoformat(sensing_time.replace("Z", "+00:00"))

        # Hardcode or extract values as needed
        platform = "sentinel-2a"
        instrument = ["msi"]
        
        coord_list = list(map(float, geometric_info['Product_Footprint']['Product_Footprint']['Global_Footprint']['EXT_POS_LIST'].split()))

        # Separate into lat/lon pairs and convert to list of [lon, lat] coordinates
        polygon_coords = []
        lats = []
        lons = []
        for i in range(0, len(coord_list), 2):
            lat = coord_list[i]
            lon = coord_list[i+1]
            polygon_coords.append([lon, lat])  # GeoJSON uses [lon, lat] order
            lats.append(lat)
            lons.append(lon)

        # Calculate bbox [min_lon, min_lat, max_lon, max_lat]
        bbox = [min(lons), min(lats), max(lons), max(lats)]

        # Create GeoJSON polygon (first and last point must be identical)
        geometry = {
            "type": "Polygon",
            "coordinates": [polygon_coords]  # Note the extra list wrapping for GeoJSON
        }
        
        # Create item
        item = pystac.Item(
            id=tile_id,
            bbox=bbox,
            geometry=geometry,
            datetime=dt,
            properties={
                "platform": platform,
                "instruments": instrument,
                "eo:cloud_cover": float(np.mean(cloud_cover)),
                "tilewise:cloud_cover": [float(c) for c in cloud_cover],
                "bands": data["bands"],
                "METADATA_AVAILABLE": True,
                "CLOUD_COVER_AVAILABLE": True,
                "sat:off_nadir": 0.0,
                "sat:orbit_state": "descending",
                "gsd": 10
            }
        )

    # Enable extensions
    EOExtension.add_to(item)
    SatExtension.add_to(item)

    # Add Zarr asset
    zarr_href = zarr_path  # Adjust as needed
    item.add_asset(
        "data_zarr",
        pystac.Asset(   
            href=zarr_href,
            media_type="application/vnd+zarr",
            roles=["data"],
            title=f"{data['platform']} Zarr Dataset",
            extra_fields={
                "xarray:open_kwargs": {"consolidated": True}
            }
        )
    )

    return item

In [None]:
def generate_stac(mtl_filename, variable_name, cloud_percentages, data):
    # Ensure the metadata file exists
    if(data['metadata_available']):
        metadata = parse_mtl(mtl_filename,data)
    else:
        metadata = ''
        

    # for i, cloud_cover in enumerate(cloud_percentages):
    minio_zarr_path = f"raw/{variable_name}.zarr"
    
    item = make_stac_item(metadata, minio_zarr_path, variable_name, cloud_percentages, data)
    item.save_object(dest_href=f"{variable_name}.json")
    
    # print(item.id)
    minio_stac_path = f"stac/{variable_name}.json"
    # print(f"Uploading STAC to MinIO: {minio_stac_path}")
    upload_to_minio(minio_client, bucket_name, f"{variable_name}.json", minio_stac_path)
    os.remove(f"{variable_name}.json")
    
    print(f"Generated and uploaded STAC items for variable '{variable_name}' with cloud cover percentages to MinIO bucket '{bucket_name}'.")

In [None]:
# Upload regular files and convert to zarr
for file_path in scenes_dir.glob("*"):
    
    # Read whole image
    if data['band_layout'] == 'stacked':
        for sub_file_path in file_path.glob("*"):    
            if sub_file_path.suffix == ".tif" or sub_file_path.suffix == ".TIF" or sub_file_path.suffix == ".tiff":

                scene = tifffile.imread(sub_file_path)
                
                variable_name = sub_file_path.stem
    
    # Stack disjoint bands   
    elif data['band_layout'] == 'disjoint':
        bands = ['B02','B03','B04','B08']
        stack = []
        for band in bands:
            for sub_file_path in file_path.glob(f"*{band}*"):
                
                img = Image.open(sub_file_path)
                stack.append(np.array(img))
                
        scene = np.stack(stack, axis=-1)
    
    variable_name = file_path.stem
    
    print(f"Variable name: {variable_name}")
            
    mask_path = masks_dir / variable_name
    
    for masks in mask_path.glob("*"):
        if masks.suffix == ".tif" or masks.suffix == ".TIF" or sub_file_path.suffix == ".tiff":
            mask = np.array(Image.open(masks))
            if data['platform'] == 'landsat-8':
                mask = mask.astype(np.uint8) * 255
            
    gt_tiles = tile_image(mask)
    
    padded_tiles = [pad_to_shape(tile) for tile in gt_tiles]
    padded_tiles_array = np.stack(padded_tiles)  # Shape: (N, 384, 384)
    percent_white = white_percentage_batch(padded_tiles_array)
    
    upload_zarr(scene, variable_name, bucket_name)
    
    metadata_path = metadata_dir / variable_name
    
    if data['metadata_available']:
        for metadata in metadata_path.glob("*"):
            generate_stac(metadata, variable_name, percent_white, data)
    else:
        generate_stac("", variable_name, percent_white, data)
        
    #delete the filepath we just worked on
    shutil.rmtree(file_path)



Variable name: S2A_MSIL1C_20191023T040821_N0208_R047_T47TQF_20191023T074550
Uploaded tiles for variable 'S2A_MSIL1C_20191023T040821_N0208_R047_T47TQF_20191023T074550' to MinIO bucket 'fusion-lake'.
Generated and uploaded STAC items for variable 'S2A_MSIL1C_20191023T040821_N0208_R047_T47TQF_20191023T074550' with cloud cover percentages to MinIO bucket 'fusion-lake'.
Variable name: S2A_MSIL1C_20191215T042151_N0208_R090_T46RGV_20191215T065406
Uploaded tiles for variable 'S2A_MSIL1C_20191215T042151_N0208_R090_T46RGV_20191215T065406' to MinIO bucket 'fusion-lake'.
Generated and uploaded STAC items for variable 'S2A_MSIL1C_20191215T042151_N0208_R090_T46RGV_20191215T065406' with cloud cover percentages to MinIO bucket 'fusion-lake'.
Variable name: S2A_MSIL1C_20200325T034531_N0209_R104_T47RQL_20200325T065315
Uploaded tiles for variable 'S2A_MSIL1C_20200325T034531_N0209_R104_T47RQL_20200325T065315' to MinIO bucket 'fusion-lake'.
Generated and uploaded STAC items for variable 'S2A_MSIL1C_2020032

In [None]:
masks_dir_temp = Path('Masks')
minio_mask_path = 's3://'

In [None]:
# Upload corresponding masks to MinIO
for mask_path in masks_dir_temp.glob('*'):
    variable_name = mask_path.stem
    
    for mask in mask_path.glob('*'):
        mask_image = np.array(Image.open(mask))
        mask_image = mask_image.astype(np.uint8)
        if mask_image.max() > 1:
            mask_image[mask_image < 255] = 0
            mask_image[mask_image == 255] = 1
        
        upload_zarr(mask_image, variable_name, 'fusion-lake')
        
    shutil.rmtree(mask_path)

Uploaded tiles for variable 'WDCD_09' to MinIO bucket 'fusion-lake'.




Uploaded tiles for variable 'S2A_MSIL1C_20191023T040821_N0208_R047_T47TQF_20191023T074550' to MinIO bucket 'fusion-lake'.
Uploaded tiles for variable 'LC08_L1TP_064012_20160420_20170223_01_T1' to MinIO bucket 'fusion-lake'.
Uploaded tiles for variable 'S2A_MSIL1C_20191215T042151_N0208_R090_T46RGV_20191215T065406' to MinIO bucket 'fusion-lake'.
Uploaded tiles for variable 'LC08_L1TP_063013_20160920_20170221_01_T1' to MinIO bucket 'fusion-lake'.
Uploaded tiles for variable 'WDCD_02' to MinIO bucket 'fusion-lake'.
Uploaded tiles for variable 'S2A_MSIL1C_20200325T034531_N0209_R104_T47RQL_20200325T065315' to MinIO bucket 'fusion-lake'.
Uploaded tiles for variable 'S2A_MSIL1C_20190328T033701_N0207_R061_T49TCF_20190328T071457' to MinIO bucket 'fusion-lake'.
Uploaded tiles for variable 'S2A_MSIL1C_20190714T043711_N0208_R033_T46TFN_20190714T073938' to MinIO bucket 'fusion-lake'.
Uploaded tiles for variable 'S2A_MSIL1C_20190819T031541_N0208_R118_T49SFU_20190819T065332' to MinIO bucket 'fusion-la