# Obtención de las imágenes provenientes de Landsat-7/8 y 9

EL siguiente código emplea los paquetes *planetary_computer* y *pystac_client* para establecer la conexión con la STAC API de Planetary Computer [[1]](https://planetarycomputer.microsoft.com/docs/quickstarts/reading-stac/).  El paquete ocd.stac se emplea para la lectura y visualización de las imágenes satelitales provenientes de los satélites Landsat-7, Landsat-8 y Landsat-9 [[2]](https://odc-stac.readthedocs.io/en/latest/_api/odc.stac.load.html#odc.stac.load).

## Imports

In [None]:
%load_ext lab_black
%load_ext autoreload
%autoreload 2

In [None]:
import cv2
from datetime import timedelta
import numpy as np
import odc.stac
import pandas as pd
from pathlib import Path
from tqdm import tqdm
import planetary_computer as pc
from pystac_client import Client
import geopy.distance as distance
import json

%matplotlib inline

## Definición de parámetros

In [None]:
# Directorios de donde se leerá y almacenará la información
DATA_DIR = Path.cwd().resolve() / "data"
LANDSAT_DATA_DIR = DATA_DIR / "landsat"
SENTINEL_DATA_DIR = DATA_DIR / "sentinel"

# Desenmascaramiento de los bits clear según codificación.
CLEAR_BIT_FILTER = np.vectorize(lambda num: int(format(num, "#018b")[-7]))

# Máximo de días hacia atrás donde se permite descargar la imagen, respecto a la fecha de la medición in situ
TIME_BUFFER_DAYS = 15

# Tamaño del detalle de la imagen (se aplica a cada dirección cardinal)
METER_BUFFER = 200

## Metadata in situ

In [None]:
metadata = pd.read_csv(DATA_DIR / "metadata.csv")

In [None]:
# Se convierte la variable fecha al tipo datetime
metadata.date = pd.to_datetime(metadata.date)

## Funciones

In [None]:
def get_bounding_box(latitude, longitude, meter_buffer=50000):
    """
    Dada una latitud, longitud y buffer en metros, devuelve una superficie rectangular alrededor de las coordenadas proporcionadas en
    cada una de las direcciones cardinales

    Devuelve una lista de [minx, miny, maxx, maxy]
    """
    distance_search = distance.distance(meters=meter_buffer)

    # calcula la lat/long basada en la distancia en tierra
    # cada orientación corresponde a una dirección cardinal (sur, oeste, norte, y este)
    min_lat = distance_search.destination((latitude, longitude), bearing=180)[0]
    min_long = distance_search.destination((latitude, longitude), bearing=270)[1]
    max_lat = distance_search.destination((latitude, longitude), bearing=0)[0]
    max_long = distance_search.destination((latitude, longitude), bearing=90)[1]

    return [min_long, min_lat, max_long, max_lat]

In [None]:
def get_date_range(date, time_buffer_days=TIME_BUFFER_DAYS):
    """
    Obtiene el rango de fechas con las que se busca en PC a partir de la fecha de medición y el límite de días atras establecido

    Devuelve un string con el rango
    """
    datetime_format = "%Y-%m-%dT"
    range_start = pd.to_datetime(date) - timedelta(days=time_buffer_days)
    date_range = f"{range_start.strftime(datetime_format)}00:00:00Z/{pd.to_datetime(date).strftime(datetime_format)}23:59:59Z"

    return date_range

In [None]:
def crop_landsat_image(item, bounding_box):
    """
    Dado un STAC ítem de Landsat y la bbox en formato tupla (minx, miny, maxx, maxy), devuelve un recorte de la imagen

    Devuelve las imágenes como un vector numpy con dimensiones [(3,Px,Py),(1,Px,Py), (1,Px,Py)], siendo Px y Py los píxeles
    en cada dirección del plano
    """

    (minx, miny, maxx, maxy) = bounding_box
    bands_of_interest = ["red", "green", "blue", "lwir11", "nir08"]
    image = odc.stac.stac_load(
        [item.item_obj],
        bands=bands_of_interest,
        bbox=[minx, miny, maxx, maxy],
        resolution=30,  # resolución 30 m
        crs="utm",
    ).isel(time=0)
    image_array = image[["red", "green", "blue"]].to_array().to_numpy()  # imagen visual
    temp_image_array = (
        image[["lwir11"]].to_array().to_numpy()
    )  # imagen para la obtención de la temperatura
    nir_image_array = (
        image[["nir08"]].to_array().to_numpy()
    )  # imagen para la banda cercana al infrarrojo (NIR)

    # Se normalizan los valores a 0-255 para unificar los rangos con las imágenes de Sentinel - 2
    visual_image_array = cv2.normalize(image_array, None, 0, 255, cv2.NORM_MINMAX)

    return [visual_image_array, temp_image_array, nir_image_array]

In [None]:
def crop_landsat7_image(item, bounding_box):
    """
    Dado un STAC ítem de Landsat-7 y la bbox en formato tupla (minx, miny, maxx, maxy), devuelve un recorte de la imagen

    Devuelve las imágenes como un vector numpy con dimensiones [(3,Px,Py),(1,Px,Py), (1,Px,Py)], siendo Px y Py los píxeles
    en cada dirección del plano.

    * La diferencia con respecto a la función crop_landsat_image es la banda de temperatura, cuyo nombre de asset cambia de lwir11 a lwir
    """

    (minx, miny, maxx, maxy) = bounding_box
    bands_of_interest = ["red", "green", "blue", "lwir", "nir08"]
    image = odc.stac.stac_load(
        [item.item_obj],
        bands=bands_of_interest,
        bbox=[minx, miny, maxx, maxy],
        resolution=30,  # resolución de píxel de 30 m
        crs="utm",
    ).isel(time=0)
    image_array = (
        image[["red", "green", "blue"]].to_array().to_numpy()
    )  # imagen visual a partir de las tres bandas de colores
    temp_image_array = (
        image[["lwir"]].to_array().to_numpy()
    )  # imagen para la obtención de la temperatura
    nir_image_array = image[["nir08"]].to_array().to_numpy()  # imagen NIR

    # Se normalizan los valores a 0-255 para unificar los rangos con las imágenes de Sentinel - 2
    visual_image_array = cv2.normalize(image_array, None, 0, 255, cv2.NORM_MINMAX)

    return [visual_image_array, temp_image_array, nir_image_array]

In [None]:
# Refactor our process from above into functions
def select_item_list(items, date, latitude, longitude):
    """
    Devuelve una lista con los ítems válidos:
    - Debe contener la coordenada de medición
    - Se encuentra dentro del rango temporal definido

    Devuelve una estructura dataframe ordenada por la diferencia temporal con respecto a la fecha de medición (de menor a mayor),
    con los detalles de los ítems
    """
    # Obtención de los atributos de los ítems
    item_details = pd.DataFrame(
        [
            {
                "datetime": item.datetime.strftime("%Y-%m-%d"),
                "platform": item.properties["platform"],
                "min_long": item.bbox[0],
                "max_long": item.bbox[2],
                "min_lat": item.bbox[1],
                "max_lat": item.bbox[3],
                "item_obj": item,
                "cloud_cover": item.properties["eo:cloud_cover"],
            }
            for item in items
        ]
    )

    # filtrado de los puntos que contienen la localización de la medición
    item_details["contains_sample_point"] = (
        (item_details.min_lat < latitude)
        & (item_details.max_lat > latitude)
        & (item_details.min_long < longitude)
        & (item_details.max_long > longitude)
    )
    item_details = item_details[item_details["contains_sample_point"] == True]
    if len(item_details) == 0:
        return (np.nan, np.nan, np.nan)

    # Se añade la diferencia temporal como atributo al df
    item_details["time_diff"] = pd.to_datetime(date) - pd.to_datetime(
        item_details["datetime"]
    )

    return item_details.sort_values(by="time_diff", ascending=True)

In [None]:
def check_clouds(ordered_items, bounding_box):
    """
    Comprueba que el porcentaje de píxeles clear sea mayor al 25%, a partir de la lista de ítems y el área de búsqueda

    Devuelve el ítem que cumple el criterio de nubosidad y de menor diferencia temporal con la fecha de medición. Además retorna
    la imagen empleada para el cáculo de la nubosidad
    """
    (minx, miny, maxx, maxy) = bounding_box
    best_item = None

    for i in range(len(ordered_items)):
        image = odc.stac.stac_load(
            [ordered_items.iloc[i].item_obj],
            bands=["qa_pixel"],
            bbox=feature_bbox,
            resolution=30,
            crs="utm",
        ).isel(time=0)

        qa_image_array = image[["qa_pixel"]].to_array().to_numpy()

        # Como indicador de si hay nubosidad o no, se toma el bit de la posición 6 del entero que se corresponde con "Clear"
        # que vale 1 si los bits de "Clouds" y "Dilated Clouds" no están activos
        cloud_image_array = CLEAR_BIT_FILTER(qa_image_array)

        if np.size(cloud_image_array[cloud_image_array == 1]) >= 0.25 * np.size(
            cloud_image_array
        ):
            best_item = ordered_items.iloc[i]
            break

    return [best_item, qa_image_array]

## Extracción de las imágenes

In [None]:
# Establece la conexión con el STAC API
catalog = Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1", modifier=pc.sign_inplace
)

