In [None]:
#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the "License")

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# 🛑 Links (DELETE)

- [Brijesh's notebook](https://colab.research.google.com/drive/1bA6z93JB-g1U2Mz6-YAdqpmoqsZBITdv?resourcekey=0-66KLAWuQZTLgNewtVIoLvA&usp=sharing)

- [COGs](https://colab.sandbox.google.com/drive/1-5Nzn1_2Kea7COm-myrlmprtvY8H40qO?resourcekey=0-tnzED-IAVthJsl9t0bixtQ&usp=sharing)

# 🌦️ Weather forecasting

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/GoogleCloudPlatform/python-docs-samples/blob/main/people-and-planet-ai/weather-forecasting/README.ipynb)

This notebook leverages geospatial satellite and precipitation data from [Google Earth Engine](https://earthengine.google.com/). Using satellite imagery, you'll build and train a model for rain "nowcasting" i.e. predicting the amount of rainfall for a given geospatial region and time in the immediate future.

* ⏲️ **Time estimate**: 1 hour
* 💰 **Cost estimate**: less than $1.00 USD

💚 This is one of many **machine learning how-to samples** inspired from **real climate solutions** aired on the [People and Planet AI 🎥 series](https://www.youtube.com/playlist?list=PLIivdWyY5sqI-llB35Dcb187ZG155Rs_7).

## 📒 Using this interactive notebook

Click the **run** icons ▶️ of each section within this notebook.

![Run cell](images/run-cell.png)

> 💡 Alternatively, you can run the currently selected cell with `Ctrl + Enter` (or `⌘ + Enter` in a Mac).

This **notebook code lets you train and deploy an ML model** from end-to-end. When you run a code cell, the code runs in the notebook's runtime, so you're not making any changes to your personal computer.

> ⚠️ **To avoid any errors**, wait for each section to finish in their order before clicking the next “run” icon.

This sample must be connected to a **Google Cloud project**, but nothing else is needed other than your Google Cloud project.

You can use an _existing project_. Alternatively, you can create a new Cloud project [with cloud credits for free.](https://cloud.google.com/free/docs/gcp-free-tier)

## 🛸 Steps summary

This notebook is friendly for _beginner_, _intermediate_, and _advanced_ users of geospatial, data analytics and machine learning.

Here's a quick summary of what you’ll go through:

1. **📚 Understand the data**:
  Go through what we want to achieve and explore the data we want to use as _inputs and outputs_ for our model.

1. **🗄 Create the datasets** _(~20 minutes, costs [a few cents](https://cloud.google.com/dataflow/pricing))_:
  Learn to use
  [Apache Beam](https://beam.apache.org/)
  to fetch data from
  [Earth Engine](https://earthengine.google.com/)
  in parallel, and create a dataset for our model in
  [Dataflow](https://cloud.google.com/dataflow).

1. **🧠 Train the model** _(~10 minutes, costs [a few cents](https://cloud.google.com/vertex-ai/pricing#custom-trained_models))_:
  Build a simple _Fully Convolutional Network_ in
  [PyTorch](https://pytorch.org/)
  and train it in
  [Vertex AI](https://cloud.google.com/vertex-ai/docs/training/custom-training)
  with the dataset we created.

1. **🔮 Model predictions**:
  Get predictions of data the model has never seen before.

  * 💻 **Local predictions** _(~5 minutes)_: Get predictions directly in this notebook without hosting the model.

  * ☁️ **Cloud Run predictions** _(~10 minutes, [cost covered by free tier](https://cloud.google.com/run/pricing))_: Host the model in [Cloud Run](https://cloud.google.com/run), an easy to use and scalable serverless web service.

1. (Optional) **🛠 Delete the project** to avoid ongoing costs.

# 🎬 Before you begin

We first need to install all the requirements for the sample residing in GitHub.

The sample contains a
[`requirements.txt`](requirements.txt)
file with all the dependencies we need to install, so let's use that.

In [1]:
repo_url = "https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/main/people-and-planet-ai/land-cover-classification"
!wget --quiet {repo_url}/requirements.txt

> 💡 For more information about the `requirements.txt` files, see the [`pip` user guide](https://pip.pypa.io/en/stable/user_guide/).

In [None]:
# Update pip to use the latest version.
!pip install --quiet --upgrade pip

In [None]:
# Install the dependencies.
!pip install --quiet -r requirements.txt

# Restart the runtime by ending the process.
exit()

> **🛑 Restart the runtime 🛑**

Colab already comes with many dependencies pre-loaded.
In order to ensure everything runs as expected, we **_must_ restart the runtime**. This allows Colab to load the latest versions of the libraries.

!["Runtime" > "Restart runtime"](images/restart-runtime.png)

# ☁️ My Google Cloud resources

First, choose the Google Cloud _location_ where you want to run this sample.
A good place to start is by choosing your [Google Cloud location](https://cloud.google.com/compute/docs/regions-zones).

> ⚠️ Make sure you choose a location
> available for all products: [Cloud Storage](https://cloud.google.com/storage/docs/locations),
> [Vertex AI](https://cloud.google.com/vertex-ai/docs/general/locations),
> [Dataflow](https://cloud.google.com/dataflow/docs/resources/locations), and
> [Cloud Run](https://cloud.google.com/run/docs/locations).

> 💡 Prefer locations that are geographically closer to you with
> [low carbon emissions](https://cloud.google.com/sustainability/region-carbon), highlighted with the
> ![Leaf](https://cloud.google.com/sustainability/region-carbon/gleaf.svg) icon.

Make sure you have followed these steps to configure your Google Cloud project:

1. Enable the APIs: _Dataflow, Earth Engine, Vertex AI, and Cloud Run_

  <button>

  [Click here to enable the APIs](https://console.cloud.google.com/flows/enableapi?apiid=dataflow.googleapis.com,earthengine.googleapis.com,aiplatform.googleapis.com,run.googleapis.com)
  </button>

1. Create a Cloud Storage bucket in your desired _location_.

  <button>

  [Click here to create a new Cloud Storage bucket](https://console.cloud.google.com/storage/create-bucket)
  </button>

1. Register your
  [Compute Engine default service account](https://console.cloud.google.com/iam-admin/iam)
  on Earth Engine.

  <button>

  [Click here to register your service account on Earth Engine](https://signup.earthengine.google.com/#!/service_accounts)
  </button>

Once you have everything ready, you can go ahead and fill in your Google Cloud resources in the following code cell.
Make sure you run it!

In [1]:
#@title 🛑 Set up resources (DELETE)
from __future__ import annotations

import os
from google.colab import auth

project = "dcavazos-lyra"
bucket = "dcavazos-lyra"
location = "us-central1"

auth.authenticate_user()
os.environ['GOOGLE_CLOUD_PROJECT'] = project
!gcloud config set project {project}

%cd /content
!rm -rf python-docs-samples

# TODO: REPLACE WITH URL FROM UPSTREAM MAIN
# !git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
!git clone -b ppai-weather-forecasting https://github.com/davidcavazos/python-docs-samples.git
%cd python-docs-samples/people-and-planet-ai/weather-forecasting

import importlib
import create_dataset
import trainer
import trainer.model
import serving
import serving.data
import visualize
importlib.reload(create_dataset)
importlib.reload(trainer)
importlib.reload(trainer.model)
importlib.reload(serving)
importlib.reload(serving.data)
importlib.reload(visualize)
None

Updated property [core/project].
/content
Cloning into 'python-docs-samples'...
remote: Enumerating objects: 85284, done.[K
remote: Counting objects: 100% (22/22), done.[K
remote: Compressing objects: 100% (15/15), done.[K
remote: Total 85284 (delta 7), reused 14 (delta 5), pack-reused 85262[K
Receiving objects: 100% (85284/85284), 140.95 MiB | 25.16 MiB/s, done.
Resolving deltas: 100% (50351/50351), done.
Checking out files: 100% (3603/3603), done.
/content/python-docs-samples/people-and-planet-ai/weather-forecasting


In [None]:
from __future__ import annotations

import os
from google.colab import auth

# Please fill in these values.
project = "" #@param {type:"string"}
bucket = "" #@param {type:"string"}
location = "us-central1" #@param {type:"string"}

# Quick input validations.
assert project, "⚠️ Please provide a Google Cloud project ID"
assert bucket, "⚠️ Please provide a Cloud Storage bucket name"
assert not bucket.startswith('gs://'), f"⚠️ Please remove the gs:// prefix from the bucket name: {bucket}"
assert location, "⚠️ Please provide a Google Cloud location"

# Authenticate to Colab.
auth.authenticate_user()

# Set GOOGLE_CLOUD_PROJECT for google.auth.default().
os.environ['GOOGLE_CLOUD_PROJECT'] = project

# Set the gcloud project for other gcloud commands.
!gcloud config set project {project}

In [None]:
# Now let's get the code from GitHub and navigate to the sample.
!git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
%cd python-docs-samples/people-and-planet-ai/land-cover-classification

Next, we have to authenticate Earth Engine and initialize it.
Since we've already authenticated to this [Colab](https://www.youtube.com/watch?v=rNgswRZ2C1Y) and saved them as the [Google default credentials](https://google-auth.readthedocs.io/en/master/reference/google.auth.html#google.auth.default),
we can reuse those credentials for Earth Engine.

> 💡 Since we're making **large amounts of automated requests to Earth Engine**, we want to use the
[high-volume endpoint](https://developers.google.com/earth-engine/cloud/highvolume).

In [2]:
import ee
import google.auth

credentials, _ = google.auth.default()
ee.Initialize(
    credentials.with_quota_project(None),
    project=project,
    opt_url="https://earthengine-highvolume.googleapis.com",
)

# 📚 Understand the data

The goal of our model is using satellite images to do _weather forecasting_.
Specifically, we want to predict the amount of rainfall, measured in millimeters per hour, for the next two to six hours in the future.
This kind of short term forecasting is called [weather _nowcasting_](https://en.wikipedia.org/wiki/Nowcasting_(meteorology)).

When working with satellite data, each image has the shape `(width, height, bands)`.
**Bands** contain _numeric values_ for each pixel in the image, like the measurements from specific satellite instruments for different ranges of the electromagnetic spectrum, or the probabilities of different classifications.
If you're familiar with image classification problems, you can think of the bands as similar to an image's RGB channels.

## ☔️ Precipitation

We use [NASA's Global Precipitation Measurement (GPM)](https://developers.google.com/earth-engine/datasets/catalog/NASA_GPM_L3_IMERG_V06) to get the amount of _precipitation_ of rain and snow, measured as millimeters per hour.
We're interested in the `precipitationCal` band, which gives us the _calibrated_ precipitation amount.

This is what we want to predict, so we'll use them for our _labels_.
But it's also useful for the model to look at the precipitation from the _past_, so we'll also use it as _inputs_.

In the [`serving/data.py`](serving/data.py) file, we defined a function called `get_gpm_sequence` which returns us an `ee.Image` with the precipitation values for the time sequence we give it.
Each time step is stored in a different band with the index as a prefix.
For example, the band corresponding to the first time step in the sequence would be `0_precipitationCal`, and the second time step would be `1_precipitationCal`.

In [None]:
from datetime import datetime
import folium
import ee
from serving.data import get_gpm_sequence

def gpm_layer(image: ee.Image, label: str, i: int) -> folium.TileLayer:
  vis_params = {
      "bands": [f"{i}_precipitationCal"],
      "min": 0.0,
      "max": 20.0,
      "palette": [
          '000096', '0064ff', '00b4ff', '33db80', '9beb4a',
          'ffeb00', 'ffb300', 'ff6400', 'eb1e00', 'af0000',
      ],
  }
  # Mask (hide) pixels with no precipitation to see the map below.
  image = image.mask(image.gt(0.1))
  return folium.TileLayer(
      name=f"[{label}] Precipitation",
      tiles=image.getMapId(vis_params)["tile_fetcher"].url_format,
      attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
      overlay = True,
  )

# Get the Earth Engine images.
dates = [datetime(2019, 9, 2, 18)]
image = get_gpm_sequence(dates)

# Show map.
map = folium.Map([25, -90], zoom_start=5)
for i, date in enumerate(dates):
  gpm_layer(image, str(date), i).add_to(map)
folium.LayerControl().add_to(map)
map

> 💡 This is [Hurricane Dorian](https://en.wikipedia.org/wiki/Hurricane_Dorian), the strongest Category 5 hurricane on record in the Bahamas.

## 🌨 Cloud and moisture

To predict precipitation, it's also useful to take a look at the _cloud_ and _moisture_.
We use data from [GOES-16 Cloud and Moisture Imagery](https://developers.google.com/earth-engine/datasets/catalog/NOAA_GOES_16_MCMIPF), which was the first satellite from the [Geostationary Operational Environmental Satellites (GOES)](https://en.wikipedia.org/wiki/Geostationary_Operational_Environmental_Satellite) mission, operated by [NASA](https://en.wikipedia.org/wiki/NASA) and [NOAA](https://en.wikipedia.org/wiki/National_Oceanic_and_Atmospheric_Administration).
It includes measurements from the _visible_, _near-infrared_, and _infrared_ spectrum.
It is a [geostationary](https://en.wikipedia.org/wiki/Geostationary_orbit) satellite, so its orbit is synchronized with the Earth's rotation, and it provides a view centered in the Americas.

In the [`serving/data.py`](serving/data.py) file, we defined a function called `get_goes16_sequence` which returns us an `ee.Image` with the cloud and moisture data for the time sequence we give it.

In [None]:
from datetime import datetime
import folium
import ee
from serving.data import get_goes16_sequence

def goes16_layer(image: ee.Image, label: str, i: int) -> folium.TileLayer:
  vis_params = {
      "bands": [f"{i}_CMI_C02", f"{i}_CMI_C03", f"{i}_CMI_C01"],
      "min": 0.0,
      "max": 3000.0,
  }
  return folium.TileLayer(
      name=f"[{label}] Cloud and moisture",
      tiles=image.getMapId(vis_params)["tile_fetcher"].url_format,
      attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
      overlay = True,
  )

# Get the Earth Engine image.
dates = [datetime(2019, 9, 2, 18)]
image = get_goes16_sequence(dates)

# Show map.
map = folium.Map([25, -90], zoom_start=5)
for i, date in enumerate(dates):
  goes16_layer(image, str(date), i).add_to(map)
folium.LayerControl().add_to(map)
map

## 🏔 Elevation

Elevation could also give the model useful information.
We use the [MERIT Terrain DEM](https://developers.google.com/earth-engine/datasets/catalog/MERIT_DEM_v1_0_3) dataset to get the elvation.

In the [`serving/data.py`](serving/data.py) file, we defined a function called `get_elevation` which returns us an `ee.Image` with the elevation measured in meters.

In [None]:
import folium
from serving.data import get_elevation

def elevation_layer() -> folium.TileLayer:
  image = get_elevation()
  vis_params = {
      "bands": ["elevation"],
      "min": 0.0,
      "max": 3000.0,
      "palette": ['000000', '478FCD', '86C58E', 'AFC35E', '8F7131', 'B78D4F', 'E2B8A6', 'FFFFFF']
  }
  return folium.TileLayer(
      name="Elevation",
      tiles=image.getMapId(vis_params)["tile_fetcher"].url_format,
      attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
      overlay = True,
  )

# Show map.
map = folium.Map([25, -90], zoom_start=5)
elevation_layer().add_to(map)
folium.LayerControl().add_to(map)
map

## 🛰 Inputs

In this example, we also consider multiple images across time, since weather forecasting is more accurate when we look at how the cloud cover changes over a period of time.
Particularly, we consider 3 data points - 4 hours prior, 2 hours prior, and current.

> 💡 To give the model a better picture, we chose to feed it with _at least three_ data points from the past.
> With only a single point, the model wouldn't know if the rain is increasing or decreasing.
> Two points would give it an idea of the trend.
> Three or more points would give it an idea of how fast it's changing.
> The more points, the more it can see.

In the [`serving/data.py`](serving/data.py) file, we defined a function called `get_inputs_image` which returns us an `ee.Image` with bands for all the time steps for cloud and moisture, and for precipitation, alongside with the elevation.

In [None]:
from datetime import datetime, timedelta
import folium
from serving.data import get_inputs_image, INPUT_HOUR_DELTAS

# Get the Earth Engine image.
date = datetime(2019, 9, 2, 18)
image = get_inputs_image(date)

# Show map.
map = folium.Map([25, -90], zoom_start=5)
elevation_layer().add_to(map)
for i, h in enumerate(INPUT_HOUR_DELTAS):
  label = str(date + timedelta(hours=h))
  goes16_layer(image, label, i).add_to(map)
  gpm_layer(image, label, i).add_to(map)
folium.LayerControl().add_to(map)
map

> 💡 You can hide and show layers from the top-right corner widget to see all the inputs for the model.

## ✅ Labels

We chose to predict precipitation for 2 and 6 hours in the future, but it could be anything as long as we have the right _labels_.

In the [`serving/data.py`](serving/data.py) file, we defined a function called `get_labels_image` which returns us an `ee.Image` with bands for each time step of precipitation.

In [None]:
from datetime import datetime, timedelta
import folium
from serving.data import get_labels_image, OUTPUT_HOUR_DELTAS

# Get the Earth Engine image.
date = datetime(2019, 9, 3, 18)
image = get_labels_image(date)

# Show map.
map = folium.Map([25, -90], zoom_start=5)
for i, h in enumerate(OUTPUT_HOUR_DELTAS):
  label = str(date + timedelta(hours=h))
  gpm_layer(image, label, i).add_to(map)
folium.LayerControl().add_to(map)
map

# 🗄 Create the dataset

Now that we know the _inputs_ and _outputs_ for the model, we can create a _dataset_ to train the model.
A dataset consists of _training examples_, which are `(inputs, labels)` pairs, so for each input data, we have to give it the correct output values.

We want a _balanced_ dataset consisting on a representative, diverse, and unbiased selection of data points.
This way the model can learn from many different examples covering different seasons, times of day, regions, ecosystems, etc.

Let's take a closer look at how we select our training examples to create the dataset.

## 📌 Sample points

First, we want to get balanced points for a given time.
We use [`ee.Image.stratifiedSample`](https://developers.google.com/earth-engine/apidocs/ee-image-stratifiedsample) to select around the same number of points for each amount of precipitation.

Since the precipitation is a continuous value, we first need to convert it to a classification.
By looking at different images, we noticed that most values fall within 0 and 30.
So we simply clamped the values into that range, divided by the maximum value, multiplied by the number of bins, and converted them into integers.
This creates discrete bins for each value in the range, so we can do stratified sample on it.

In [`create_dataset.py`](create_dataset.py) we defined a function called `sample_points` that gives us a balanced selction of (longitude, latitude) coordinates for a given date.

In [51]:
from datetime import datetime
from create_dataset import sample_points

num_bins = 10   # number of bins for the stratified sampling
num_points = 1  # number of points per bin

date = datetime(2019, 9, 2, 18)
for date, point in sample_points(date, num_bins, num_points):
  print(f"{date} -- {point}")

2019-09-02 18:00:00 -- [-17.8315583897725, 26.90454275937967]
2019-09-02 18:00:00 -- [-51.06922390219479, -26.54521664573186]
2019-09-02 18:00:00 -- [-39.21146215181711, 10.375541531580472]
2019-09-02 18:00:00 -- [-29.95881472538604, 13.250150440762942]
2019-09-02 18:00:00 -- [-91.85273780122107, 16.484085463593217]
2019-09-02 18:00:00 -- [-43.523375515590814, -26.275722060496005]
2019-09-02 18:00:00 -- [-66.34058373222666, 1.122894105149402]
2019-09-02 18:00:00 -- [-76.5813779711892, 26.90454275937967]
2019-09-02 18:00:00 -- [-95.62566199452306, -30.587635424269706]
2019-09-02 18:00:00 -- [-36.067358657398785, -31.12662459474142]


> 💡 We only bucketize the precipitation to select a balanced dataset, but we use the original continuous value for the labels.

## 📑 Get training examples

The next step is to get our training examples data.
Sometimes there are transient errors like sending too many requests, so we used [`Retry`](https://googleapis.dev/python/google-api-core/latest/retry.html) to handle those cases.

We predefined that all our training examples would be 5 pixels width by 5 pixels height, but we could choose any size as long as the model accepts it.
We also want all the training examples to be the same size so we can batch them.

> 💡 We chose _5x5 patches_ because our Fully Convolutional Model uses a _3x3 kernel_.
> We want a _balanced_ representation of precipitation, and we did the stratified sampling on the _center_ pixel only.
> By choosing 5x5 patches with a 3x3 kernel, we make sure the center pixel we chose appears in all 9 positions for the kernel.

In [`create_dataset.py`](create_dataset.py) we defined `get_training_example`, which fetches an `(inputs, labels)` pair for the given date and (longitude, latitude) coordinate.
Let's see how a 64x64 patch looks like, since a 5x5 patch will only look like a bunch of random pixels to us.

In [45]:
from datetime import datetime
from create_dataset import get_training_example

date = datetime(2019, 9, 2, 18)
point = [-77.93, 25.23]  # [longitude, latitude]
(inputs, labels) = get_training_example(date, point, patch_size=64)

print(f"inputs : {inputs.dtype} {inputs.shape}")
print(f"labels : {labels.dtype} {labels.shape}")

inputs : float32 (64, 64, 52)
labels : float32 (64, 64, 2)


In [46]:
from visualize import show_inputs

show_inputs(inputs)

In [47]:
from visualize import show_outputs

show_outputs(labels)

## 📚 Write NumPy files

Finally, we need to write the training examples into files.
We chose [compressed NumPy files](https://numpy.org/doc/stable/reference/generated/numpy.savez_compressed.html) for simplicity.
We used Apache Beam [`FileSystems`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.filesystems.html) to be able to write into any file system that Beam supports, including Cloud Storage.

In [48]:
from create_dataset import write_npz

write_npz(inputs, labels, 'data-small/')

!ls -lh data-small

total 412K
-rw-r--r-- 1 root root 412K Dec  7 17:43 d0a947bf-bbb4-472a-9fa4-fdc36ad169a0.npz


## 🗃 Create the dataset

Finally, we create an
[Apache Beam](https://beam.apache.org/) pipeline, which allows us to create parallel processing pipelines.

Let's see how to create a small dataset!

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

data_path = "data/"
dates = [datetime(2019, 9, 2, 18)]
beam_options = PipelineOptions([], direct_num_workers=20)
with beam.Pipeline(options=beam_options) as pipeline:
    (
        pipeline
        | "📆 Create dates" >> beam.Create(dates)
        | "📌 Sample points" >> beam.FlatMap(sample_points)
        | "🃏 Reshuffle" >> beam.Reshuffle()
        | "📑 Get example" >> beam.MapTuple(get_training_example)
        | "📚 Write NPZ files" >> beam.MapTuple(write_npz, data_path)
    )

In [50]:
# Count the number of files we created.
!ls {data_path} | wc -l

ls: cannot access 'data/': No such file or directory
0


# ☁️ Create the dataset in Dataflow

Local testing works great for creating small datasets and making sure everything works, but to run on a large dataset at scale it's best to use a distributed runner like
[Dataflow](https://cloud.google.com/dataflow).

We can run [`create_dataset.py`](create_dataset.py) as a script and run it in [Dataflow](https://cloud.google.com/dataflow).
You can control the number of dates to sample with `--num-dates` _(default=50)_, the number of bins to use for the stratified sampling with `--num-bins` _(default=10)_, and the number of points per bin to sample with `--num-points` _(default=1)_.

In [None]:
!python create_dataset.py \
  --data-path="gs://{bucket}/weather/data" \
  --runner="DataflowRunner" \
  --project="{project}" \
  --region="{location}" \
  --temp_location="gs://{bucket}/weather/temp"

> 💡 Look at your Dataflow jobs: https://console.cloud.google.com/dataflow/jobs

# 🧠 Train the model

Now that we have our dataset, we're ready to train the model.

## 📖 Read the datasets

Reading a PyTorch dataset requires a subclass of `torch.utils.data.Dataset`.
We need to define [`__len__`](https://docs.python.org/3/reference/datamodel.html#object.__len__) and [`__getitem__`](https://docs.python.org/3/reference/datamodel.html#object.__getitem__) to allow random access to the training examples.

Another thing to note is that our data is in a channels-last format, like `(width, height, channels)`.
But PyTorch expects channels-first format in the convolutional layers, like `(channels, width, height)`, so our dataset handles doing the conversion.

In [`trainer/model.py`](trainer/model.py) we defined the `WeatherDataset` class to read our data files.

In [60]:
!mkdir data
!gsutil -m cp gs://{bucket}/weather/data-10k/* data/

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Copying gs://dcavazos-lyra/weather/data-10k/8039e25e-15ae-4953-a359-dc6817651221.npz...
Copying gs://dcavazos-lyra/weather/data-10k/8042d325-d71a-45cf-81b1-2c6d87e4a477.npz...
Copying gs://dcavazos-lyra/weather/data-10k/8045db4d-7df3-44ec-a586-acb68761afb5.npz...
Copying gs://dcavazos-lyra/weather/data-10k/80496d46-5fe4-44a2-b23b-73ccc80727c5.npz...
Copying gs://dcavazos-lyra/weather/data-10k/8061ce0a-993a-46a8-9fd0-c7cf04df2b9e.npz...
Copying gs://dcavazos-lyra/weather/data-10k/8063aa2c-8db1-4365-a517-995713bf2ec7.npz...
Copying gs://dcavazos-lyra/weather/data-10k/8064dd61-ee18-4482-a086-d3b41fb2c54b.npz...
Copying gs://dcavazos-lyra/weather/data-10k/8067a57f-6022-464e-9767-4d11c61f88ba.npz...
Copying gs://dcavazos-lyra/weather/data-10k/806d3d09-49c9-4843-bb98-c86baabd1216.npz...
Copying gs://dcavazos-lyra/weather/data-10k/808856db-7178-4fa4-9cf1-7d3c3c335b6b.npz...
Copying gs://dcavazos-lyra/weather/data-10k/806e1b44-30

In [61]:
from trainer.model import WeatherDataset

data_path = f"data/"

dataset = WeatherDataset(data_path)
(inputs_sample, labels_sample) = dataset[0]

print(f"The dataset contains {len(dataset)} examples.")
print(f"Inputs shape: {inputs_sample.shape}")
print(f"Labels shape: {labels_sample.shape}")

The dataset contains 9926 examples.
Inputs shape: torch.Size([52, 5, 5])
Labels shape: torch.Size([2, 5, 5])


Then we need to split the dataset into training and testing (validation) datasets.
We use the [`torch.utils.data.random_split`](https://pytorch.org/docs/stable/data.html#torch.utils.data.random_split) function to do this.
In [`trainer/model.py`](trainer/model.py) we defined `train_test_split`.

In [62]:
from trainer.model import train_test_split

(train_data, test_data) = train_test_split(dataset, ratio=0.9)
print(f"Train dataset contains {len(train_data)} examples")
print(f"Test dataset contains {len(test_data)} examples")

Train dataset contains 8933 examples
Test dataset contains 993 examples


> 💡 For more information on PyTorch datasets, refer to the [Datasets & Loaders](https://pytorch.org/tutorials/beginner/basics/data_tutorial.html) guide.

## 🐣 Create the model

First we define our model, which is a very simple _Fully Convolutional Network_.
The input data can consist of potentially very large numbers, but machine learning generally prefers small numbers around -1 and 1.
So in [`trainer/model.py`](trainer/model.py) we defined a `Normalization` layer which finds the _mean_ and _standard deviation_ from the training dataset and applies [Z-Score](https://developers.google.com/machine-learning/data-prep/transform/normalization#z-score) to normalize all the model's inputs as a first step.

In [63]:
from trainer.model import Normalization

normalization = Normalization.adapt(train_data, axis=0)
print(f"mean: {normalization.mean.shape}")
print(f"std:  {normalization.std.shape}")

mean: torch.Size([52, 1, 1])
std:  torch.Size([52, 1, 1])


The model then passes the data through a
[2D Convolutional layer](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html) for downsampling, and then through a
[2D DeConvolutional layer](https://pytorch.org/docs/stable/generated/torch.nn.ConvTranspose2d.html) for upsampling, so we end up with images the same size as the input image.
We used a [`ReLU`](https://pytorch.org/docs/stable/generated/torch.nn.ReLU.html) activation function inbetween all hidden layers.

For the last layer, we use a 2D Convolutional layer with kernel size `(1, 1)` to get one value for each output of the model.
This acts as a [`Linear`](https://pytorch.org/docs/stable/generated/torch.nn.Linear.html) layer, but for a channels-first model.
Since we can't have negative precipitation, we passed the model's outputs through a final `ReLU` activation function.

In [`trainer/model.py`](trainer/model.py) we defined the `Model` class.
It takes the normalization layer as an input, and defines all the layers in our model architecture.
All the hyperparameters are constants to keep it simple, but feel free to play around with it.

In [64]:
import torch
import numpy as np

DEVICE = 'cpu'

class Model(torch.nn.Module):
    def __init__(self, normalization: Normalization) -> None:
        super().__init__()
        inputs = 52
        hidden1 = 128
        hidden2 = 64
        kernel_size = (3, 3)

        self.normalization = normalization
        self.fcn = torch.nn.Sequential(
            self.normalization,
            torch.nn.Conv2d(inputs, hidden1, kernel_size),
            torch.nn.ReLU(),
            torch.nn.ConvTranspose2d(hidden1, hidden2, kernel_size),
            torch.nn.ReLU(),
        )
        self.output1 = torch.nn.Sequential(
            torch.nn.Conv2d(hidden2, 1, (1, 1)),
            torch.nn.ReLU(),  # precipitation cannot be negative
        )
        self.output2 = torch.nn.Sequential(
            torch.nn.Conv2d(hidden2, 1, (1, 1)),
            torch.nn.ReLU(),  # precipitation cannot be negative
        )

    def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        y = self.fcn(x)
        return self.output1(y), self.output2(y)

    def predict(self, inputs: np.ndarray) -> np.ndarray:
        return self.predict_batch(np.stack([inputs]))[0]

    def predict_batch(self, inputs_batch: np.ndarray) -> np.ndarray:
        inputs_pt = torch.from_numpy(inputs_batch).moveaxis(-1, 1).to(DEVICE)
        with torch.no_grad():
            y1, y2 = self(inputs_pt)
            return torch.cat([y1, y2], dim=1).moveaxis(1, -1).cpu().numpy()

    def save(self, model_path: str) -> None:
        os.makedirs(model_path, exist_ok=True)
        torch.save(self.normalization.std, os.path.join(model_path, "std.pt"))
        torch.save(self.normalization.mean, os.path.join(model_path, "mean.pt"))
        torch.save(self.state_dict(), os.path.join(model_path, "state_dict.pt"))

    @staticmethod
    def load(model_path: str) -> Model:
        std = torch.load(os.path.join(model_path, "std.pt"), map_location=DEVICE)
        mean = torch.load(os.path.join(model_path, "mean.pt"), map_location=DEVICE)
        state_dict = torch.load(
            os.path.join(model_path, "state_dict.pt"), map_location=DEVICE
        )
        model = Model(Normalization(std, mean))
        model.load_state_dict(state_dict)
        model.eval()
        return model.to(DEVICE)

model = Model(normalization).to(DEVICE)
print(model)

Model(
  (normalization): Normalization()
  (fcn): Sequential(
    (0): Normalization()
    (1): Conv2d(52, 128, kernel_size=(3, 3), stride=(1, 1))
    (2): ReLU()
    (3): ConvTranspose2d(128, 64, kernel_size=(3, 3), stride=(1, 1))
    (4): ReLU()
  )
  (output1): Sequential(
    (0): Conv2d(64, 1, kernel_size=(1, 1), stride=(1, 1))
    (1): ReLU()
  )
  (output2): Sequential(
    (0): Conv2d(64, 1, kernel_size=(1, 1), stride=(1, 1))
    (1): ReLU()
  )
)


In [None]:
import torch
from trainer.model import Model

device = "cuda" if torch.cuda.is_available() else "cpu"

model = Model(normalization).to(device)
print(model)

Next we wrap our `Dataset`s into `DataLoader`s to create batches of training examples.
In [`trainer/model.py`](trainer/model.py) we defined the `data_loader` function which returns us a `DataLoader` for a given `Dataset`.

In [65]:
from trainer.model import data_loader

batch_size = 512
train_loader = data_loader(train_data, batch_size, shuffle=True)
test_loader = data_loader(test_data, batch_size)

for inputs_batch, labels_batch in train_loader:
  print(f"inputs_batch: {tuple(inputs_batch.shape)}")
  print(f"labels_batch: {tuple(labels_batch.shape)}")
  break

inputs_batch: (512, 52, 5, 5)
labels_batch: (512, 2, 5, 5)


> 💡 The `batch_size` can play a large role in how fast the data is fed to the model.
> As a rule of thumb, the larger it is the faster, but the batches should still fit into memory!

Finally, we can train the model.
We define the number of times we want to go through the training dataset as the number of _epochs_.

We also need a loss function, so the model knows how good or bad their predictions were.
We could use any regression loss function like [Mean Absolute Error (L1)](https://pytorch.org/docs/stable/generated/torch.nn.L1Loss.html) or [Mean Squared Error (L2)](https://pytorch.org/docs/stable/generated/torch.nn.MSELoss.html).
PyTorch provides a [Smooth L1 Loss](https://pytorch.org/docs/stable/generated/torch.nn.SmoothL1Loss.html), which chooses between L1 and L2 depending on a certain criteria.
It's less sensitive to outliers, so we'll use that.

In [`trainer/model.py`](trainer/model.py) we defined the `train` and `test` functions.
The `train` function goes through the entire training dataset, adjusts the model accordingly, and returns us the average batch loss.
The `test` function goes through the entire testing dataset and simply returns us the average batch loss, without modifying the model in any way.

In [66]:
import torch
from trainer.model import train, test

epochs = 1000
loss_fn = torch.nn.SmoothL1Loss()
for epoch in range(epochs):
    train_loss = train(model, train_loader, loss_fn)
    test_loss = test(model, test_loader, loss_fn)
    print(
        f"Epoch [{epoch + 1}/{epochs}] -- "
        f"train_loss: {train_loss:.4f} - "
        f"test_loss: {test_loss:.4f}"
    )

Epoch [1/1000] -- train_loss: 11.3744 - test_loss: 10.0454
Epoch [2/1000] -- train_loss: 10.1976 - test_loss: 9.8406
Epoch [3/1000] -- train_loss: 10.0216 - test_loss: 9.6820
Epoch [4/1000] -- train_loss: 9.9554 - test_loss: 9.6845
Epoch [5/1000] -- train_loss: 9.9343 - test_loss: 9.7103
Epoch [6/1000] -- train_loss: 9.8827 - test_loss: 9.6809
Epoch [7/1000] -- train_loss: 9.8489 - test_loss: 9.6405
Epoch [8/1000] -- train_loss: 9.8112 - test_loss: 9.5826
Epoch [9/1000] -- train_loss: 9.8016 - test_loss: 9.5527
Epoch [10/1000] -- train_loss: 9.7987 - test_loss: 9.5446
Epoch [11/1000] -- train_loss: 9.8120 - test_loss: 9.5266
Epoch [12/1000] -- train_loss: 9.7866 - test_loss: 9.5146
Epoch [13/1000] -- train_loss: 9.8035 - test_loss: 9.4987
Epoch [14/1000] -- train_loss: 9.7920 - test_loss: 9.4914
Epoch [15/1000] -- train_loss: 9.7918 - test_loss: 9.4933
Epoch [16/1000] -- train_loss: 9.7689 - test_loss: 9.4597
Epoch [17/1000] -- train_loss: 9.6579 - test_loss: 9.4076
Epoch [18/1000] -- 

> 💡 Both losses should go down every epoch, and they should be roughly similar.
> If the training loss goes down, but the testing loss stays flat or goes up, it might be a sign that the model is [overfitting](https://developers.google.com/machine-learning/crash-course/generalization/peril-of-overfitting), meaning that it's memorizing the training dataset instead of learning to generalize.

Now that we have a trained model, we can save it and load it anywhere else.
The `Model` class provides `save` and `load` methods to help us do all the steps in the right order.
We save the normalization `mean` and `std`, as well as the `state_dict` from the model.
To learn more about saving and loading models, see the [Saving and loading models](https://pytorch.org/tutorials/beginner/saving_loading_models.html) guide.

In [43]:
model.save("model-small/")

!ls -lh model

model = Model.load("model-small/")
print(model)

total 420K
-rw-r--r-- 1 root root  939 Dec  8 19:54 mean.pt
-rw-r--r-- 1 root root 410K Dec  8 19:54 state_dict.pt
-rw-r--r-- 1 root root  939 Dec  8 19:54 std.pt
Model(
  (normalization): Normalization()
  (fcn): Sequential(
    (0): Normalization()
    (1): Conv2d(52, 64, kernel_size=(3, 3), stride=(1, 1))
    (2): ReLU()
    (3): ConvTranspose2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
    (4): ReLU()
  )
  (output1): Sequential(
    (0): Conv2d(128, 1, kernel_size=(1, 1), stride=(1, 1))
    (1): ReLU()
  )
  (output2): Sequential(
    (0): Conv2d(128, 1, kernel_size=(1, 1), stride=(1, 1))
    (1): ReLU()
  )
)


# ☁️ Train the model in Vertex AI

For this example we're training on a very small dataset for a very small number of epochs.
This means we don't have a representative number of examples and the model hasn't seen the data enough times, so it won't perform very well.

Training on larger datasets for a large number of epochs can take a lot of time, so it might be a good idea to do the training in Cloud.
[Vertex AI](https://cloud.google.com/vertex-ai) is a great option, and even allows us to use hardware accelerators like GPUs.
There are [PyTorch pre-built containers](https://cloud.google.com/vertex-ai/docs/training/pre-built-containers#pytorch) which include PyTorch and many common libraries, so we don't need to build a custom container.

In [None]:
# from google.cloud import aiplatform

# epochs = 100000
# # batch_size = 128

# aiplatform.init(project=project, location=location, staging_bucket=bucket)

# job = aiplatform.CustomTrainingJob(
#     display_name=f"weather-10k-{epochs}",
#     script_path="trainer/model.py",
#     container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-11:latest",
# )

# job.run(
#     machine_type='n1-highmem-8',
#     accelerator_type='NVIDIA_TESLA_T4',
#     accelerator_count=1,
#     args=[
#         f"--data-path=/gcs/{bucket}/weather/data-10k",
#         f"--model-path=/gcs/{bucket}/weather/model-10k-{epochs}",
#         f"--epochs={epochs}",
#         # f"--batch-size={batch_size}",
#     ],
# )

In [None]:
from google.cloud import aiplatform

epochs = 100000

aiplatform.init(project=project, location=location, staging_bucket=bucket)

job = aiplatform.CustomTrainingJob(
    display_name="weather",
    script_path="trainer/model.py",
    container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-11:latest",
)

job.run(
    machine_type='n1-highmem-8',
    accelerator_type='NVIDIA_TESLA_T4',
    accelerator_count=1,
    args=[
        f"--data-path=/gcs/{bucket}/weather/data",
        f"--model-path=/gcs/{bucket}/weather/model",
        f"--epochs={epochs}",
    ],
)

> 💡 Look at your Vertex AI training jobs: https://console.cloud.google.com/vertex-ai/training/custom-jobs

# 🔮 Make predictions

In [None]:
# # Setup some test data to predict
# test_data = [
#     ['2021-12-23T17:00:00', -118.04194946289063, 40.788608828924076, 'light rain'],
#     ['2021-12-15T17:30:00', -76.23759765625002, 43.16430423079589, 'medium rain'],
#     ['2021-12-23T17:00:00', -119.66792602539063, 36.882018532995076, 'heavy rain'],
#     ['2021-12-23T17:00:00', -112.0310546875, 36.27880268550452, 'just cloudy'],
#     ['2021-09-23T17:00:00', -119.98515625, 39.68941717505306, 'clear'],
#     ['2021-11-15T17:30:00', -123.72188110351563, 45.6445363447774, 'medium rain'],
#     ['2021-12-23T17:00:00', -115.70048828125, 35.324365287778654, 'light rain']
# ]

Now we are ready to run the model on some new data and get some predictions.

First, we get the input data for the model.
We get the labels as well, just to compare our model's predictions with what the real precipitation actually was.

In [5]:
from datetime import datetime
from serving.data import get_inputs_patch, get_labels_patch

date = datetime(2019, 9, 2, 18)
point = [-78.322, 25.507]  # [longitude, latitude]
patch_size = 128

inputs = get_inputs_patch(date, point, patch_size)
labels = get_labels_patch(date, point, patch_size)

print(f"inputs : {inputs.dtype} {inputs.shape}")
print(f"labels : {labels.dtype} {labels.shape}")

inputs : float32 (128, 128, 52)
labels : float32 (128, 128, 2)


Here's how the input data looks like.

In [6]:
from visualize import show_inputs

show_inputs(inputs)

## 🛑 Update how many epochs and dataset size used for pre-trained model

Next, we load our model.
If we want to use the model we trained in Vertex AI, we have to copy it and have it locally.
Otherwise, we've provided a pre-trained model in the [`model/`](model/) directory.
We trained it for a large number on epochs on a really large dataset, so you don't have to 🙂.

> 💡 Uncomment and run the following cell to use the model you trained in Vertex AI.

In [36]:
# !mkdir -p model-vertex
# !gsutil cp gs://{bucket}/weather/model/* model-vertex/

Copying gs://dcavazos-lyra/weather/model/mean.pt...
Copying gs://dcavazos-lyra/weather/model/state_dict.pt...
Copying gs://dcavazos-lyra/weather/model/std.pt...
/ [3 files][411.2 KiB/411.2 KiB]                                                
Operation completed over 3 objects/411.2 KiB.                                    


In [3]:
!rm -rf model
!mkdir model
!gsutil cp gs://{bucket}/weather/model-10k-100000/* model/

Copying gs://dcavazos-lyra/weather/model-10k-100000/mean.pt...
Copying gs://dcavazos-lyra/weather/model-10k-100000/state_dict.pt...
Copying gs://dcavazos-lyra/weather/model-10k-100000/std.pt...
/ [3 files][411.2 KiB/411.2 KiB]                                                
Operation completed over 3 objects/411.2 KiB.                                    


In [4]:
from trainer.model import Model

model = Model.load("model")
# model = Model.load("model-vertex")  # uncomment to use the Vertex AI model

print(model)

Model(
  (normalization): Normalization()
  (fcn): Sequential(
    (0): Normalization()
    (1): Conv2d(52, 64, kernel_size=(3, 3), stride=(1, 1))
    (2): ReLU()
    (3): ConvTranspose2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
    (4): ReLU()
  )
  (output1): Sequential(
    (0): Conv2d(128, 1, kernel_size=(1, 1), stride=(1, 1))
    (1): ReLU()
  )
  (output2): Sequential(
    (0): Conv2d(128, 1, kernel_size=(1, 1), stride=(1, 1))
    (1): ReLU()
  )
)


In [8]:
from visualize import show_outputs

predictions = model.predict(inputs)

show_outputs(labels)
show_outputs(predictions)

# 5. ⛵ Further exploration

This notebook demonstrated a simple model to start exploring the problem of weather forecasting using deep neural networks. The model has less than 100k parameters and only a few Conv2D layers to keep training time short. Even so, the model is able to distinguish cloud patterns for broad rain vs no rain detection.

There has been a lot of interesting research work on weather nowcasting recently, especially with [U-Net](https://en.wikipedia.org/wiki/U-Net) style model architectures. If you are interested in diving deeper, here are some articles from Google Research:

*  [Google Research blog on nowcasting](https://ai.googleblog.com/2021/11/metnet-2-deep-learning-for-12-hour.html)
*  [MetNet paper](https://arxiv.org/abs/2003.12140)

# 6. 🧹 Clean Up

To **avoid incurring charges** to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

## Deleting the project

The **easiest** way to **eliminate billing** is to delete the project that you created for the tutorial.

To delete the project:

> ⚠️ Deleting a project has the following effects:
>
> * **Everything in the project is deleted.** If you used an existing project for this tutorial, when you delete it, you also delete any other work you've done in the project.
>
> * **Custom project IDs are lost.** When you created this project, you might have created a custom project ID that you want to use in the future. To preserve the URLs that use the project ID, such as an appspot.com URL, delete selected resources inside the project instead of deleting the whole project.
>
> If you plan to explore multiple tutorials and quickstarts, **reusing** projects can help you avoid exceeding project **quota limits**.

1. In the Cloud Console, go to the **Manage resources** page.

  <button>

  [Go to Manage resources](https://console.cloud.google.com/iam-admin/projects)

  </button>

1. In the project list, select the project that you want to delete, and then click **Delete**.

1. In the dialog, type the project ID, and then click **Shut down** to delete the project.