In [None]:
# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the "License")

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# 🌦️ Weather forecasting -- _Dataset_

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/GoogleCloudPlatform/python-docs-samples/blob/main/people-and-planet-ai/weather-forecasting/notebooks/2-dataset.ipynb)

This sample is broken into the following notebooks:

* [![Open in Colab](https://github.com/googlecolab/open_in_colab/raw/main/images/icon16.png) **🧭 Overview**](https://colab.research.google.com/github/GoogleCloudPlatform/python-docs-samples/blob/main/people-and-planet-ai/weather-forecasting/notebooks/1-overview.ipynb):
  Go through what we want to achieve, and explore the data we want to use as _inputs and outputs_ for our model.

* ![Open in Colab](https://github.com/googlecolab/open_in_colab/raw/main/images/icon16.png) **🗄️ Create the dataset**:
  Use [Apache Beam](https://beam.apache.org/) to fetch data from [Earth Engine](https://earthengine.google.com/) in parallel, and create a dataset for our model in [Dataflow](https://cloud.google.com/dataflow).

* [![Open in Colab](https://github.com/googlecolab/open_in_colab/raw/main/images/icon16.png) **🧠 Train the model**](https://colab.research.google.com/github/GoogleCloudPlatform/python-docs-samples/blob/main/people-and-planet-ai/weather-forecasting/notebooks/3-training.ipynb):
  Build a simple _Fully Convolutional Network_ in [PyTorch](https://pytorch.org/) and train it in [Vertex AI](https://cloud.google.com/vertex-ai/docs/training/custom-training) with the dataset we created.

* [![Open in Colab](https://github.com/googlecolab/open_in_colab/raw/main/images/icon16.png) **🔮 Model predictions**](https://colab.research.google.com/github/GoogleCloudPlatform/python-docs-samples/blob/main/people-and-planet-ai/weather-forecasting/notebooks/4-predictions.ipynb):
  Get predictions from the model with data it has never seen before.

This sample leverages geospatial satellite and precipitation data from [Google Earth Engine](https://earthengine.google.com/).
Using satellite imagery, you'll build and train a model for rain "nowcasting", that is, predicting the amount of rainfall for a given geospatial region and time in the immediate future.

* ⏲️ **Time estimate**: ~30 minutes
* 💰 **Cost estimate**: [a few cents on Dataflow](https://cloud.google.com/dataflow/pricing)

💚 This is one of many **machine learning how-to samples** inspired by **real climate solutions** aired on the [People and Planet AI 🎥 series](https://www.youtube.com/playlist?list=PLIivdWyY5sqI-llB35Dcb187ZG155Rs_7).

# 🎬 Before you begin

Let's start by cloning the GitHub repository, and installing some dependencies.

In [None]:
# Now let's get the code from GitHub and navigate to the sample.
!git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
%cd python-docs-samples/people-and-planet-ai/weather-forecasting

The [`weather-data`](serving/weather-data) local package contains the functions to get data from Earth Engine.
It is used for both creating the training dataset, and for predictions.

In [None]:
# Upgrade `setuptools` to install packages from pyproject.toml files.
!pip install --quiet --upgrade --no-warn-conflicts pip setuptools

# We need `build` and `virtualenv` to build the local packages.
!pip install --quiet build virtualenv

# Install Apache Beam and the `weather-data` local package.
!pip install "apache-beam[gcp]" serving/weather-data

> **🛑 Restart the runtime 🛑**

Colab already comes with many dependencies pre-loaded.
In order to ensure everything runs as expected, we **_must_ restart the runtime**. This allows Colab to load the latest versions of the libraries.

!["Runtime" > "Restart runtime"](images/restart-runtime.png)

In [None]:
# Alternatively, restart the runtime by ending the process.
exit()

After restarting the runtime, let's navigate back into the sample directory.

In [None]:
%cd python-docs-samples/people-and-planet-ai/weather-forecasting

## ☁️ My Google Cloud resources

Make sure you have followed these steps to configure your Google Cloud project:

1. Enable the APIs: _Dataflow and Earth Engine_

  <button>

  [Click here to enable the APIs](https://console.cloud.google.com/flows/enableapi?apiid=dataflow.googleapis.com,earthengine.googleapis.com)
  </button>

1. Create or use an existing Cloud Storage bucket.

  <button>

  [Click here to create a new Cloud Storage bucket](https://console.cloud.google.com/storage/create-bucket)
  </button>

1. Register your
  [Compute Engine default service account](https://console.cloud.google.com/iam-admin/iam)
  on Earth Engine.

  <button>

  [Click here to register your service account on Earth Engine](https://signup.earthengine.google.com/#!/service_accounts)
  </button>

Once you have everything ready, you can go ahead and fill in your Google Cloud resources in the following code cell.
Make sure you run it!

In [None]:
from __future__ import annotations

import os
from google.colab import auth

# Please fill in these values.
project = ""  # @param {type:"string"}
bucket = ""  # @param {type:"string"}
location = "us-central1"  # @param {type:"string"}

# Quick input validations.
assert project, "⚠️ Please provide a Google Cloud project ID"
assert bucket, "⚠️ Please provide a Cloud Storage bucket name"
assert not bucket.startswith(
    "gs://"
), f"⚠️ Please remove the gs:// prefix from the bucket name: {bucket}"
assert location, "⚠️ Please provide a Google Cloud location"

# Authenticate to Colab.
auth.authenticate_user()

# Set GOOGLE_CLOUD_PROJECT for google.auth.default().
os.environ["GOOGLE_CLOUD_PROJECT"] = project

# Set the gcloud project for other gcloud commands.
!gcloud config set project {project}

# 🗄 Create the dataset locally

A dataset consists of _training examples_, which are `(inputs, labels)` pairs: for each input, we have to provide the correct output values.

We want a _balanced_ dataset consisting of a representative, diverse, and unbiased selection of data points.
This way the model can learn from many different examples covering different seasons, times of day, regions, ecosystems, etc.

Let's take a closer look at how we select our training examples to create the dataset.

## 📌 Sample points

First, we want to get balanced points for a given time.
We use [`ee.Image.stratifiedSample`](https://developers.google.com/earth-engine/apidocs/ee-image-stratifiedsample) to select around the same number of points for each amount of precipitation.
Also, most of the regions we select data points from are at very low elevations, near sea level.
So it's important to make sure we also select data points from different elevations in a balanced way.

Since the precipitation is a continuous value, we first need to convert it to a classification.
By looking at different images, we noticed that most values fall between 0 and 30.
So we simply clamped the values into that range, divided by the maximum value, multiplied by the number of bins, and converted them into integers.

We do a similar thing for the elevation, where we found empirically that most values fall between 0 and 6000.

Once we have bins for both precipitation and elevation, we combine them into a single "unique" bin value to make sure we get all the possible precipitation values for each elevation.
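
To make the binning arithmetic concrete, here is a minimal NumPy sketch. It is not the code in `create_dataset.py`, which works on Earth Engine images. The helper `to_bins`, the sample values, and the way the two bins are combined are assumptions chosen for illustration.

```python
import numpy as np


def to_bins(values: np.ndarray, max_value: float, num_bins: int) -> np.ndarray:
    """Clamp to [0, max_value], scale to [0, num_bins], and truncate to integers."""
    clamped = np.clip(values, 0, max_value)
    return (clamped / max_value * num_bins).astype(int)


num_bins = 10

# Made-up sample values, one per candidate point.
precipitation = np.array([0.0, 2.9, 4.0, 15.0, 45.0])
elevation = np.array([0.0, 599.0, 3000.0, 6000.0, 8000.0])

precipitation_bins = to_bins(precipitation, 30, num_bins)
elevation_bins = to_bins(elevation, 6000, num_bins)

# Bin indices range from 0 to num_bins inclusive, so a stride of
# (num_bins + 1) keeps every (elevation, precipitation) pair distinct.
# This particular combination is an assumption, not the sample's formula.
unique_bins = elevation_bins * (num_bins + 1) + precipitation_bins

print(precipitation_bins, elevation_bins, unique_bins)
```

Values above the maximum are clamped, so they all land in the last bin instead of creating new ones.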

In [`create_dataset.py`](../create_dataset.py) we defined a function called `sample_points` that gives us a balanced selection of `(longitude, latitude)` coordinates for a given date.

In [None]:
from datetime import datetime
from create_dataset import sample_points

date = datetime(2019, 9, 2, 18)
for date, point in sample_points(date):
    print(f"{date} -- {point}")

2019-09-02 18:00:00 -- [-69.5525524841715, -39.82132539507417]
2019-09-02 18:00:00 -- [-71.4390145808225, 1.9503353164835744]
2019-09-02 18:00:00 -- [-52.12523597225278, -20.956704428564223]
2019-09-02 18:00:00 -- [-75.66109641618425, 34.11002248796244]
2019-09-02 18:00:00 -- [-37.662359897928496, 51.2678444146453]
2019-09-02 18:00:00 -- [-87.15953205291412, 5.902922566609462]
2019-09-02 18:00:00 -- [-70.27120471146712, 3.4774712994867656]
2019-09-02 18:00:00 -- [-45.208208284532475, -25.358449320749884]
2019-09-02 18:00:00 -- [-121.5650074346918, 8.058879248496325]
2019-09-02 18:00:00 -- [-127.7633828951165, 12.909781782741732]
2019-09-02 18:00:00 -- [-110.96488708208145, 53.96279026700387]
2019-09-02 18:00:00 -- [-50.957426102897415, -25.358449320749884]
2019-09-02 18:00:00 -- [-63.80333466580656, 2.399492958543334]
2019-09-02 18:00:00 -- [-50.957426102897415, -25.98727001963354]
2019-09-02 18:00:00 -- [-47.723491080067134, -22.034682769507654]
2019-09-02 18:00:00 -- [-71.61867763764

> 💡 We only bucketize the precipitation to select a balanced dataset, but we use the original continuous value for the labels.
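
The idea of sampling per bin while keeping the continuous values can be sketched in plain Python. This is only an illustration of stratified sampling, not what `ee.Image.stratifiedSample` does internally. The function `stratified_sample` and the precipitation values are hypothetical.

```python
import random
from collections import defaultdict


def stratified_sample(values, bins, points_per_bin, seed=0):
    """Pick up to `points_per_bin` items from each bin, keeping the original values."""
    rng = random.Random(seed)
    indices_by_bin = defaultdict(list)
    for index, bin_id in enumerate(bins):
        indices_by_bin[bin_id].append(index)

    selected = []
    for bin_id in sorted(indices_by_bin):
        indices = indices_by_bin[bin_id]
        count = min(points_per_bin, len(indices))
        selected.extend(rng.sample(indices, count))

    # The bin is only used for selection; the label stays continuous.
    return [(index, values[index]) for index in selected]


# Made-up precipitation values, mostly dry like real-world data.
precipitation = [0.0, 0.1, 0.2, 0.3, 4.0, 4.5, 16.0, 29.0]
bins = [int(min(max(p, 0), 30) / 30 * 10) for p in precipitation]

sample = stratified_sample(precipitation, bins, points_per_bin=2)
for index, value in sample:
    print(f"point {index}: bin={bins[index]} precipitation={value}")
```

Without the per-bin limit, a random sample of these points would be dominated by the near-zero values.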

## 📑 Get training examples

The next step is to get our training examples data.
Sometimes there are transient errors like sending too many requests, so we used [`Retry`](https://googleapis.dev/python/google-api-core/latest/retry.html) to handle those cases.
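
As a rough picture of what retrying with backoff means, here is a standard-library sketch. It is a stand-in for the behavior, not the `google.api_core` implementation and not the code used in `create_dataset.py`. `TransientError`, `retry_with_backoff`, and `flaky_request` are hypothetical names.

```python
import time
from functools import wraps


class TransientError(Exception):
    """Hypothetical stand-in for a 'too many requests' style error."""


def retry_with_backoff(max_attempts=5, initial_delay=0.01, multiplier=2.0):
    """Retry the wrapped function on TransientError, waiting longer each time."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except TransientError:
                    if attempt == max_attempts:
                        raise  # Give up after the last attempt.
                    time.sleep(delay)
                    delay *= multiplier

        return wrapper

    return decorator


calls = {"count": 0}


@retry_with_backoff(max_attempts=5)
def flaky_request():
    """Fails twice, then succeeds, to simulate a transient error."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("too many requests")
    return "ok"


result = flaky_request()
print(result, calls["count"])
```

Only errors considered transient are retried; anything else is raised immediately so real bugs are not hidden.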

We predefined that all our training examples would be 5 pixels wide by 5 pixels high, but we could choose any size as long as the model accepts it.
We also want all the training examples to be the same size so we can batch them.

In [`create_dataset.py`](../create_dataset.py) we defined `get_training_example`, which fetches an `(inputs, labels)` pair for the given date and (longitude, latitude) coordinate.
Let's see what a 64x64 patch looks like, since a 5x5 patch will only look like a bunch of random pixels to us.

In [None]:
from datetime import datetime
from create_dataset import get_training_example

date = datetime(2019, 9, 2, 18)
point = [-77.93, 25.23]  # [longitude, latitude]
(inputs, labels) = get_training_example(date, point, patch_size=64)

print(f"inputs : {inputs.dtype} {inputs.shape}")
print(f"labels : {labels.dtype} {labels.shape}")

inputs : float32 (64, 64, 52)
labels : float32 (64, 64, 2)


Let's see what the example inputs look like.

In [None]:
from visualize import show_inputs

show_inputs(inputs)

And these are the labels for that example, corresponding to 2 and 6 hours in the future from the example's time.

In [None]:
from visualize import show_outputs

show_outputs(labels)

> 💡 We chose _5x5 patches_ because our Fully Convolutional Model uses a _3x3 kernel_.
> We want a _balanced_ representation of precipitation, and we did the stratified sampling on the _center_ pixel only.
> By choosing 5x5 patches with a 3x3 kernel, we make sure the center pixel we chose appears in all 9 positions of the kernel.
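
The claim about the 9 kernel positions can be checked with a few lines of Python. This sketch assumes a convolution without padding and with a stride of 1, which is an assumption about the model, not something stated here.

```python
patch_size = 5
kernel_size = 3
center = (patch_size // 2, patch_size // 2)  # Pixel (2, 2) of the 5x5 patch.

# Slide the kernel over every valid top-left corner and record where
# the center pixel falls inside the kernel window.
positions = set()
for top in range(patch_size - kernel_size + 1):
    for left in range(patch_size - kernel_size + 1):
        row, col = center[0] - top, center[1] - left
        if 0 <= row < kernel_size and 0 <= col < kernel_size:
            positions.add((row, col))

print(f"Center pixel is seen at {len(positions)} kernel positions")
```

With a smaller 3x3 patch, the kernel would fit in only one place and the center pixel would be seen at a single kernel position.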

## 📝 Write NumPy files

Finally, we need to write the training examples into files.
We chose [compressed NumPy files](https://numpy.org/doc/stable/reference/generated/numpy.savez_compressed.html) for simplicity.
We used Apache Beam [`FileSystems`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.filesystems.html) to be able to write into any file system that Beam supports, including Cloud Storage.

Before writing the examples, we batch them to create files containing multiple examples, rather than a single file per example.
This reduces I/O operations when reading the dataset during training.
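
A minimal local sketch of batching and writing is shown here. It uses plain `open` instead of Beam `FileSystems`, and the function name `write_npz_local` and the array keys `inputs` and `labels` are assumptions for illustration, not necessarily what `write_npz` uses.

```python
import os
import tempfile
import uuid

import numpy as np


def write_npz_local(batch, data_path):
    """Stack a list of (inputs, labels) pairs and save them as one compressed file."""
    inputs = np.stack([example_inputs for example_inputs, _ in batch])
    labels = np.stack([example_labels for _, example_labels in batch])
    os.makedirs(data_path, exist_ok=True)
    filename = os.path.join(data_path, f"{uuid.uuid4()}.npz")
    with open(filename, "wb") as f:
        np.savez_compressed(f, inputs=inputs, labels=labels)
    return filename


# Two made-up 5x5 examples with the same channel counts as the sample.
example = (
    np.zeros((5, 5, 52), dtype=np.float32),
    np.ones((5, 5, 2), dtype=np.float32),
)
filename = write_npz_local([example, example], tempfile.mkdtemp())

# Reading the file back gives arrays with a leading batch dimension.
with np.load(filename) as npz:
    loaded_inputs = npz["inputs"]
    loaded_labels = npz["labels"]

print(loaded_inputs.shape, loaded_labels.shape)
```

Stacking requires every example to have the same shape, which is why a fixed patch size matters.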

Here, let's create a batch from a single example, but our data creation pipeline will create larger batches.

In [None]:
from create_dataset import write_npz

data_path = "data/"
batch = [(inputs, labels)]
write_npz(batch, data_path)

'data/c6482680-3d3f-43bd-bdff-e447f600f2b9.npz'

In [None]:
!ls -lh data

total 412K
-rw-r--r-- 1 root root 412K Jan 11 00:12 c6482680-3d3f-43bd-bdff-e447f600f2b9.npz


## 🗃 Create the dataset

Finally, we create an [Apache Beam](https://beam.apache.org/) pipeline to fetch and write the training examples in parallel.
The pipeline can write directly to [Cloud Storage](https://cloud.google.com/storage).

Let's see how to create a small dataset from a single date!

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

data_path = f"gs://{bucket}/weather/data-small"
dates = [datetime(2019, 9, 2, 18)]

beam_options = PipelineOptions([], direct_num_workers=20)
with beam.Pipeline(options=beam_options) as pipeline:
    (
        pipeline
        | "📆 Create dates" >> beam.Create(dates)
        | "📌 Sample points" >> beam.FlatMap(sample_points)
        | "🃏 Reshuffle" >> beam.Reshuffle()
        | "📑 Get example" >> beam.MapTuple(get_training_example)
        | "🗂️ Batch examples" >> beam.BatchElements()
        | "📝 Write NPZ files" >> beam.Map(write_npz, data_path)
    )
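
To see how elements change shape through the pipeline, here is a plain-Python emulation of the steps that does not use Beam. The stub functions and the fixed batch size are assumptions; in particular, `beam.BatchElements` chooses batch sizes dynamically unless constrained with `min_batch_size` and `max_batch_size`.

```python
from itertools import islice


def sample_points_stub(date):
    """Hypothetical stand-in for sample_points: yields (date, point) pairs."""
    for i in range(5):
        yield (date, [float(i), float(-i)])


def get_example_stub(date, point):
    """Hypothetical stand-in for get_training_example: returns (inputs, labels)."""
    return (f"inputs@{point}", f"labels@{point}")


def batched(items, batch_size):
    """Group items into lists of at most `batch_size` elements."""
    iterator = iter(items)
    while batch := list(islice(iterator, batch_size)):
        yield batch


dates = ["2019-09-02T18"]

# FlatMap: one date becomes many (date, point) elements.
points = [pair for date in dates for pair in sample_points_stub(date)]

# MapTuple: each (date, point) tuple is unpacked into function arguments.
examples = [get_example_stub(date, point) for date, point in points]

# BatchElements: many examples are grouped into a few batches.
batches = list(batched(examples, batch_size=2))

print(len(points), len(examples), [len(batch) for batch in batches])
```

The `Reshuffle` step has no equivalent here: it does not change the elements, it only redistributes them so the slow fetching step can run in parallel across workers.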

Now we can take a look at our data files.

In [None]:
!gsutil ls -lh gs://{bucket}/weather/data-small

# ☁️ Create the dataset in Dataflow

Local testing works great for creating small datasets and making sure everything works, but to run on a large dataset at scale it's best to use a distributed runner like
[Dataflow](https://cloud.google.com/dataflow).

We can run [`create_dataset.py`](../create_dataset.py) as a script and have it launch the pipeline in [Dataflow](https://cloud.google.com/dataflow).
You can control the number of dates to sample with `--num-dates` _(default=100)_, and the number of bins to use for the stratified sampling with `--num-bins` _(default=10)_.
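
A common pattern for Beam scripts is to parse their own flags with `argparse` and forward the remaining arguments to Beam. Whether `create_dataset.py` does exactly this is an assumption; only the flag names and defaults come from the text above, and the bucket name is a placeholder.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--data-path", required=True)
parser.add_argument("--num-dates", type=int, default=100)
parser.add_argument("--num-bins", type=int, default=10)

# parse_known_args returns the flags it recognizes plus the leftovers,
# which can then be handed to Beam as pipeline options.
args, beam_args = parser.parse_known_args(
    [
        "--data-path=gs://my-bucket/weather/data",  # Placeholder bucket name.
        "--runner=DataflowRunner",
        "--region=us-central1",
    ]
)

print(args.num_dates, args.num_bins, beam_args)
```

Flags such as `--runner` and `--region` are not defined by the script's parser, so they end up in `beam_args` untouched.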

We are using the same data extraction functions for both training and prediction.
This means our Dataflow pipeline needs access to the [`serving/weather-data`](../serving/weather-data) module.
Since it's a local module that does not live in [PyPI](https://pypi.org), we have to first build the module with [`build`](https://pypa-build.readthedocs.io/en/latest) and then include the package for Dataflow.

In [None]:
# Build the `weather-data` package.
!python -m build serving/weather-data

In [None]:
!ls -lh serving/weather-data/dist

total 8.0K
-rw-r--r-- 1 root root 3.9K Jan 10 23:51 weather_data-1.0.0-py3-none-any.whl
-rw-r--r-- 1 root root 3.1K Jan 10 23:51 weather-data-1.0.0.tar.gz


In [None]:
data_path = f"gs://{bucket}/weather/data"

!python create_dataset.py \
  --data-path="{data_path}" \
  --runner="DataflowRunner" \
  --project="{project}" \
  --region="{location}" \
  --temp_location="gs://{bucket}/weather/temp" \
  --extra_package="./serving/weather-data/dist/weather-data-1.0.0.tar.gz"

> 💡 Look at your Dataflow jobs: https://console.cloud.google.com/dataflow/jobs

# ⛳️ What's next?

* [![Open in Colab](https://github.com/googlecolab/open_in_colab/raw/main/images/icon16.png) **🧠 Train the model**](https://colab.research.google.com/github/GoogleCloudPlatform/python-docs-samples/blob/main/people-and-planet-ai/weather-forecasting/notebooks/3-training.ipynb):
  Build a simple _Fully Convolutional Network_ in [PyTorch](https://pytorch.org/) and train it in [Vertex AI](https://cloud.google.com/vertex-ai/docs/training/custom-training) with the dataset we created.

* [![Open in Colab](https://github.com/googlecolab/open_in_colab/raw/main/images/icon16.png) **🔮 Model predictions**](https://colab.research.google.com/github/GoogleCloudPlatform/python-docs-samples/blob/main/people-and-planet-ai/weather-forecasting/notebooks/4-predictions.ipynb):
  Get predictions from the model with data it has never seen before.