# Acquiring and pre-processing Sentinel 2 imagery samples using Google Earth Engine

## Authored by Ben Gaskill

Google Drive Output Data Link: https://drive.google.com/drive/u/0/folders/1poQVjxeLIgITe0vYxI51rtVrJH9nQrVP

In [None]:
from google.colab import drive

# Mount Google Drive inside the Colab runtime so that files stored in Drive
# can be read by the cells further down.
DRIVE_MOUNT_POINT = "/content/gdrive"
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
pip install xee

In [None]:
pip install rasterio

In [None]:
import geemap
import ee
import xee
import rasterio
import os
import numpy as np
import matplotlib.pyplot as plt
import json
import geopandas as gpd

In [None]:
# To replicate this project, replace this with a valid
# Google Earth Engine cloud project
cloud_project = 'bengaskill12-remote-sensing'

# Initialize Earth Engine. If that fails (typically because no credentials
# are stored yet), run the interactive authentication flow once and retry.
# `except Exception` is used instead of a bare `except:` so that
# KeyboardInterrupt and SystemExit are not swallowed and turned into an
# authentication prompt.
try:
    ee.Initialize(project=cloud_project)
except Exception:
    ee.Authenticate()
    ee.Initialize(project=cloud_project)

In [None]:
# The bounding boxes can be loaded by any user using the shared assets.
# Load bounding boxes of sample sites that I defined and exported in GEE
mixed_urban = ee.FeatureCollection("projects/bengaskill12-remote-sensing/assets/mixedUrban")
rural = ee.FeatureCollection("projects/bengaskill12-remote-sensing/assets/rural")

# Interactive map used to preview the footprints and, later, the samples
Map = geemap.Map()
Map.set_center(25.0, -2.0, 4)

# I used these Sentinel 2 footprints to select study areas with full coverage.
sentinel2_tiles = ee.FeatureCollection("users/wiell/SepalResources/sentinel2SceneAreas")
Map.addLayer(sentinel2_tiles.style(color='yellow', fillColor='00000000'), {},
             'Sentinel‑2 MGRS Tiles')

# Selecting dates in 2024. These strings are passed to filterDate, which
# treats the end date as exclusive, so the range has to end on 2025-01-01
# for acquisitions from 2024-12-31 to remain eligible.
start_date = '2024-01-01'
end_date = '2025-01-01'

# get geometries and combine: one list holding every polygon from the
# mixed-urban sites followed by every polygon from the rural sites
mixed_urban_polygons = mixed_urban.geometry().geometries()
rural_polygons = rural.geometry().geometries()
all_bounding_boxes = mixed_urban_polygons.cat(rural_polygons)

def mask_clouds(image):
    """
    Mask cloud-affected pixels using the Scene Classification Layer (SCL).

    Pixels whose SCL value is 3 (cloud shadow), 8 (medium probability cloud),
    9 (high probability cloud) or 10 (thin cirrus) are masked out.

    Arguments:
            image (EE.Image) -- Input Sentinel 2 image that includes the
                                'SCL' band.
    Returns:
            The input image with the cloud mask applied and the
            'system:time_start' property copied over from the input.
    """
    scl = image.select('SCL')

    # Start with "not cloud shadow", then AND in each remaining cloudy class.
    clear = scl.neq(3)
    for cloudy_class in (8, 9, 10):
        clear = clear.And(scl.neq(cloudy_class))

    masked = image.updateMask(clear)
    return masked.copyProperties(image, ['system:time_start'])


def has_full_coverage(image, geometry):
    """
    Test whether an image has valid (unmasked) data across a bounding box.

    The mask of band B4 is reduced with a minimum over the geometry at a
    10 m scale. Any masked B4 pixel that takes part in the reduction pulls
    the minimum below 1, so the image is then reported as not fully covering
    the box.

    Arguments:
            image (EE.Image) -- Input Sentinel 2 imagery (must contain 'B4').
            geometry (EE.Geometry) -- Input bounding box geometry.
    Returns:
            (EE.Number) -- 1 when the minimum B4 mask value over the geometry
                           equals 1, otherwise 0.
    """
    # One band is enough to test coverage
    band_mask = image.select('B4').mask()

    stats = band_mask.reduceRegion(
        reducer=ee.Reducer.min(),
        geometry=geometry,
        scale=10,
        maxPixels=1e9,
    )
    lowest_mask_value = ee.Number(stats.get('B4'))

    # A minimum of exactly 1 means no masked pixel was found
    return lowest_mask_value.eq(1)

