# Calculate temporal statistical metrics from cloud-masked Sentinel 2 data on small areas (image chips) 

## Imports

In [1]:
from datetime import datetime
import geopandas as gpd
import io
import json
import numpy as np
import pandas as pd
from pathlib import Path
import pprint
import pystac
import pystac_client
from pystac_client import Client
import planetary_computer
import rasterio
from shapely.geometry import Point, Polygon
import rioxarray
import stackstac
from tqdm import tqdm
import xarray as xr

## Functions

In [2]:
def get_items(pystac_lient_url: str, datetimes: str, chip: pd.core.series.Series):

    stac = pystac_client.Client.open(pystac_lient_url)
    search = stac.search(
        intersects=dict(type="Point", coordinates=[chip.longitude, chip.latitude]),
        datetime=datetimes,
        collections=["sentinel-2-l2a"],
        limit=500,  # fetch items in batches of 500
        query={
            "eo:cloud_cover": {"lt": 80},
            # we only work with small chip areas and only use data from one zone
            # this avoids duplicates while we should not miss any data
            "proj:epsg": {"eq": int(chip.epsg)},
        },
    )
    candidate_items = list(search.get_items())
    if len(candidate_items) == 0:
        # print(f'NO ITEMS FOUND')
        return None, None
    ## scenes db for overview
    ids = []
    geometries = []
    cloud_cover = []
    pt = Point([chip.longitude, chip.latitude])

    for item in candidate_items:
        ids.append(item.id)
        # poly = Polygon.from_bounds(*item.bbox)
        assert len(item.geometry["coordinates"]) == 1
        poly = Polygon(item.geometry["coordinates"][0])
        geometries.append(poly)
        cloud_cover.append(item.properties["eo:cloud_cover"])

    scenes = pd.DataFrame(
        {"id": ids, "geometry": geometries, "cloud_cover": cloud_cover}
    )

    # https://sentinel.esa.int/web/sentinel/user-guides/sentinel-2-msi/naming-convention
    candidate_scenes = pd.concat(
        [
            scenes,
            scenes.id.str.split("_", expand=True).rename(
                {
                    0: "mission",
                    1: "product_level",
                    2: "datatake_sensing_start",
                    3: "relative_orbit",
                    4: "tile",
                    5: "product_discriminator",
                },
                axis=1,
            ),
        ],
        axis=1,
    )

    candidate_scenes["date"] = pd.to_datetime(
        candidate_scenes["datatake_sensing_start"].str[:8]
    )
    candidate_scenes["chip_id"] = chip["chip_id"]

    candidate_scenes["unique_acquisitions"] = False
    keep = candidate_scenes.groupby(["mission", "date"]).first()["id"]
    candidate_scenes.loc[scenes["id"].isin(keep), "unique_acquisitions"] = True

    scenes = candidate_scenes.query("unique_acquisitions")
    items = [it for it in candidate_items if it.id in scenes["id"].values]
    assert len(items) == scenes.shape[0]
    return items, candidate_scenes


def mask_bands_with_valid_pixel_mask(stack_bands, stack_valid_pixel_mask):
    return stack_bands.where(stack_valid_pixel_mask == 1, np.nan)


def filter_times_given_periods(datetime_list, periods):
    filtered_dates = []
    for period in periods:  # example format of period: '2018-01-01/2018-03-31'
        dt_from = np.datetime64(period.split("/")[0])
        dt_to = np.datetime64(period.split("/")[1])
        filter_dates = filter(lambda d: (d >= dt_from) & (d < dt_to + 1), datetime_list)
        filtered_dates += list(filter_dates)
    return filtered_dates


def quantiles_from_masked_band_time_stack(stack_masked_band_time_series, quantiles):
    return stack_masked_band_time_series.quantile(
        q=quantiles, dim="time"
    )  # needed if we do not do .compute().chunk(None) before calling this ufunc .chunk(dict(time=-1))

## Input data and parameters