In [None]:
# Carga los ítems ya disponibles a partir de Sentinel, por lo que no es necesario descargar las imágenes Landsat
sentinel_data = json.load(open(SENTINEL_DATA_DIR / "selected_items.txt"))

In [None]:
data_subset = metadata[~metadata.uid.isin(sentinel_data.keys())]

In [None]:
selected_items = {} # mantiene un registro de los items seleccionados
not_possible = [] # mantiene un registro de aquellos ítems no disponibles
errored_ids = [] # mantiene un registro de aquellos ítems que han fallado durante la descarga

for row in tqdm(data_subset.itertuples(), total=len(data_subset)):
    
    pass
    img_pth = LANDSAT_DATA_DIR / f"cloud/{row.uid}.npy"
    
    # se comprueba que el item no exista ya en el directorio
    try:
        with open(img_pth, "rb") as f:
            continue
    except:
        try:

            # QUERY STAC API
            search_bbox = get_bounding_box(
                row.latitude, row.longitude, meter_buffer=50000
            )
            date_range = get_date_range(row.date, time_buffer_days=TIME_BUFFER_DAYS)

            # se busca en el catálogo de planetary computer
            search = catalog.search(
                collections=["landsat-c2-l2"],
                bbox=search_bbox,
                datetime=date_range,
                query={
                    "platform": {
                        "in": [
                            "landsat-8",
                            "landsat-9",
                        ]
                    }
                },
            )
            items = [item for item in search.get_all_items()]

            # Se comprueba que hayan ítems y se ordenan según la diferencia de fechas. Se descartan aquellos ítems que no contienen
            # el lugar de la medición
            if len(items) == 0:
                not_possible.append(row.uid)
                continue
            else:
                ordered_items = select_item_list(items, row.date, row.latitude, row.longitude)

            # Creación de la superficie del detalle
            feature_bbox = get_bounding_box(
                row.latitude, row.longitude, meter_buffer=METER_BUFFER
            )
            
            # Comprobación nubosidad
            [best_item, qa_image_array] = check_clouds(ordered_items, feature_bbox)
            if best_item is None:
                not_possible.append(row.uid)
                continue

            # Registro de los ítems seleccionados
            selected_items[row.uid] = {
                "item_object": str(best_item["item_obj"]),
                "item_platform": best_item["platform"],
                "item_date": best_item["datetime"],
                "cloud_properties": best_item["cloud_cover"],
                "time_diff": str(best_item["time_diff"]),
            }

            # Obtención de las imagenes recortadas
            [
                visual_image_array,
                temp_image_array,
                nir_image_array,
            ] = crop_landsat_image(best_item, feature_bbox)

            # Almacenamiento
            with open(LANDSAT_DATA_DIR / f"visual/{row.uid}.npy", "wb") as f:
                np.save(f, visual_image_array)
            with open(LANDSAT_DATA_DIR / f"cloud/{row.uid}.npy", "wb") as f:
                np.save(f, qa_image_array)
            with open(LANDSAT_DATA_DIR / f"temperature/{row.uid}.npy", "wb") as f:
                np.save(f, temp_image_array)
            with open(LANDSAT_DATA_DIR / f"nir/{row.uid}.npy", "wb") as f:
                np.save(f, nir_image_array)

        # Registro de los elementos que han dado error
        except Exception as e:
            errored_ids.append(row.uid)

