# **1) Import the Modules**

Modules are code libraries that contain a set of ready-to-use functions.

* The `ee` module allows developers to interact with Google Earth Engine using the Python programming language.
* The `os` module provides functions to perform tasks such as file and directory operations,
process management, and environment variable manipulation.
* The `json` module allows developers to load, read and write JSON files.
* The `yaml` module allows developers to load, read and write YAML files.
* The `geemap` module allows interactive analysis and visualization of GEE datasets in a Jupyter environment.
* The `matplotlib.pyplot` module provides a collection of functions for creating and customizing plots, diagrams and visualizations.
* The `plotly` module enables the creation of interactive and visually appealing visualizations for data analysis and presentation.
* The `google.colab` module provides access to some of the unique features and functionality of Google Colab.

In [None]:
!pip install -U kaleido
!pip install geemap

In [None]:
import ee
import os
import json
import yaml
import geemap

import matplotlib.pyplot as plt
import plotly.graph_objects as go

from google.colab import drive

# **2) Authentication Procedure**

This section provides instructions for setting up the Google Earth Engine Python API on Colab and for setting up Google Drive on Colab. These steps should be performed each time you start/restart/rollback a Colab session.

## **2.1) GEE**

The `ee.Authenticate` function authenticates access to the Google Earth Engine servers, while the `ee.Initialize` function initializes it. After executing the following cell, the user is prompted to grant Google Earth Engine access to their Google account.

**Note:** The Earth Engine API is installed by default in Google Colaboratory.

In [None]:
ee.Authenticate()
ee.Initialize(project="...")

To authorize access needed by Earth Engine, open the following URL in a web browser and follow the instructions. If the web browser does not start automatically, please manually browse the URL below.

    https://code.earthengine.google.com/client-auth?scopes=https%3A//www.googleapis.com/auth/earthengine%20https%3A//www.googleapis.com/auth/devstorage.full_control&request_id=MxamWnhoWks1bAfhvghvMq2Jh2oUssCs9Geys-ow0to&tc=qrVMJs_4YPuU6M52X1BiraM6PxOyDzRAtwJ3EW_1uDk&cc=6SbOKJwemAycXkIqCDF90yQMAxd1Kn9x_xyXy4RbcAY

The authorization workflow will generate a code, which you should paste in the box below.
Enter verification code: 4/1AfJohXlYztYRH-chu83tBEukmVSWFxwnQcA_aA9PBDUvdRiixAtzylTvNwk

Successfully saved authorization token.


## **2.2) GD**

The `drive.mount` function allows access to specific folders of Google Drive. Granting access to Google Drive allows code running in the notebook to modify files in Google Drive.

**Note:** When using the `Mount Drive` button in the file browser, no authentication codes are required for notebooks edited only by the current user.

In [None]:
drive.mount("/content/gdrive")

Mounted at /content/gdrive


# **3) Functions**

In [None]:
def calculate_date_distance(image):
  """
  Description:
    Finds the time difference between a GEE raster's timestamp and a specified event date.

  Arguments:
    image (ee.Image): An GEE image with a timestamp property "system:time_start".

  Returns:
    Returns the GEE image with a "date_distance" property denoting the time difference
    in milliseconds from the image's timestamp to the event date.
  """
  dateDistance = image.getNumber("system:time_start").subtract(eventDate.millis())
  return image.set("date_distance", dateDistance)


def process_object(obj, propertyName):
  """
  Description:
    Processes a GEE dictionary and extracts information based on the provided property name.

  Arguments:
    obj (ee.Dictionary): A GEE dictionary.
    propertyName (str): The property name to extract.

  Returns:
    A GEE list containing two elements:
      1. A string representation of the property's value.
      2. The result of floor division between the "sum" property and the provided property's value.
  """
  obj = ee.Dictionary(obj)
  label = obj.getNumber(propertyName).format()
  sum_value = obj.getNumber("sum").divide(metricNumber).floor()
  return ee.List([label, sum_value])

# **4) Parameters**

In [None]:
# Metric units
metricUnit = "hectares"
metricNumber = 1e4

# Case of interest.
caseCode = "emsr692"
caseArea = "magnesia"

# Projection of interest
projectionCRS = "EPSG:4326"
projectionScale = 10

# `Dynamic World LULC v1`
dwStartDate = "2023-01-01"
dwEndDate = "2023-06-01"

# `Sentinel-1 GRD`
b1 = "VV"
b2 = "VH"