In [3]:
# # input data
# copy of a geopandas.GeoDataFrame().to_json()
# geojson_chips = '{"type": "FeatureCollection", "features": [{"id": "2", "type": "Feature", "properties": {"chip_id": 2, "easting": 688503.1824475648, "epsg": 32632, "id": "Lerchenauer See", "latitude": 48.19723, "longitude": 11.53678, "maxx": 689460.0, "maxy": 5342280.0, "minx": 687600.0, "miny": 5340420.0, "northing": 5341333.603224352, "tiles": "32UPU", "zone_letter": "U", "zone_number": 32}, "geometry": {"type": "Polygon", "coordinates": [[[11.524234094137505, 48.18928587085897], [11.525055996690533, 48.20600379648934], [11.550065587909714, 48.205451427901956], [11.549235566161816, 48.18873382438354], [11.524234094137505, 48.18928587085897]]]}}, {"id": "3", "type": "Feature", "properties": {"chip_id": 3, "easting": 705439.5861520077, "epsg": 32632, "id": "Kieswerk Ebenhoeh", "latitude": 48.18998, "longitude": 11.76433, "maxx": 706380.0, "maxy": 5342040.0, "minx": 704520.0, "miny": 5340180.0, "northing": 5341111.3411528235, "tiles": "32UPU, 32UQU", "zone_letter": "U", "zone_number": 32}, "geometry": {"type": "Polygon", "coordinates": [[[11.751522831107586, 48.18190724189189], [11.752418453742267, 48.198622128828674], [11.777419839448665, 48.19802037502167], [11.77651610687036, 48.18130583892132], [11.751522831107586, 48.18190724189189]]]}}, {"id": "4", "type": "Feature", "properties": {"chip_id": 4, "easting": 706949.9406789518, "epsg": 32632, "id": "Marzlinger Weiher", "latitude": 48.392441, "longitude": 11.795693, "maxx": 707880.0, "maxy": 5364600.0, "minx": 706020.0, "miny": 5362740.0, "northing": 5363696.487794535, "tiles": "32UPU, 32UQU", "zone_letter": "U", "zone_number": 32}, "geometry": {"type": "Polygon", "coordinates": [[[11.782676865182088, 48.38415047868298], [11.783589070509988, 48.40086433526531], [11.808688737925511, 48.400253896920304], [11.807768333467632, 48.38354039650945], [11.782676865182088, 48.38415047868298]]]}}, {"id": "5", "type": "Feature", "properties": {"chip_id": 5, "easting": 704137.0853263629, "epsg": 32632, "id": "Tegernsee", "latitude": 47.7421, "longitude": 11.72314, "maxx": 705060.0, "maxy": 5292180.0, "minx": 703200.0, "miny": 5290320.0, "northing": 5291228.066208213, "tiles": "32UPU, 32UQU, 32TPT, 32TQT", "zone_letter": "T", "zone_number": 32}, "geometry": {"type": "Polygon", "coordinates": [[[11.710230094604286, 47.734234541181756], [11.711098555200053, 47.750951312207526], [11.735885627074119, 47.75036268068942], [11.735009250336178, 47.73364625232421], [11.710230094604286, 47.734234541181756]]]}}, {"id": "1", "type": "Feature", "properties": {"chip_id": 1, "easting": 321827.8428315255, "epsg": 32633, "id": "Raedlinger Weiher", "latitude": 48.67412, "longitude": 12.5797, "maxx": 322740.0, "maxy": 5394960.0, "minx": 320880.0, "miny": 5393100.0, "northing": 5394057.097320923, "tiles": "32UQU, 32UQV, 33UUP, 33UUQ", "zone_letter": "U", "zone_number": 33}, "geometry": {"type": "Polygon", "coordinates": [[[12.567250916755459, 48.66524646552912], [12.5664454366049, 48.68196415944526], [12.591692294447302, 48.68249501125058], [12.59248943795, 48.66577700713926], [12.567250916755459, 48.66524646552912]]]}}]}'
# chips = gpd.read_file(io.StringIO(geojson_chips))
chips = gpd.read_file("sol_chem_pnts_horizons_africa_chip_geometries.gpkg")