In [None]:
# Merge both site collections and pull their combined geometry to the client
# as a GeoJSON-style dictionary
merged_sites = mixed_urban.merge(rural)
bboxes = merged_sites.geometry().getInfo()

# Write the geometry dictionary to disk as GeoJSON
geojson_path = 'bboxes.geojson'
with open(geojson_path, 'w') as f:
    f.write(json.dumps(bboxes))

# Read the GeoJSON back with geopandas and save it as a shapefile
gdf = gpd.read_file(geojson_path)
gdf.to_file("bboxes.shp")

In [None]:
# Main Processing Block
# For every sample-site polygon: build a cloud-filtered, cloud-masked
# Sentinel 2 collection, keep the images that fully cover the site, take up
# to 8 of them, add each one to the map and collect it in all_samples.
"""
The export process could take a few hours to run, only uncomment the export
block if you need to download all 48 images (approximately 1 GB each)
"""

# list to hold sample sites
all_samples = []

# Loop through bounding boxes, select bands, filter dates, mask clouds
# (.getInfo() is a blocking request that fetches the list size to the client)
for i in range(all_bounding_boxes.size().getInfo()):
    geometry = ee.Geometry(all_bounding_boxes.get(i))
    # NOTE(review): bounding_box is not referenced anywhere else in this cell
    bounding_box = geometry.bounds()

    collection = (
        ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED')
        # keep only scenes that intersect this site
        .filterBounds(geometry)
        .select(['B1', 'B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'B8', 'B9',
                 'B11', 'B12', 'SCL'])
        # filterDate treats the end date as exclusive
        .filterDate(start_date, end_date)
        # scene-level cloud filter: keep scenes with less than 5% cloudy pixels
        .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 5))
        # pixel-level cloud mask based on the SCL band
        .map(mask_clouds)
        # ascending acquisition time, so the earliest images come first
        .sort('system:time_start')
    )

    # check for full coverage in each image
    def set_full_coverage(img):
        """Tag img with 'hasFullCoverage' (1 or 0) for the current site geometry."""
        return img.set('hasFullCoverage', has_full_coverage(img, geometry))

    # filter results that only have full coverage
    # (mask_clouds has already run, so cloud-masked B4 pixels inside the site
    # also make an image fail this test)
    filtered = collection.map(set_full_coverage).filter(ee.Filter.eq('hasFullCoverage', 1))
    # at most the first 8 images in the sorted order
    revisit_list = filtered.toList(8) # Worldstrat accepts a stack of 8 inputs

    # Loop through filtered list, clip, convert to int16 (solved an error),
    # add date, display on map
    for j in range(revisit_list.size().getInfo()):
        image = ee.Image(revisit_list.get(j)).clip(geometry)
        image = image.toInt16()
        # acquisition date fetched to the client; used only in the layer name
        date = ee.Date(image.get('system:time_start')).format('YYYY-MM-dd').getInfo()
        # true-colour preview (B4, B3, B2) stretched between 0 and 3000
        Map.addLayer(image, {'bands': ['B4', 'B3', 'B2'], 'min': 0, 'max': 3000}, f'Site {i} - {date}')
        all_samples.append(image)

        # # https://geemap.org/notebooks/11_export_image/#download-an-eeimage
        # # Only uncomment if you need to export to your Google Drive. Replace
        # # the folder with a valid folder in your Drive.
        # geemap.ee_export_image_to_drive(
        #   image=image,
        #   folder='SuperResolution12RV2',
        #   fileNamePrefix=f'Site_{i}_Image_{j}',
        #   region=geometry,
        #   scale=10,
        #   crs='EPSG:4326',
        #   maxPixels=1e13,
        #   fileFormat='GeoTIFF')