In [None]:
with open(LANDSAT_DATA_DIR / "selected_items.txt", "w") as f:
    json.dump(selected_items, f)

In [None]:
print(f"No se puedo obtener imágenes para {len(errored_ids)} elementos debido a errores")

In [None]:
print(f"No se puedo obtener imágenes para {len(not_possible)} elementos")

In [None]:
selected_items7 = {} # mantiene un registro de los items seleccionados
not_possible7 = [] # mantiene un registro de aquellos ítems no disponibles
errored_ids7 = [] # mantiene un registro de aquellos ítems que han fallado durante la descarga

for row in tqdm(data_subset.itertuples(), total=len(data_subset)):
    pass
    # se comprueba que el item no exista ya en el directorio
    img_pth = LANDSAT_DATA_DIR / f"cloud/{row.uid}.npy"

    try:
        with open(img_pth, "rb") as f:
            continue
    except:
        try:

            ## QUERY STAC API
            search_bbox = get_bounding_box(
                row.latitude, row.longitude, meter_buffer=50000
            )
            date_range = get_date_range(row.date, time_buffer_days=TIME_BUFFER_DAYS)

            # se busca en el catálogo de planetary computer
            search = catalog.search(
                collections=["landsat-c2-l2"],
                bbox=search_bbox,
                datetime=date_range,
                query={
                    "platform": {
                        "in": [
                            "landsat-7"
                        ]
                    }
                },
            )
            items = [item for item in search.get_all_items()]

            # Se comprueba que hayan ítems y se ordenan según la diferencia de fechas. Se descartan aquellos ítems que no contienen
            # el lugar de la medición
            if len(items) == 0:
                not_possible7.append(row.uid)
                continue
            else:
                ordered_items = select_item_list(items, row.date, row.latitude, row.longitude)

            # Creación de la superficie del detalle
            feature_bbox = get_bounding_box(
                row.latitude, row.longitude, meter_buffer=METER_BUFFER
            )
            # comprobación nubosidad
            [best_item, qa_image_array] = check_clouds(ordered_items, feature_bbox)
            if best_item is None:
                not_possible7.append(row.uid)
                continue

            # Registro de los ítems seleccionados
            selected_items7[row.uid] = {
                "item_object": str(best_item["item_obj"]),
                "item_platform": best_item["platform"],
                "item_date": best_item["datetime"],
                "cloud_properties": best_item["cloud_cover"],
                "time_diff": str(best_item["time_diff"]),
            }

            # Obtención de las imagenes recortadas
            [
                visual_image_array,
                temp_image_array,
                nir_image_array,
            ] = crop_landsat7_image(best_item, feature_bbox)

            # Almacenamiento
            with open(LANDSAT_DATA_DIR / f"visual/{row.uid}.npy", "wb") as f:
                np.save(f, visual_image_array)
            with open(LANDSAT_DATA_DIR / f"cloud/{row.uid}.npy", "wb") as f:
                np.save(f, qa_image_array)
            with open(LANDSAT_DATA_DIR / f"temperature/{row.uid}.npy", "wb") as f:
                np.save(f, temp_image_array)
            with open(LANDSAT_DATA_DIR / f"nir/{row.uid}.npy", "wb") as f:
                np.save(f, nir_image_array)

        # Registro de los elementos que han dado error
        except Exception as e:
            errored_ids7.append(row.uid)

In [None]:
with open(LANDSAT_DATA_DIR / "selected_items7.txt", "w") as f:
    json.dump(selected_items7, f)

In [None]:
print(f"No se puedo obtener imágenes para {len(errored_ids7)} elementos debido a errores")

In [None]:
print(f"No se puedo obtener imágenes para {len(not_possible7)} elementos")

## Referencias

[1] Microsoft. Microsoft Planetary Computer.Reading Data from the STAC API_ https://planetarycomputer.microsoft.com/docs/quickstarts/reading-stac/

[2] ODC. Developer Guide odc.stac: https://odc-stac.readthedocs.io/en/latest/_api/odc.stac.load.html#odc.stac.load

[3] Driven Data. Tick Tick Bloom: Harmful Algal Bloom Detection Challenge: https://www.drivendata.org/competitions/143/tick-tick-bloom/page/650/