# # parameters
valid_scl_ids = [2, 4, 5, 6]
# must contain bands and the SCL layer
bands = ["B02", "B04", "B8A", "B09", "B10", "B11", "B12", "SCL"]
datetimes_all_seasons = "2018-01-01/2019-12-31"
# seasons with breaks, e.g.
seasons = {
    "season_1": [
        "2018-01-01/2018-03-31",
        "2018-07-01/2018-09-30",
        "2019-01-01/2019-03-31",
        "2019-07-01/2019-09-30",
    ],
    "season_2": [
        "2018-04-01/2018-06-30",
        "2018-10-01/2018-12-31",
        "2019-04-01/2019-06-30",
        "2019-10-01/2019-12-31",
    ],
}
quantiles = [0.1, 0.25, 0.5, 0.75, 0.9]
add_mid50 = True
add_mid80 = True

## Cluster

https://planetarycomputer.microsoft.com/docs/quickstarts/scale-with-dask/


In [4]:
import dask_gateway

cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
cluster.scale(5)
print(cluster.dashboard_link)

https://pccompute.westeurope.cloudapp.azure.com/compute/services/dask-gateway/clusters/prod.6aec14f833d541d08fd8b62b8a8b115b/status


In [5]:
# cluster.close()

## Target Storage

### Azure Blob

Required for methods
* `Chip.upload_quartiles_as_netcdf_to_blob`
* `Chip.upload_scenes_csv_to_blob`

```python
import getpass
import azure.storage.blob
import io

connection_string = getpass.getpass()  # prompts for the connection string
container_client = azure.storage.blob.ContainerClient.from_connection_string(
    connection_string, container_name="test"
)

# to list and delete all files ...
blob_names = [b.name for b in container_client.list_blobs()]
# container_client.delete_blobs(*blob_names)
```

### AWS S3


Required for methods
* `Chip.upload_quartiles_as_netcdf_to_bucket`
* `Chip.upload_scenes_csv_to_bucket`

In [6]:
import boto3
import getpass

access_key = getpass.getpass()
secret_key = getpass.getpass()


# Create connection to S3
s3 = boto3.resource(
    "s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key
)

# # Get bucket object
bucket_name = "mi4people-soil-project"
basedir_chips = "chips/s2_metrics_p/"
boto_bucket = s3.Bucket(bucket_name)

 ········
 ········


In [7]:
# to get what is already there

In [8]:
def list_s3_files_using_paginator(bucket_name, prefix, access_key, secret_key):
    """
    This functions list all files in s3 using paginator.
    Paginator is useful when you have 1000s of files in S3.
    S3 list_objects_v2 can list at max 1000 files in one go.
    :return: None
    """
    s3_client = boto3.client(
        "s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key
    )
    paginator = s3_client.get_paginator("list_objects_v2")
    response_iterator = paginator.paginate(
        Bucket=bucket_name,
        Prefix=prefix,
    )
    all_files = []
    for page in response_iterator:
        files = page.get("Contents")
        for file in files:
            all_files.append(file["Key"])
    return all_files

## Process all chips