# Number of images gathered across all sites
print(f'Total images collected: {len(all_samples)}')

In [None]:
# Centre the map on all sample sites (both collections merged) and render it
study_sites = mixed_urban.merge(rural)
Map.centerObject(study_sites, zoom=7)
Map

In [None]:
# Print metadata for each image, confirming the files exist in the Drive folder

# Replace this with the same valid folder from the export code
# NOTE: this path is relative, so it only resolves when the working directory
# is the parent of the `gdrive` mount folder (Drive is mounted at
# /content/gdrive).
src_dir = "gdrive/MyDrive/SuperResolution12RV2"

# get files that end with .tif
# os.listdir returns entries in arbitrary order, so sort the names to make
# the report order stable between runs
files = sorted(f for f in os.listdir(src_dir) if f.endswith('.tif'))

# Open each file and print info
for file in files:
    file_path = os.path.join(src_dir, file)
    print(f"Opening file: {file_path}")
    # The context manager closes the dataset as soon as its info is printed
    with rasterio.open(file_path) as src:
        print(f"File {file} opened successfully.")
        print(f"Metadata: {src.meta}")
        print(f"Bands: {src.count}")
        print(f"Shape: {src.shape}")

In [None]:
# Plot the bands of a single image to test
image_path = "gdrive/MyDrive/SuperResolution12RV2/Site_2_Image_2.tif"

with rasterio.open(image_path) as src:
    # All bands at once; the first axis indexes the bands
    data = src.read()
    # Per-band descriptions stored in the file (None for bands without one)
    band_names = src.descriptions

# Plot each band
num_bands = data.shape[0]
# squeeze=False keeps `axes` two-dimensional even for a single-band file;
# without it plt.subplots returns a bare Axes object that cannot be indexed
fig, axes = plt.subplots(1, num_bands, figsize=(15, 5), squeeze=False)

for i in range(num_bands):
    ax = axes[0, i]
    ax.imshow(data[i], cmap='gray')
    # Prefer the band name stored in the file: the position in the stack is
    # not necessarily the Sentinel 2 band number (presumably positions 10-12
    # hold B11, B12 and SCL for the stacks selected in this notebook).
    # Fall back to the positional label when no description is stored.
    ax.set_title(band_names[i] or f'Band {i+1}')
    ax.axis('off')

plt.tight_layout()
plt.show()

In [None]:
def read_rgb(path):
    """
    Read raster bands 4, 3 and 2 (in that order) and stack them for display.

    For the Sentinel 2 stacks used in this notebook these positions are
    presumably B4, B3 and B2, i.e. red, green and blue.

    Arguments:
            path (str) -- Path of the raster file to open with rasterio.
    Returns:
            (np.ndarray) -- float32 array with the three bands stacked along
                            a new last axis.
    """
    with rasterio.open(path) as dataset:
        channels = [dataset.read(band_index) for band_index in (4, 3, 2)]
    return np.stack(channels, axis=-1).astype(np.float32)

image_folder = "gdrive/MyDrive/SuperResolution12RV2"

# One row of six panels, one per site
fig, axes = plt.subplots(1, 6, figsize=(48, 10))

for site, ax in enumerate(axes):
    # Image 0 of each site is used for the overview
    path = os.path.join(image_folder, f"Site_{site}_Image_0.tif")
    rgb = read_rgb(path)

    # Stretch every channel independently between its 2nd and 98th
    # percentiles; the small epsilon avoids dividing by zero on flat channels
    for c in range(rgb.shape[-1]):
        channel = rgb[..., c]
        p2, p98 = np.percentile(channel, (2, 98))
        rgb[..., c] = np.clip((channel - p2) / (p98 - p2 + 1e-5), 0, 1)

    ax.imshow(rgb)
    ax.set_title(f"Site {site}")
    ax.axis("off")

plt.suptitle("Selected sites in Zambia for SuperResolution", fontsize=64)
plt.tight_layout()
plt.show()
