In [None]:
# pip install matplotlib python-dotenv sentinelhub geopandas IProgress tifffile numpy tqdm

In [None]:
import os
import geopandas as gpd
from dotenv import load_dotenv
from sentinelhub import SHConfig, SentinelHubCatalog, BBox, CRS, DataCollection
from sentinelhub import MimeType, SentinelHubDownloadClient, SentinelHubRequest, bbox_to_dimensions, filter_times
import tqdm as notebook_tqdm
import datetime as dt
import matplotlib.pyplot as plt
import numpy as np
import tifffile

# Load environment variables from a local .env file (if one exists)
load_dotenv()

# Configure Sentinel Hub for the Copernicus Data Space Ecosystem (CDSE) endpoints.
config = SHConfig()
# Only override the credentials when the environment actually provides them, so that
# values SHConfig() already loaded are not replaced by None.
config.sh_client_id = os.getenv("SENTINELHUB_CLIENT_ID") or config.sh_client_id
config.sh_client_secret = os.getenv("SENTINELHUB_CLIENT_SECRET") or config.sh_client_secret
config.sh_base_url = 'https://sh.dataspace.copernicus.eu'
config.sh_token_url = 'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token'

# Fail early with an actionable message instead of an opaque authentication error later.
if not config.sh_client_id or not config.sh_client_secret:
    raise RuntimeError(
        "Sentinel Hub credentials are missing: set SENTINELHUB_CLIENT_ID and "
        "SENTINELHUB_CLIENT_SECRET in the environment or in a .env file."
    )

# Initialize the catalog
catalog = SentinelHubCatalog(config=config)

# Define the bounding box (min lon, min lat, max lon, max lat) in WGS84
# bbox = BBox(bbox=[115.24026, -8.52927, 115.28474, -8.48453], crs=CRS.WGS84)
bbox = BBox(bbox=[115.1716, -8.5968, 115.3534, -8.4170], crs=CRS.WGS84)

# Define the time range for the search
time_interval = ("2010-01-01", "2026-12-31")

# Perform a search within the bounding box and time range
search_iterator = catalog.search(
    DataCollection.SENTINEL2_L2A,
    bbox=bbox,
    time=time_interval,
    filter="eo:cloud_cover < 5",  # Optional filter for cloud cover
    fields={"include": ["id", "properties.datetime", "properties.eo:cloud_cover", "geometry"], "exclude": []},
)

# Convert the results to a list of features
features = list(search_iterator)

# Create a GeoDataFrame from the features
results_gdf = gpd.GeoDataFrame.from_features(features)

# Display the number of hits and a preview of the results (not the whole table)
print("Total number of results:", len(features))
results_gdf.head()


In [None]:
# Collapse catalog hits that belong to the same overpass: timestamps closer
# together than `time_difference` are treated as a single acquisition.
time_difference = dt.timedelta(minutes=60)

all_timestamps = search_iterator.get_timestamps()
unique_acquisitions = filter_times(all_timestamps, time_difference)

# Show the de-duplicated acquisition timestamps
unique_acquisitions

In [None]:
# Define the directory to save the images
save_dir = "../data/raw/sentinel2"
os.makedirs(save_dir, exist_ok=True)

# Evalscript settings: 12 spectral bands + SCL, returned as digital numbers in float32
bands_num = 13
bands_units = "DN"
sampleType = "float32"

# Output resolution in metres per pixel, applied to every band of the request.
# Native Sentinel-2 resolutions (coarser bands are resampled when a finer value is requested):
#   10 m: B02, B03, B04, B08
#   20 m: B05, B06, B07, B8A, B11, B12, SCL
#   60 m: B01, B09
# SCL is the Scene Classification Layer that provides information about the scene such as clouds, water, etc.
# 100 m keeps the downloads small; lower the value (e.g. 10 or 20) for more detail.
resolution = 100

all_bands_evalscript = f"""
//VERSION=3

function setup() {{
    return {{
        input: [{{
            bands: ["B01", "B02", "B03", "B04", "B05", "B06", "B07", "B08", "B8A", "B09", "B11", "B12", "SCL"],
            units: "{bands_units}"
        }}],
        output: {{
            bands: {bands_num},
            sampleType: "{sampleType}"
        }}
    }};
}}

function evaluatePixel(sample) {{
    return [
        sample.B01, sample.B02, sample.B03, sample.B04, sample.B05, sample.B06,
        sample.B07, sample.B08, sample.B8A, sample.B09, sample.B11, sample.B12,
        sample.SCL
    ];
}}
"""

# These values are identical for every request, so compute them once outside the loop.
s2l2a_collection = DataCollection.SENTINEL2_L2A.define_from("s2l2a", service_url=config.sh_base_url)
image_size = bbox_to_dimensions(bbox, resolution=resolution)