In [9]:
# %%timeit
class ChipPercentiles:
    def __init__(
        self,
        chip,
        bands=["B02", "B04", "B8A", "B09", "B10", "B11", "B12", "SCL"],
        seasons={
            "season_1": [
                "2018-01-01/2018-03-31",
                "2018-07-01/2018-09-30",
                "2019-01-01/2019-03-31",
                "2019-07-01/2019-09-30",
            ],
            "season_2": [
                "2018-04-01/2018-06-30",
                "2018-10-01/2018-12-31",
                "2019-04-01/2019-06-30",
                "2019-10-01/2019-12-31",
            ],
        },
        quantiles=[0.1, 0.25, 0.5, 0.75, 0.9],
        add_mid50=True,
        add_mid80=True,
        datetimes_all_seasons="2018-01-01/2019-12-31",
        valid_scl_ids=[2, 4, 5, 6],
    ):

        # used in query
        self.chip = chip
        self.bands = bands
        self.datetimes_all_seasons = datetimes_all_seasons

        # used for cloud masking bands based on SCL
        self.valid_scl_ids = valid_scl_ids

        # used to calculate the percentiles
        self.seasons = seasons
        self.quantiles = quantiles
        self.add_mid50 = add_mid50
        self.add_mid80 = add_mid80

        #

        # from get_expected_files
        self.expected_files = None

        # from get_items
        self.items = None
        self.candidate_scenes = None

        # from crate_stack_bands_masked
        self.stack_bands_masked = None

        # from create_quartiles
        self.ds_quartiles_all_seasons = None

    def get_filename_single_layer_tiff(self, season_name, band, quantile):
        try:
            q_name = f"{int(quantile*100):03d}"
        except:
            q_name = quantile
        filename = f"chip-{self.chip.chip_id:05d}_s2_season-{season_name.lower()}_band-{band.lower()}_q-{q_name}.tif"
        return filename

    def get_items(self):
        # get all deduplicated items matching the query parameters
        self.items, self.candidate_scenes = get_items(
            pystac_lient_url="https://planetarycomputer.microsoft.com/api/stac/v1",
            datetimes=self.datetimes_all_seasons,
            chip=self.chip,
        )

    def crate_stack_bands_masked(self):

        # from the items create a xarray DataArray with dims ('time', 'band', 'y', 'x')
        stack = stackstac.stack(
            planetary_computer.sign(pystac.ItemCollection(self.items)),
            assets=self.bands,
            bounds=[self.chip.minx, self.chip.miny, self.chip.maxx, self.chip.maxy],
        )
        # stack containing only the scene classification layer of all scenes
        stack_scl = stack.drop_sel(band="SCL")
        # stack with a mask where all valid pixes are Treu and invalid pixes are False
        stack_valid_pixel_mask = stack.sel(band="SCL").isin(self.valid_scl_ids)
        # stack with only the B* bands
        stack_bands = stack.drop_sel(band="SCL")
        # stack with only the B* bands masked, i.d. with nan where the pixels are invalid

        # we drop ['gsd', 'common_name'] to avoid the ValueError: Variables {'gsd', 'common_name'} are coordinates in some datasets but not others.
        self.stack_bands_masked = (
            (
                stack_bands.drop_vars(["gsd", "common_name"])
                .groupby("band")
                .map(
                    mask_bands_with_valid_pixel_mask,
                    stack_valid_pixel_mask=stack_valid_pixel_mask,
                )
            )
            .compute()
            .chunk({})
        )

    def create_quartiles(self):

        ds_quartiles_all_seasons = []
        for season_name, season_periods in self.seasons.items():

            # time subset the stack for the given season
            # get the times in the stack that fall in any of the periods of the given season
            season_times = filter_times_given_periods(
                self.stack_bands_masked.time.values, season_periods
            )
            # print(f'Time filter for season {season_name} returned {len(season_times)} of {len(stack_bands_masked.time.values)}.')
            # subset the stack
            stack_bands_masked_season = self.stack_bands_masked[
                self.stack_bands_masked.time.isin(season_times)
            ]

            # calculate quantiles
            stack_bands_masked_season_quantiles = stack_bands_masked_season.groupby(
                "band"
            ).map(quantiles_from_masked_band_time_stack, quantiles=self.quantiles)

            # add dispersion measures
            # The IQR may also be called the midspread, middle 50%, fourth spread, or H‑spread.
            # https://en.wikipedia.org/wiki/Interquartile_range
            if self.add_mid50:
                mid50 = stack_bands_masked_season_quantiles.sel(
                    quantile=0.75
                ) - stack_bands_masked_season_quantiles.sel(quantile=0.25)
                stack_bands_masked_season_quantiles = xr.concat(
                    [
                        stack_bands_masked_season_quantiles,
                        mid50.expand_dims(quantile=["mid50"]),
                    ],
                    dim="quantile",
                )
            if self.add_mid80:
                mid80 = stack_bands_masked_season_quantiles.sel(
                    quantile=0.9
                ) - stack_bands_masked_season_quantiles.sel(quantile=0.1)
                stack_bands_masked_season_quantiles = xr.concat(
                    [
                        stack_bands_masked_season_quantiles,
                        mid80.expand_dims(quantile=["mid80"]),
                    ],
                    dim="quantile",
                )

            # add crs and convert to desired output data type
            stack_bands_masked_season_quantiles = (
                stack_bands_masked_season_quantiles.rio.write_crs(
                    rioxarray.crs.CRS.from_epsg(self.chip["epsg"]).to_string(),
                    inplace=False,
                )
                .astype("uint16")
                .to_dataset("band")
                .expand_dims({"time_period": [season_name]})
            )
            ds_quartiles_all_seasons.append(stack_bands_masked_season_quantiles)
        ds_quartiles_all_seasons = xr.concat(
            ds_quartiles_all_seasons, dim="time_period"
        )
        # we need to make sure that the whole
        new_coords = [
            str(f"p{int(q*100):03d}") if isinstance(q, float) else f"p{q}"
            for q in ds_quartiles_all_seasons.coords.get("quantile").values
        ]
        ds_quartiles_all_seasons = ds_quartiles_all_seasons.assign_coords(
            quantile=new_coords
        ).rename({"quantile": "metric"})
        self.ds_quartiles_all_seasons = ds_quartiles_all_seasons

    def load_quartiles_in_memory(self):
        # we need to load result in memory else we get the following during to_netcdf
        # ValueError: invalid engine for creating bytes with to_netcdf: 'netcdf4'. Only the default engine or engine='scipy' is supported
        self.ds_quartiles_all_seasons_in_memory = (
            self.ds_quartiles_all_seasons.compute()
        )

    def upload_quartiles_as_netcdf_to_blob(self, container_client):
        filename = self.get_expected_filename_single_netcdf_file()
        with io.BytesIO() as buffer:
            buffer.write(self.ds_quartiles_all_seasons_in_memory.to_netcdf())
            buffer.seek(0)
            blob_client = container_client.get_blob_client(filename)
            blob_client.upload_blob(buffer, overwrite=True)

    def upload_scenes_csv_to_blob(self, container_client, filename):
        # filename = self.get_expected_filename_scenes_csv_file()
        with io.BytesIO() as buffer:
            self.candidate_scenes.to_csv(buffer)
            buffer.seek(0)
            blob_client = container_client.get_blob_client(filename)
            blob_client.upload_blob(buffer, overwrite=True)

    def upload_quartiles_as_netcdf_to_bucket(self, boto_bucket, filename):
        # filename = self.get_expected_filename_single_netcdf_file()
        with io.BytesIO() as buffer:
            buffer.write(self.ds_quartiles_all_seasons_in_memory.to_netcdf())
            buffer.seek(0)
            boto_bucket.upload_fileobj(buffer, filename)

    def upload_scenes_csv_to_bucket(self, boto_bucket, filename):
        # filename = self.get_expected_filename_scenes_csv_file()
        with io.BytesIO() as buffer:
            self.candidate_scenes.to_csv(buffer)
            buffer.seek(0)
            boto_bucket.upload_fileobj(buffer, filename)

    def write_quartiles_as_netcdf_locally(self, filename):
        # filename = dst_dir + self.get_expected_filename_single_netcdf_file()
        self.ds_quartiles_all_seasons_in_memory.to_netcdf(filename)

    def write_scenes_csv_locally(self, filename):
        # filename = dst_dir + self.get_expected_filename_scenes_csv_file()
        self.candidate_scenes.to_csv(filename)