rasterIdentifiers = {
  "pre_event": "S1A_IW_GRDH_1SDV_20220103T162359_20220103T162424_041299_04E8BD_E9C5",
  "post_event": "S1A_IW_GRDH_1SDV_20230907T162412_20230907T162437_050224_060B99_D80F"
}

# `Classification`
floodClassLabel = 1
classified = "..."

# GD paths
configFile = "..."

destinationPath = "..."

# **5) Configuration**

In [None]:
# `GHS-POP R2023A - GHS population grid multitemporal`
ghslConfig = {
  "name": "JRC/GHSL/P2023A/GHS_POP"
}

populationVisualization = {
  "min": 0.0,
  "max": 20.0,
  "palette": ["060606", "337663", "337663", "ffffff"]
}

# `FAO Global Second-Level Administrative Units`
prefecturesConfig = {
  "name": "FAO/GAUL_SIMPLIFIED_500m/2015/level2"
}

# `Dynamic World LULC v1`
dwConfig = {
  "name": "GOOGLE/DYNAMICWORLD/V1",
  "resolution": 10
}

dwValueLabels = {
  "0": "water",
  "1": "trees",
  "2": "grass",
  "3": "flooded_vegetation",
  "4": "crops",
  "5": "shrub_and_scrub",
  "6": "built",
  "7": "bare",
  "8": "snow_and_ice",
}

dwLabelsPalette = {
  "water": "419BDF",
  "trees": "397D49",
  "grass": "88B053",
  "flooded vegetation": "7A87C6",
  "crops": "E49635",
  "shrub and scrub": "DFC35A",
  "built": "C4281B",
  "bare": "A59B8F",
  "snow and ice": "B39FE1"
}

dwVisualization = {
  "min": 0,
  "max": 8,
  "bands": ["label"],
  "palette": list(dwLabelsPalette.values())
}

# `Sentinel-1 GRD`
s1Config = {
  "name": "COPERNICUS/S1_GRD",
  "mode": "IW",
  "resolution": 10,
  "ascendingPass": "ASCENDING",
  "descendingPass": "DESCENDING"
}

s1Visualization = {
  "min": [-25, -25, 0],
  "max": [0, 0, 2],
  "bands": ["VV", "VH", "VVVHQ"]
}

bandCombinations = {
  "sum": {
    "expression": "b1 + b2",
    "name": "b1b2S"
  },
  "difference": {
    "expression": "b1 - b2",
    "name": "b1b2D"
  },
  "product": {
    "expression": "b1 * b2",
    "name": "b1b2P"
  },
  "quotient": {
    "expression": "b1 / b2",
    "name": "b1b2Q"
  },
  "ndpi": {
    "expression": "(b1 - b2) / (b1 + b2)",
    "name": "NDPI"
  }
}

# `Classification`
classPalette = {
  "non water": "deb887",
  "flood": "C60404",
  "water": "45b6fe"
}

classVisualization = {
  "min": 0,
  "max": 2,
  "palette": list(classPalette.values())
}

In [None]:
# Parse the approprieate configuration file.
extension = os.path.splitext(configFile)[-1]

try:
  with open(configFile, "r") as stream:
    # JSON configuration file
    if extension == ".json":
      caseConfigurations = json.load(stream)

    # YAML configuration file
    if extension == ".yaml":
      caseConfigurations = yaml.safe_load(stream)

except FileNotFoundError as e:
  print(f"Error: JSON file not found: {e}")

# Retrieve configurations.
caseConfig = caseConfigurations[caseCode][caseArea]

# Retrieve GEE assets.
areaOfInterest = ee.FeatureCollection(caseConfig["area_of_interest"])
ring = ee.Geometry.LinearRing(areaOfInterest.geometry().coordinates().flatten())

# **6) Data Processing**

Define the projection of interest.

In [None]:
projection = ee.Projection(projectionCRS).atScale(projectionScale)

Engineer additional raster bands.

In [None]:
quotientExpression = bandCombinations["quotient"]["expression"]
quotientName = bandCombinations["quotient"]["name"]

Load, filter and process raster collections.

In [None]:
# `Sentinel-1 GRD`.
classified = ee.Image(classified)

preEventRaster = ee.Image("/".join([s1Config["name"], rasterIdentifiers["pre_event"]]))   \
  .clipToCollection(areaOfInterest)
postEventRaster = ee.Image("/".join([s1Config["name"], rasterIdentifiers["pre_event"]]))  \
  .clipToCollection(areaOfInterest)

