# Sentinel-2 Image Visualization

Goal of the lecture:
1. Programmatically download a tile from the open and free [scihub.copernicus.eu](https://scihub.copernicus.eu/dhus/#/home) page
1. Extract indivdual bands and visualize them
1. Create an RGB image of three bands; one true-color and one false-color composite
1. Select a sub-region of the tile by defining a region of interest
1. Visualize the spectral signature of different land-use/land-cover classes

## Downloading the tile

Take a look at [scihub.copernicus.eu](https://scihub.copernicus.eu/dhus/#/home) to understand what we are tryig to accomplish with code!

To download a specific satellite tile, we have to:
1. Sign up for the service
1. Select a region of interest
1. Query Copernicus to select tiles from target satellite with relevant parameters



### Import Python libraries

If the libraries aren't available on paperspace, make the `ip4rs` IPython Kernel is selected in the top-right corner!

If it is not, change the kernel!

![](https://doc.cocalc.com/_images/jupyter-classic-change-kernel.png)

#### Notes for local installation
If you are on your local machine and the Python kernel suddenly dies, it probably means that you have installed incompatible binaries, please try install the dependencies by following the guide from [ip4rs-dependencies](https://github.com/kai-tub/ip4rs-dependencies).

In [14]:
import numpy as np
import geopandas
from pathlib import Path
import osmnx


from sentinelsat import SentinelAPI
from datetime import date
from shapely.geometry import Point
import rasterio
import zipfile

from enum import Enum
from typing import List, Sequence
import matplotlib.pyplot as plt

import rasterio.mask

### Set up credentials

In [15]:
# config set via DynaConf
# mainly for us to keep our secrets secret ;)
from dynaconf import Dynaconf

settings = Dynaconf(
    envvar_prefix="IP4RS",
    settings_files=["settings.toml", ".secrets.toml"],
)
###

SENTINEL_API_URL = "https://scihub.copernicus.eu/dhus"
YOUR_USERNAME = ""
YOUR_PWD = ""
user = settings.get("scihub_copernicus_user", default=YOUR_USERNAME)
pwd = settings.get("scihub_copernicus_pwd", default=YOUR_PWD)

assert user != ""
assert pwd != ""

Let's select a point of interest (POI).
For this lab we will use the TU Berlin building.

The following code requires the Latitude and Longitude coordinates of the POI.
For now, we can simply rely on a service like [LatLong.net](latlong.net) to look up the coordinates:

- [TU Berlin coordinates via LatLong.net](https://www.latlong.net/place/technical-university-of-berlin-germany-248.html)

In [16]:
latitude = 52.512230
longitude = 13.327135

In [17]:
# geopandas.GeoSeries?

In [18]:
series = geopandas.GeoSeries([Point(longitude, latitude)], crs="EPSG:4326")
series

0    POINT (13.32714 52.51223)
dtype: geometry

In [19]:
# GeoPandas allows us to very quickly visually inspect our geographical data
series.explore(marker_type="marker")

In [7]:
api_sent = SentinelAPI(user, pwd, SENTINEL_API_URL)
# api_sent.query?

To query the copernicus database, we need to encode our geometry data into a common format that the service can understand.
Looking at the `SentinelAPI` documentation, we can see that the API expect the data to be encoded as a WKT string.

WKT stands for well-known text and is a simple text-based format to describe geometries. You can think of it as an [JSON](https://de.wikipedia.org/wiki/JavaScript_Object_Notation) alternative.

In [8]:
series.to_wkt()

0    POINT (13.327135 52.51223)
dtype: object

In [9]:
poi = series.to_wkt()[0]
poi

'POINT (13.327135 52.51223)'

In [10]:
start_date = date(year=2022, month=3, day=1)
end_date = date(year=2022, month=4, day=30)
satellite = "Sentinel-2"
producttype = "S2MSI2A"

products = api_sent.query(
    poi,
    date=(start_date, end_date),
    platformname=satellite,
    producttype=producttype,
    cloudcoverpercentage=(0, 10),
)

# convert to GeoPandas GeoDataFrame
products_gdf = api_sent.to_geodataframe(products)
assert not products_gdf.empty
products_gdf.head(1)
# If you get an internal-service error, simply try again

Unnamed: 0,title,link,link_alternative,link_icon,summary,ondemand,generationdate,beginposition,endposition,ingestiondate,...,producttype,platformidentifier,orbitdirection,platformserialidentifier,processinglevel,datastripidentifier,granuleidentifier,identifier,uuid,geometry
58902f9e-3457-4d73-ad99-2989059941ab,S2B_MSIL2A_20220324T100649_N0400_R022_T33UUU_2...,https://scihub.copernicus.eu/dhus/odata/v1/Pro...,https://scihub.copernicus.eu/dhus/odata/v1/Pro...,https://scihub.copernicus.eu/dhus/odata/v1/Pro...,"Date: 2022-03-24T10:06:49.024Z, Instrument: MS...",False,2022-03-24 13:23:05,2022-03-24 10:06:49.024,2022-03-24 10:06:49.024,2022-03-24 16:46:42.329,...,S2MSI2A,2017-013A,DESCENDING,Sentinel-2B,Level-2A,S2B_OPER_MSI_L2A_DS_VGS2_20220324T132305_S2022...,S2B_OPER_MSI_L2A_TL_VGS2_20220324T132305_A0263...,S2B_MSIL2A_20220324T100649_N0400_R022_T33UUU_2...,58902f9e-3457-4d73-ad99-2989059941ab,"MULTIPOLYGON (((12.35312 52.23128, 13.67854 52..."


In [11]:
# sort and select the first row
product = products_gdf.sort_values(
    ["cloudcoverpercentage", "ingestiondate"], ascending=[True, True]
).head(1)
product

Unnamed: 0,title,link,link_alternative,link_icon,summary,ondemand,generationdate,beginposition,endposition,ingestiondate,...,producttype,platformidentifier,orbitdirection,platformserialidentifier,processinglevel,datastripidentifier,granuleidentifier,identifier,uuid,geometry
6e5ad1c9-fc0e-4b93-9b5c-d0d401673206,S2A_MSIL2A_20220309T100841_N0400_R022_T32UQD_2...,https://scihub.copernicus.eu/dhus/odata/v1/Pro...,https://scihub.copernicus.eu/dhus/odata/v1/Pro...,https://scihub.copernicus.eu/dhus/odata/v1/Pro...,"Date: 2022-03-09T10:08:41.024Z, Instrument: MS...",False,2022-03-09 12:18:49,2022-03-09 10:08:41.024,2022-03-09 10:08:41.024,2022-03-09 17:48:53.772,...,S2MSI2A,2015-028A,DESCENDING,Sentinel-2A,Level-2A,S2A_OPER_MSI_L2A_DS_VGS4_20220309T121849_S2022...,S2A_OPER_MSI_L2A_TL_VGS4_20220309T121849_A0350...,S2A_MSIL2A_20220309T100841_N0400_R022_T32UQD_2...,6e5ad1c9-fc0e-4b93-9b5c-d0d401673206,"MULTIPOLYGON (((13.53103 52.17548, 13.63418 53..."


In [12]:
# ensuring that the tile is the one we are also providing via a direct
# download link
assert product["title"][0] == "S2A_MSIL2A_20220309T100841_N0400_R022_T32UQD_20220309T121849"

In [13]:
# Would fail, because there are entries that cannot be JSON encoded!
# product.explore()
# Only visualize the relevant columns and convert them if necessary
product[["title", "summary", "geometry"]].explore()

### Download the tile

In [29]:
product

Unnamed: 0,title,link,link_alternative,link_icon,summary,ondemand,generationdate,beginposition,endposition,ingestiondate,...,producttype,platformidentifier,orbitdirection,platformserialidentifier,processinglevel,datastripidentifier,granuleidentifier,identifier,uuid,geometry
6e5ad1c9-fc0e-4b93-9b5c-d0d401673206,S2A_MSIL2A_20220309T100841_N0400_R022_T32UQD_2...,https://scihub.copernicus.eu/dhus/odata/v1/Pro...,https://scihub.copernicus.eu/dhus/odata/v1/Pro...,https://scihub.copernicus.eu/dhus/odata/v1/Pro...,"Date: 2022-03-09T10:08:41.024Z, Instrument: MS...",False,2022-03-09 12:18:49,2022-03-09 10:08:41.024,2022-03-09 10:08:41.024,2022-03-09 17:48:53.772,...,S2MSI2A,2015-028A,DESCENDING,Sentinel-2A,Level-2A,S2A_OPER_MSI_L2A_DS_VGS4_20220309T121849_S2022...,S2A_OPER_MSI_L2A_TL_VGS4_20220309T121849_A0350...,S2A_MSIL2A_20220309T100841_N0400_R022_T32UQD_2...,6e5ad1c9-fc0e-4b93-9b5c-d0d401673206,"MULTIPOLYGON (((13.53103 52.17548, 13.63418 53..."


In [20]:
download_dir = Path("./data")
download_dir.mkdir(exist_ok=True)

# ~800MB large!
# May take a while to download
# The download is very slow from their server

# output_file = download_dir / f"{product.title[0]}.zip"
# if not output_file.exists():
#     api_sent.download_all(product.index, directory_path=download_dir)

In [22]:
# local convenience function
# from downloader import download
from simple_downloader import download

TUB_URL = "https://tubcloud.tu-berlin.de/s/QSi8fHG9ffG5FKE/download/S2A_MSIL2A_20220309T100841_N0400_R022_T32UQD_20220309T121849.zip"
output_file = download(TUB_URL, "./data/")

Downloading S2A_MSIL2A_20220309T100841_N0400_R022_T32UQD_20220309T121849.zip:   0%|          | 0/817626551 [00…

### Extract zipfile

In [None]:
zipf = zipfile.ZipFile(output_file)
zipf.extractall(path="data")

The data inside of the Zip-file is packaged into a "root" directory with the name of the original zip file + the `.SAFE` extension.
The data we are interested in lives inside a subdirectory called `IMG_DATA`.
The image data is encoded as a [jpeg2000](https://de.wikipedia.org/wiki/JPEG_2000) file with the extension `jp2`.

We can combine all of the information to build a [glob](https://en.wikipedia.org/wiki/Glob_(programming)) expression to quickly build a list of the files we are interested in.

Note: The directory structure has been changed very recently, depending on the selected dates, the relevant images live either directly under the `IMG_DATA` directory, or they live under a sub-directory `IMG_DATA/R{10,20,60}m/`.

In [None]:
unzipped_dir = output_file.with_suffix(".SAFE")
assert unzipped_dir.exists()

## Working with the tile data
### Accessing the data array

In [None]:
class Band(str, Enum):
    """A collection of different Sentinel-2 band names."""

    B01 = "B01"
    B02 = "B02"
    B03 = "B03"
    B04 = "B04"
    B05 = "B05"
    B06 = "B06"
    B07 = "B07"
    B08 = "B08"
    B8A = "B8A"
    B09 = "B09"
    B10 = "B10"
    B11 = "B11"
    B12 = "B12"

    def __str__(self):
        return self.value


def read_s2_jp2_data(jp2_data_path: Path) -> np.ndarray:
    """
    Read band from Sentinel-2 jp2 file.
    """
    with rasterio.open(jp2_data_path) as data:
        # rasterio is 1-indexed
        return data.read(1)


def _get_all_jp2_files(source_dir: Path, parent_dir: str = "IMG_DATA/R60m") -> List[Path]:
    """
    Given a Sentinel-2 source directory, find all jp2 files that have
    a parent folder named `parent_dir`.
    Usually, it should be the folder `IMG_DATA`, other possible source
    would be the quality masks in `QI_DATA`.
    To not load band multiple times at different resolutions, by default
    the lowest 60m band is loaded.

    Note: Depending on the acquisition date and data type, the structure might be different
    and no sub-directory within `IMG_DATA` exists!
    """
    image_files = list(source_dir.glob(f"**/{parent_dir}/*.jp2"))
    assert len(image_files) > 0
    return image_files


class S2_TileReader:
    def __init__(self, safe_directory: Path, img_data_parent_dir: str = "IMG_DATA/R60m"):
        self.image_files = _get_all_jp2_files(safe_directory, parent_dir=img_data_parent_dir)

    def _get_band_path(self, band: Band) -> Path:
        return [f for f in self.image_files if f"_{band}_" in f.name][0]

    def read_band_data(self, band: Band) -> np.ndarray:
        band_path = self._get_band_path(band)
        return read_s2_jp2_data(band_path)


s2_reader = S2_TileReader(unzipped_dir)
band03_data = s2_reader.read_band_data(Band.B03)

### Visualizing tiles
#### Visualizing individual channels

Sentinel-2 satellite data is encoded in an uncommon format `uint16` and requires special care, when trying to visualize.
The minimum and maximum values are given by physical properties of the sensor and are not normalize to fill the entire data range.

In [None]:
band03_data.min(), band03_data.max(), band03_data.shape, band03_data.dtype

In [None]:
plt.figure(figsize=(15, 15))
plt.imshow(band03_data, cmap="gray")

In [None]:
def quant_norm_data(
    data: np.ndarray, lower_quant: float = 0.01, upper_quant: float = 0.99
) -> np.ndarray:
    """
    Normalize the data by quantiles `lower_quant/upper_quant`.
    The quantiles are calculated globally/*across all channels*.
    """
    masked_data = np.ma.masked_equal(data, 0)
    lq, uq = np.quantile(masked_data.compressed(), (lower_quant, upper_quant))
    data = np.clip(data, a_min=lq, a_max=uq)
    data = (data - lq) / (uq - lq)
    return data


def vis(data: np.ndarray, quant_norm: bool = True):
    """
    Visualize an array by calling `imshow` with `cmap="gray"`.
    By default, the image is normalized through `quant_norm_data`.
    """
    if quant_norm:
        data = quant_norm_data(data)

    plt.figure(figsize=(15, 15))
    plt.axis("off")
    plt.imshow(data, cmap="gray")


vis(band03_data)

In [None]:
# vis(band03_data)
sub_band03_data = band03_data[-600:, -600:]
vis(sub_band03_data)

#### Visualizing multiple bands

In [None]:
rgb_arr = np.stack(
    [s2_reader.read_band_data(b) for b in (Band.B04, Band.B03, Band.B02)],
    axis=-1,
)
rgb_arr.shape

In [None]:
vis(rgb_arr)

In [None]:
sub_rgb_arr = rgb_arr[-600:, -600:]
vis(sub_rgb_arr)

In [None]:
# store each channel individually with a key to
# make it easier to work with it later
np.savez(
    "data/sub_rgb_arr",
    red=sub_rgb_arr[:, :, 0],
    green=sub_rgb_arr[:, :, 1],
    blue=sub_rgb_arr[:, :, 2],
)
np.savez("data/rgb_arr", red=rgb_arr[:, :, 0], green=rgb_arr[:, :, 1], blue=rgb_arr[:, :, 2])

#### Exercise

Create a False Color Composite by visualizing the bands `B8A`, `B04`, `B03`.

### Selecting regions of tiles

Use the `osmnx` library to retrieve the area of interest by looking up a name-identifier.

In [None]:
berlin_gdf = osmnx.geocode_to_gdf("Berlin")
berlin_gdf

In [None]:
berlin_gdf.explore()

In [None]:
def read_s2_jp2_data_with_clipping(
    band_data_path: Path, clip_geoseries: geopandas.GeoSeries, envelope: bool = True
) -> np.ndarray:
    with rasterio.open(band_data_path) as data:
        # ensure that the data is using the same coordinate reference system
        reprojected_geoseries = clip_geoseries.to_crs(data.crs)
        reprojected_geoseries = (
            reprojected_geoseries.envelope if envelope else reprojected_geoseries
        )
        out_img, _out_transform = rasterio.mask.mask(data, reprojected_geoseries, crop=True)
        # drop singleton axes
        out_img = out_img.squeeze()
    return out_img

In [None]:
class S2_TileReader:
    def __init__(self, safe_directory: Path, img_data_parent_dir: str = "IMG_DATA/R60m"):
        self.image_files = _get_all_jp2_files(safe_directory, parent_dir=img_data_parent_dir)

    def _get_band_path(self, band: Band) -> Path:
        return [f for f in self.image_files if f"_{band}_" in f.name][0]

    def read_band_data(self, band: Band) -> np.ndarray:
        band_path = self._get_band_path(band)
        return read_s2_jp2_data(band_path)

    def read_band_data_with_clipping(
        self, band: Band, clip_geoseries: geopandas.GeoSeries, envelope: bool = True
    ) -> np.ndarray:
        band_path = self._get_band_path(band)
        return read_s2_jp2_data_with_clipping(band_path, clip_geoseries, envelope=envelope)


# re-initialize
s2_reader = S2_TileReader(unzipped_dir)
clipped_band03_data = s2_reader.read_band_data_with_clipping(
    Band.B03, berlin_gdf.geometry, envelope=True
)

In [None]:
# berlin_gdf.envelope.explore()

In [None]:
vis(clipped_band03_data)

In [None]:
clipped_rgb_arr = np.stack(
    [s2_reader.read_band_data_with_clipping(b, berlin_gdf.geometry) for b in ("B04", "B03", "B02")],
    axis=-1,
)

In [None]:
vis(clipped_rgb_arr)

In [None]:
vis(clipped_rgb_arr)

## Inspecting spectral signature

In [None]:
# LCLU: Land-Cover Land-Use
lclu_gdf = geopandas.GeoDataFrame(
    {"type": ["water", "airport", "forest"]},
    geometry=[
        Point(13.175955, 52.456009),
        Point(13.508517, 52.380236),
        Point(13.212926, 52.478834),
    ],
    crs="epsg:4326",
)
lclu_gdf.reset_index().explore(marker_type="marker")

In [None]:
# depends on product type and used image directory!
AVAILABLE_BANDS = (
    Band.B01,
    Band.B02,
    Band.B03,
    Band.B04,
    Band.B05,
    Band.B06,
    Band.B07,
    Band.B8A,
    Band.B09,
    Band.B11,
    Band.B12,
)


def read_points_from_tile(
    s2_reader: S2_TileReader,
    points_series: geopandas.GeoSeries,
    bands: Sequence[Band] = AVAILABLE_BANDS,
) -> np.ndarray:
    if set(lclu_gdf.geom_type) != {"Point"}:
        raise ValueError("Only point geometries are allowed!")

    return np.array([s2_reader.read_band_data_with_clipping(b, points_series) for b in bands])

In [None]:
lclu_gdf

In [None]:
water_spectral_sig = read_points_from_tile(s2_reader, lclu_gdf.query("type == 'water'"))
airport_spectral_sig = read_points_from_tile(
    s2_reader, lclu_gdf.query("type == 'airport'").geometry
)
forest_spectral_sig = read_points_from_tile(s2_reader, lclu_gdf.query("type == 'forest'").geometry)

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(AVAILABLE_BANDS, forest_spectral_sig, "o-", label="forest", linewidth=2)
plt.plot(AVAILABLE_BANDS, airport_spectral_sig, "o-", label="airport", linewidth=2)
plt.plot(AVAILABLE_BANDS, water_spectral_sig, "o-", label="water", linewidth=2)

plt.legend(fontsize=14)
plt.grid()

### In-course Practice:

Select 3 different point (pixels) and plot their spectral signitures