## Try it out

In [10]:
# # chip_s = chips_to_be_processed.iloc[2]
# # chip_s = chips.iloc[0]

# chip = ChipPercentiles(
#     chip_s,
#     bands=bands,
#     seasons=seasons,
#     quantiles=quantiles,
#     add_mid50=True,
#     add_mid80=True,
#     datetimes_all_seasons=datetimes_all_seasons,
#     valid_scl_ids=valid_scl_ids,
# )

# chip.get_items()
# chip.crate_stack_bands_masked()
# chip.create_quartiles()
# chip.load_quartiles_in_memory()

# # save
# olc_id = chip.chip.olc_id

# # str(Path(...) ...) is safer
# # if we mess up the separaters (zero or two / instead of one)
# # aws will take it serious and create kindo of a nameless directory
# filename_nc = str(Path(basedir_chips) / 'data' / f"{olc_id}.nc")
# filename_s2_scenes = str((Path(basedir_chips) / 'meatadata_s2_scenes' / f"{olc_id}.csv"))

In [11]:
# chip.upload_quartiles_as_netcdf_to_bucket(boto_bucket=boto_bucket, filename=filename_nc)
# chip.upload_scenes_csv_to_bucket(boto_bucket=boto_bucket, filename=filename_s2_scenes)

## Run for all chips