# `FAO Global Second-Level Administrative Units`
prefectures = ee.FeatureCollection(prefecturesConfig["name"]).filterBounds(areaOfInterest)

# `Dynamic World Land Cover V1`
dwMosaic = ee.ImageCollection(dwConfig["name"])   \
  .filterDate(dwStartDate, dwEndDate)             \
  .filterBounds(areaOfInterest)                   \
  .select("label")                                \
  .mode()                                         \
  .clipToCollection(areaOfInterest)               \
  .reproject(projection)

# `GHS-POP R2023A - GHS population grid multitemporal`
eventDate = ee.Date(caseConfig["event_date"])

ghslPopulation = ee.ImageCollection(ghslConfig["name"])   \
  .map(calculate_date_distance)                           \
  .sort("date_distance")                                  \
  .first()                                                \
  .clipToCollection(areaOfInterest)

Engineer additional raster bands.

In [None]:
# `PRE_VVVHQ`
preEventQuotient = ee.Image().expression(**{
    "expression": quotientExpression,
    "opt_map": {
      "b1": preEventRaster.select(b1),
      "b2": preEventRaster.select(b2)
    }
  })  \
  .rename(quotientName.replace("b1", b1).replace("b2", b2))

# `POST_VVVHQ`
postEventQuotient = ee.Image().expression(**{
    "expression": quotientExpression,
    "opt_map": {
      "b1": postEventRaster.select(b1),
      "b2": postEventRaster.select(b2)
    }
  })  \
  .rename(quotientName.replace("b1", b1).replace("b2", b2))

# Incorporate extra bands into both pre- & post- event rasters.
preEventRaster = preEventRaster.addBands(preEventQuotient)
postEventRaster = postEventRaster.addBands(postEventQuotient)

Perform area calculations.

In [None]:
floodMask = classified.eq(floodClassLabel)
eventExtent = classified.updateMask(floodMask)

# Calculate total area.
emsCaseArea = areaOfInterest.geometry().area()
emsCaseArea = ee.Number(emsCaseArea).divide(metricNumber).floor()

# Calculate area covered by each class.
areasImage = ee.Image.pixelArea().addBands(classified)

classifiedAreas = areasImage.reduceRegion(**{
  "reducer": ee.Reducer.sum().group(**{
    "groupField": 1,
    "groupName": "classification",
  }),
  "geometry": areaOfInterest,
  "scale": 10,
  "maxPixels": 1e13
})

classifiedAreas = ee.List(classifiedAreas.get("groups"))  \
  .map(lambda obj: process_object(obj, "classification"))

classifiedResult = ee.Dictionary(classifiedAreas.flatten())

Perform damage assessment analysis.

In [None]:
# Exposed human settlements.

# Update the layer's mask.
ghslProjection = ghslPopulation.projection()
ghslPopulation = ghslPopulation.updateMask(eventExtent.reproject(ghslProjection))

# Get the number of exposed settlements.
ghslPopulationCount = ghslPopulation.reduceRegion(**{
  "reducer": ee.Reducer.sum(),
  "geometry": areaOfInterest,
  "scale": ghslProjection.nominalScale(),
  "maxPixels":1e13
})                              \
.getNumber("population_count")  \
.floor()

# `Dynamic World Land Cover V1`

floodedDW = dwMosaic.updateMask(eventExtent)
dwPixelArea = ee.Image.pixelArea().addBands(floodedDW)

dwLabelAreas = dwPixelArea.reduceRegion(**{
  "reducer": ee.Reducer.sum().group(**{
    "groupField": 1,
    "groupName": "label",
  }),
  "geometry": areaOfInterest.geometry(),
  "crs": projectionCRS,
  "scale": projectionScale,
  "maxPixels": 1e13
})

dwLabelAreas = ee.List(dwLabelAreas.get("groups"))  \
  .map(lambda obj: process_object(obj, "label"))

dwResult = ee.Dictionary(dwLabelAreas.flatten())

# **6) Console**

In [None]:
print(f"Total extent: `{emsCaseArea.getInfo()}`")
print(f"Extent by class: `{classifiedResult.getInfo()}`")
print(f"* Area extents are measured in `{metricUnit}`.\n")

print(f"Total affected population: `{ghslPopulationCount.getInfo()}`")

Total extent: `826969`
Extent by class: `{'0': 373858, '1': 25088, '2': 13362}`
* Area extents are measured in `hectares`.