# Store requests to download
# Note: The requests are not executed yet, they are just stored in a list
process_requests = []

# Build one request per unique acquisition, skipping those already saved at this resolution
for timestamp in unique_acquisitions:
    # Create a unique filename for each timestamp and resolution
    filename = os.path.join(save_dir, f"image_{timestamp.isoformat()}_res{resolution}.tiff")

    # Check if the file already exists
    if not os.path.exists(filename):
        request = SentinelHubRequest(
            evalscript=all_bands_evalscript,
            input_data=[
                SentinelHubRequest.input_data(
                    data_collection=s2l2a_collection,
                    # Window around the acquisition so only that overpass is used
                    time_interval=(timestamp - time_difference, timestamp + time_difference),
                )
            ],
            responses=[SentinelHubRequest.output_response("default", MimeType.TIFF)],  # Use TIFF for multi-band data
            bbox=bbox,
            size=image_size,
            config=config,
        )
        process_requests.append(request)
    else:
        print(f"File {filename} already exists. Skipping download.")

print("process_requests:", len(process_requests))

In [None]:
%%time

# Download the images
client = SentinelHubDownloadClient(config=config)

download_requests = [request.download_list[0] for request in process_requests]

# Download the images
data = client.download(download_requests)

# Example: Check dtype of each image in the downloaded data list
for i, array in enumerate(data):
    print(f"Image {i}: shape={array.shape}, dtype={array.dtype}")

data

In [None]:
# Define a basic scaling factor for visualization only
pixel_scaling = 3

# `data` only contains images for acquisitions that were actually requested: acquisitions
# whose TIFF already existed were skipped when the requests were built. Pair each image
# with the timestamp it was requested for instead of zipping against every acquisition,
# which would save images under the wrong timestamp.
if len(data) == len(unique_acquisitions):
    downloaded_timestamps = list(unique_acquisitions)
else:
    downloaded_timestamps = [
        ts
        for ts in unique_acquisitions
        if not os.path.exists(os.path.join(save_dir, f"image_{ts.isoformat()}_res{resolution}.tiff"))
    ]
    if len(downloaded_timestamps) != len(data):
        raise RuntimeError(
            f"Cannot match {len(data)} downloaded image(s) to their acquisition timestamps "
            f"({len(downloaded_timestamps)} acquisition(s) have no saved file). "
            "Re-run the request-building and download cells before saving."
        )

# Display and save every downloaded image
for idx, (image, timestamp) in enumerate(zip(data, downloaded_timestamps)):
    # The filename is built from isoformat(), so the timestamp must be a date/datetime
    if isinstance(timestamp, dt.date):
        filename = os.path.join(save_dir, f"image_{timestamp.isoformat()}_res{resolution}.tiff")
    else:
        print(f"Invalid timestamp at index {idx}. Skipping image.")
        continue

    # ---------- Visualization Only ----------
    # Show true color image if possible (needs channels 1-3, i.e. at least 4 bands)
    if image.ndim == 3 and image.shape[2] >= 4:
        rgb_indices = [3, 2, 1]  # B04, B03, B02 (R, G, B)
        # Work on a float32 copy of the RGB channels so the saved data stays untouched
        image_for_display = image[:, :, rgb_indices].astype(np.float32)

        # Percentile-based normalization to avoid cloud blow-out. The percentiles are
        # taken from the RGB channels only so that unrelated channels (e.g. the SCL
        # class codes in the last channel) do not distort the stretch.
        p2, p98 = np.percentile(image_for_display, (2, 98))
        image_for_display = np.clip(image_for_display, p2, p98)
        image_for_display = (image_for_display - p2) / (p98 - p2 + 1e-6)  # safe division

        # Apply the brightness gain and keep values inside the [0, 1] range imshow expects
        plt.imshow(np.clip(image_for_display * pixel_scaling, 0, 1))
        plt.title("True Color (B04, B03, B02)")
        plt.axis("off")
        plt.show()
    else:
        print(f"Warning: Image at index {idx} has fewer than 4 bands. Skipping display.")

    # ---------- Save to TIFF ----------
    try:
        # Store band-first (bands, height, width) with the raw DN values
        image_for_save = np.transpose(image, (2, 0, 1))
        tifffile.imwrite(filename, image_for_save)
        print(f"Saved: {filename}")
    except Exception as e:
        # Best effort: report the failure and continue with the remaining images
        print(f"Error saving image at index {idx}: {e}")

# Print shape of first image
if data:
    print(f"Shape of the first image: {data[0].shape}")
else:
    print("No data available.")


In [None]:
# Channel layout of the spectral bands in each downloaded image (channel 0 is B01, ...)
band_order = [
    "B01", "B02", "B03", "B04", "B05", "B06",
    "B07", "B08", "B8A", "B09", "B11", "B12",
]