### Get locations that needs to be processed

In [12]:
bucket_name = "mi4people-soil-project"
basedir_chips = "chips/s2_metrics_p"

existing_files_in_dir = list_s3_files_using_paginator(
    bucket_name, basedir_chips, access_key, secret_key
)

df_computed = pd.DataFrame(existing_files_in_dir, columns=["file"])
# remove directories
df_computed = df_computed.query('~file.str.endswith("/")')
df_computed["olc_id"] = df_computed["file"].str.extract(r".*/(.*)\.\w{2,3}")
df_computed
# get all complete (csv and nc) chips
df_computed[["olc_id", "suffix"]] = df_computed["file"].str.extract(
    r".*/(.*)\.(\w{2,3})"
)
assert df_computed["suffix"].isin(["csv", "nc"]).all()
olc_ids_computed_bool = df_computed["olc_id"].value_counts() == 2
olc_ids_computed = olc_ids_computed_bool[olc_ids_computed_bool].index.values

chips_to_be_processed = chips[~chips["olc_id"].isin(olc_ids_computed)]
print(chips.shape)
print(chips_to_be_processed.shape)

# 2022-10-24
# (29927, 15)
# (27994, 15)

# (29927, 15)
# (24141, 15)

# (29927, 15)
# (23579, 15)

# (29927, 15)
# (19805, 15)

# (29927, 15)
# (16249, 15)

# (29927, 15)
# (12757, 15)

# (29927, 15)
# (9478, 15)

# (29927, 15)
# (6078, 15)

# (29927, 15)
# (2525, 15)

# (29927, 15)
# (1069, 15)

(29927, 15)
(1069, 15)


### Remove locations where we did not get any scenes for

In [13]:
chips_no_data = []
with open("s2-chips-generation.log") as logfile:
    for line in logfile.readlines():
        if "No scenes found" in line:
            chips_no_data.append(line.split(" ")[-1].replace("\n", ""))

print(chips_no_data[:5])
len(chips_no_data)

['7CGCM2C4+9R7', '7C4PC228+2M3', '7C4P95MX+8XQ', '7C4P86MM+88P', '7C4PF528+2M3']


1069

In [14]:
print(chips_to_be_processed.shape)
print(chips_to_be_processed.query(f"~olc_id.isin({chips_no_data})").shape)
chips_to_be_processed = chips_to_be_processed.query(f"~olc_id.isin({chips_no_data})")

(1069, 15)
(0, 15)


### Run

In [15]:
import logging

# Create a custom logger

logger = logging.getLogger()
while logger.hasHandlers():
    logger.removeHandler(logger.handlers[0])

# Create handlers
f_handler = logging.FileHandler("s2-chips-generation.log")
f_handler.setLevel(logging.DEBUG)