Total affected population: `1062`


# **6) Data Visualization**

In [None]:
destinationPath = os.path.join(destinationPath, caseCode, caseArea, "damages")

Percentage of exposed areas

In [None]:
dwLabelCounts = dwResult.getInfo()

dwDescriptionCounts = {dwValueLabels.get(key, 0).replace("_", " "): value for key, value in dwLabelCounts.items()}
dwDescriptionCounts = {key: value for key, value in dwDescriptionCounts.items() if value != 0}

labels = list(dwDescriptionCounts.keys())
values = list(dwDescriptionCounts.values())
colors = [dwLabelsPalette[key] for key in dwDescriptionCounts]

In [None]:
# Create a Pie chart.
fig = go.Figure(data=[go.Pie(
  labels=labels,
  values=values,
  hole=0.3,
  textinfo="percent",
  insidetextorientation="radial",
  marker=dict(colors=colors),
  textposition="inside",
  textfont=dict(color="#FFFFFF")
)])

# Customize the layout.
fig.update_layout(
  title="Land Use/Land Cover Distribution",
  title_x=0.5,
  font=dict(size=16),
  title_yanchor="top",
  title_font_size=20,
  legend_title_text="Classes",
  uniformtext_minsize=12,
  width=700,
  height=700,
  uniformtext_mode="hide"
)

# Export the chart as a static image.
destination = os.path.join(destinationPath, "lulc_distribution.png")
fig.write_image(destination, scale=6)
print(f"Stored donut chart to: `{destination}`.")

Stored donut chart to: `/content/gdrive/MyDrive/t-h-e-s-i-s/results/emsr692/magnesia/damages/lulc_distribution.png`.


Affected settlements

In [None]:
affectedPrefectures = ghslPopulation.reduceRegions(**{
  "collection": prefectures,
  "reducer": ee.Reducer.sum(),
  "scale": ghslProjection.nominalScale(),
})

affectedPrefectureNames = affectedPrefectures.aggregate_array("ADM2_NAME").getInfo()
affectedPrefectureStats = affectedPrefectures.aggregate_array("sum").getInfo()

In [None]:
# Set up the figure and axis.
fig, ax = plt.subplots(figsize=(10, 7))

# Set the color inside the plot behind the bars.
ax.set_facecolor("lightgray")

# Create the bar plot.
plt.bar(affectedPrefectureNames, affectedPrefectureStats)

# Set plot title.
plt.title("Affected Prefectures", fontsize=16)
# Set x-axis label.
plt.xlabel("Prefectures", fontsize=12)
# Set y-axis label.
plt.ylabel("Stats", fontsize=12)

# Rotate x-axis labels for better readability.
plt.xticks(rotation=45, ha="center", fontsize=12)

# Add grid lines
plt.grid(color="white", linestyle="--", linewidth=0.5)

# Add numeric values on top of the bars
for i, v in enumerate(affectedPrefectureStats):
  plt.text(i, v, f"{v:.3f}", ha="center", va="bottom", fontsize=10)

# Export the plot as a raster.
destination = os.path.join(destinationPath, "affected_prefectures.png")
plt.savefig(destination, dpi=500, bbox_inches="tight")
plt.close()

# **8) Map Visualization**

In [None]:
Map = geemap.Map()
Map.centerObject(ring)

# `Dynamic-World`
Map.add_legend(title="DW Labels", position="bottomleft", legend_dict = dwLabelsPalette)

# `Classification`
Map.add_legend(title="RF Labels", position="bottomright", legend_dict = classPalette)

Map.addLayer(dwMosaic, dwVisualization, "rasters: DW");
Map.addLayer(preEventRaster, s1Visualization, "rasters: S1-GRD (pre)");
Map.addLayer(postEventRaster, s1Visualization, "rasters: S1-GRD (post)");
Map.addLayer(ghslPopulation, populationVisualization, "rasters: GSHL");

Map.addLayer(classified, classVisualization, "rasters: classification");
Map.addLayer(dwMosaic.updateMask(floodMask), dwVisualization, "rasters: DW (masked)");
Map.addLayer(eventExtent, {"color": "black"}, "rasters: (flood-extent)");

Map.addLayer(ring, {"color": "white"}, "vectors: area of interest");

Map

Map(center=[20, 0], controls=(WidgetControl(options=['position', 'transparent_bg'], widget=HBox(children=(Togg…

-End of Notebook-