# Map each channel index to its band name
numbered_band_order = dict(enumerate(band_order))

def bands(band_names, band_dict, image=None):
    """
    Select the requested bands from an image, in the order they are requested.

    Parameters:
    - band_names: List of lists of band names to use (e.g., [["B04", "B03", "B02"]]).
      The nested lists are flattened in order.
    - band_dict: Dictionary mapping band (channel) indices to band names.
    - image: Optional array indexed as (height, width, bands). Defaults to the first
      downloaded image, `data[0]`.

    Returns:
    - Sliced image data containing only the specified bands (last axis follows the
      requested order).

    Raises:
    - KeyError: If a requested band name is not present in `band_dict`.
    """
    # Invert the mapping once (name -> index); the first index wins if a name repeats
    index_by_name = {}
    for index, name in band_dict.items():
        index_by_name.setdefault(name, index)

    requested = [band for band_list in band_names for band in band_list]

    # Fail loudly instead of silently returning an image with fewer channels
    missing = [band for band in requested if band not in index_by_name]
    if missing:
        raise KeyError(f"Unknown band name(s): {missing}")

    band_indices = [index_by_name[band] for band in requested]

    # Fall back to the first downloaded image when no image is supplied
    if image is None:
        image = data[0]

    # Slice the image data to include only the specified bands
    sliced_image = image[:, :, band_indices]

    return sliced_image

# Example usage
# `data[0]` is the first downloaded image, indexed as (height, width, bands); the
# evalscript returns 13 channels: the 12 spectral bands followed by SCL.


def dn_to_display(dn_image, gain=2.5, dn_scale=10000.0):
    """
    Convert a digital-number (DN) image to the [0, 1] range expected by imshow.

    The images are requested in DN units, which for Sentinel-2 on Sentinel Hub
    correspond to reflectance * 10000, so they are divided by `dn_scale` rather
    than by 255 (which would saturate every pixel to white). `gain` brightens
    the result before clipping.
    """
    return np.clip(dn_image / dn_scale * gain, 0, 1)


# Select the RGB bands
rgb_image = bands([["B04", "B03", "B02"]], numbered_band_order)
rgb_image_normalized = dn_to_display(rgb_image)

# Select the False Color (Vegetation) bands
vegetation_image = bands([["B08", "B04", "B03"]], numbered_band_order)
vegetation_image_normalized = dn_to_display(vegetation_image)

# Select the False Color (Urban) bands
urban_image = bands([["B12", "B11", "B04"]], numbered_band_order)
urban_image_normalized = dn_to_display(urban_image)

# Select the Water Detection bands
water_image = bands([["B08", "B11", "B02"]], numbered_band_order)
water_image_normalized = dn_to_display(water_image)

# Visualize the images in a 2x2 grid, one composite per panel
fig, axes = plt.subplots(2, 2, figsize=(10, 10))

panels = [
    (rgb_image_normalized, "RGB Image"),
    (vegetation_image_normalized, "False Color (Vegetation)"),
    (urban_image_normalized, "False Color (Urban)"),
    (water_image_normalized, "Water Detection"),
]
for ax, (composite, title) in zip(axes.flat, panels):
    ax.imshow(composite)
    ax.set_title(title)
    ax.axis('off')

plt.tight_layout()
plt.show()


In [None]:
# Plot a true-color thumbnail of every downloaded image
ncols = 5
rgb_indices = [3, 2, 1]  # B04, B03, B02 (R, G, B)
n_images = len(data)

if n_images == 0:
    print("No images to plot.")
else:
    # Size the grid to the number of images instead of assuming at most 25
    nrows = (n_images + ncols - 1) // ncols

    fig, axis = plt.subplots(
        ncols=ncols,
        nrows=nrows,
        figsize=(15, 2 * nrows),
        squeeze=False,  # always a 2-D array of axes, even for a single row
        subplot_kw={"xticks": [], "yticks": [], "frame_on": False},
    )

    # Dates are only trustworthy when every acquisition was downloaded; otherwise the
    # images cannot be matched to timestamps by position.
    titles_match = n_images == len(unique_acquisitions)
    if not titles_match:
        print("Warning: number of images differs from number of acquisitions; labelling by index.")

    for idx, image in enumerate(data):
        ax = axis[idx // ncols][idx % ncols]
        # imshow needs 3 channels; the images hold DN values (reflectance * 10000),
        # so scale by 10000 (not 255) before applying the 2.5 brightness gain.
        ax.imshow(np.clip(image[:, :, rgb_indices] / 10000 * 2.5, 0, 1))
        if titles_match:
            title = unique_acquisitions[idx].date().isoformat()
        else:
            title = f"image {idx}"
        ax.set_title(title, fontsize=10)

    plt.tight_layout()
    plt.show()