# Create formatters and add it to handlers
f_format = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
f_handler.setFormatter(f_format)

# Add handlers to the logger
logger.addHandler(f_handler)

# try:
#     raise RuntimeError("Error opening 'https://sentinel2l2a01.blob.core.windows.net/sentinel2-l2/29/P/KQ/2018/01/02/S2B_MSIL2A_20180102T111439_N0212_R137_T29PKQ_20201014T025243.SAFE/GRANULE/L2A_T29PKQ_A004310_20180102T111901/IMG_DATA/R10m/T29PKQ_20180102T111439_B02_10m.tif?st=2022-10-02T06%3A19%3A11Z&se=2022-10-10T06%3A19%3A11Z&sp=rl&sv=2021-06-08&sr=c&skoid=c85c15d6-d1ae-42d4-af60-e2ca0f81359b&sktid=72f988bf-86f1-41af-91ab-2d7cd011db47&skt=2022-10-03T06%3A19%3A10Z&ske=2022-10-10T06%3A19%3A10Z&sks=b&skv=2021-06-08&sig=cPPukdFO//xFki6IB8wwIza6SpYCwqsc5uHnkrgm8rA%3D': RasterioIOError('HTTP response code: 403")
# except RuntimeError as ex:
#     url = ex.args[0].split("'")[1]
#     se = datetime.datetime.strptime(urllib.parse.parse_qs(url)["se"][0], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc)
#     logger.error(f'TEST !!! UTC time of RuntimeError / SE | {datetime.datetime.utcnow()} / {se}', exc_info=True)

In [16]:
import urllib
import datetime

# overwrite = True
# blob_names = [b.name for b in container_client.list_blobs()]

scenes_not_found = []
for chip_index, chip_s in tqdm(
    chips_to_be_processed.iterrows(), total=chips_to_be_processed.shape[0]
):
    try:
        chip = ChipPercentiles(
            chip_s,
            bands=bands,
            seasons=seasons,
            quantiles=quantiles,
            add_mid50=True,
            add_mid80=True,
            datetimes_all_seasons=datetimes_all_seasons,
            valid_scl_ids=valid_scl_ids,
        )
        olc_id = chip.chip.olc_id

        chip.get_items()
        if chip.candidate_scenes is None:
            # print(f'No scenes found for {chip.chip.olc_id}')
            scenes_not_found.append(olc_id)
            logger.warning(f"No scenes found for {chip.chip.olc_id}")
            continue
        chip.crate_stack_bands_masked()
        chip.create_quartiles()
        chip.load_quartiles_in_memory()

        # str(Path(...) ...) is safer
        # if we mess up the separaters (zero or two / instead of one)
        # aws will take it serious and create kindo of a nameless directory
        filename_nc = str(Path(basedir_chips) / "data" / f"{olc_id}.nc")
        filename_s2_scenes = str(
            (Path(basedir_chips) / "meatadata_s2_scenes" / f"{olc_id}.csv")
        )

        chip.upload_quartiles_as_netcdf_to_bucket(
            boto_bucket=boto_bucket, filename=filename_nc
        )
        chip.upload_scenes_csv_to_bucket(
            boto_bucket=boto_bucket, filename=filename_s2_scenes
        )
        logger.info(f"Chip processed {chip.chip.olc_id}")

    except RuntimeError:
        url = ex.args[0].split("'")[1]
        se = datetime.datetime.strptime(
            urllib.parse.parse_qs(url)["se"][0], "%Y-%m-%dT%H:%M:%SZ"
        ).replace(tzinfo=datetime.timezone.utc)
        logger.error(
            f"UTC time of RuntimeError / SE | {datetime.datetime.utcnow()} / {se}",
            exc_info=True,
        )

    except Exception as ex:
        logger.error(f"Exception ocurred", exc_info=True)

#   0%|          | 5/28051 [02:21<220:30:28, 28.30s/it]
# 2022-10-23 14:43:39,675 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client

0it [00:00, ?it/s]
