<a target="_blank" href="https://colab.research.google.com/github/Prindle19/efcoa/blob/main/notebooks/Sentinel_2_Classification_Dask_ML_KNN_on_GKE.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

### Colab-only

Install the Google Cloud CLI (with the GKE auth plugin and kubectl) and Helm.

In [None]:
%cd /tools
!curl -O https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-cli-linux-x86_64.tar.gz > /dev/null 2>&1
!tar -xf google-cloud-cli-linux-x86_64.tar.gz
!gcloud components install -q gke-gcloud-auth-plugin > /dev/null 2>&1
!gcloud components install -q kubectl > /dev/null 2>&1
!curl https://baltocdn.com/helm/signing.asc | gpg --dearmor | sudo tee /usr/share/keyrings/helm.gpg> /dev/null
!sudo apt-get install apt-transport-https --yes > /dev/null 2>&1
!echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/helm.gpg] https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list
!sudo apt-get update > /dev/null 2>&1
!sudo apt-get install helm > /dev/null 2>&1
%cd /content

In [None]:
# Colab-only: grant this Colab runtime your Google credentials so the gcloud commands below run as you.
from google.colab import auth as colab_auth

colab_auth.authenticate_user()

In [None]:
# Make sure we have a proper version of Dask
!pip install -q --upgrade "dask[complete]==2024.1.0"

In [None]:
# Verify that the pinned Dask version is the one actually loaded. If the assertion fails, an older
# version is still imported in this session: restart the session and re-run from the next cell.
import dask

EXPECTED_DASK_VERSION = "2024.1.0"
assert dask.__version__ == EXPECTED_DASK_VERSION, (
    f"Loaded dask {dask.__version__}, expected {EXPECTED_DASK_VERSION}; restart the session and re-run."
)
dask.__version__

In [None]:
# Configuration used by every cell below: GCP project ID, compute zone, and GKE cluster name.
# Replace PROJECT with your own project ID before running.
PROJECT = "my-project"
ZONE = "us-east4-c"
CLUSTER = "embeddings-cluster"

In [None]:
# Set the local machine's gcloud project

!gcloud config set project $PROJECT

#### 1. Create a new GKE cluster

In [None]:
%%time

# Create cluster, this takes ~5 min
!gcloud container clusters create $CLUSTER \
    --zone=$ZONE \
    --num-nodes=2 \
    --machine-type=c2-standard-8

Inspect your cluster in GKE: https://console.cloud.google.com/kubernetes




#### 2. Deploy Dask to our cluster using Helm

In [None]:
# Save cluster credentials in ~/.kube/config so that local kubectl and helm commands can use it

!gcloud container clusters get-credentials $CLUSTER --project $PROJECT --zone $ZONE

In [None]:
# List the nodes of the new cluster via kubectl.
# If this succeeds, the credentials saved by the previous cell are in use.

!kubectl get nodes

In [None]:
# Install Dask on the cluster

!helm install --repo https://helm.dask.org \
    --set worker.replicas=16 \
    --set scheduler.serviceType=NodePort --set webUI.serviceType=NodePort --set jupyter.enabled=false \
    my-dask dask > /dev/null

In [None]:
# Run this command a few times to see the pod VMs are provisioned and in status "Running"

!kubectl get pods -o wide

In [None]:
# List the Kubernetes services in the current namespace.
# The Dask release was installed with NodePort service types (see the helm install cell).

!kubectl get services

#### 3. Forward ports for Dask Scheduler and Dask UI

In [None]:
# Run this cell, then open a terminal on the notebook and run the command printed below to forward Dask Scheduler port to localhost:8081

!echo kubectl port-forward $(kubectl get pod --selector="app=dask,component=scheduler,release=my-dask" --output jsonpath='{.items[0].metadata.name}') 8081:8786

In [None]:
# Run this cell, then open a local terminal where you can open a browser window to redirect Dask UI to the local machine port

# Query node name and port to forward for Dask UI
NODE_NAME=!kubectl get pod --selector="app=dask,component=scheduler,release=my-dask" --output jsonpath='{.items[0].spec.nodeName}'
NODE_NAME=NODE_NAME[0]
NODE_PORT=!kubectl get services --output jsonpath='{.items[1].spec.ports[1].nodePort}'
NODE_PORT=NODE_PORT[0]

# Run commands printed below in the local shell to forward Dask UI ports to localhost:8080, create and enter an ssh password when asked

!echo gcloud compute firewall-rules create allow-ssh-ingress-from-iap --direction=INGRESS --action=allow --rules=tcp:22 --source-ranges=35.235.240.0/20
!echo gcloud compute ssh --tunnel-through-iap $NODE_NAME -- -NL 8080:localhost:$NODE_PORT

#### 4. Test if Dask works

In [None]:
# Connect this notebook's Dask client to the scheduler via the port-forward on localhost:8081
# (the kubectl port-forward from step 3 must be running). The repr links to the dashboard.
from dask.distributed import Client

client = Client(address="tcp://127.0.0.1:8081")
client

In [None]:
# Build a lazy 10,000 x 10,000 random Dask array split into 100 chunks of 1,000 x 1,000.
# Aliased as `dsa` (not the usual `da`) because `da` is reassigned to an xarray DataArray in section 6,
# which would otherwise shadow the module and break re-running this cell.
import dask.array as dsa

x = dsa.random.random((10000, 10000), chunks=(1000, 1000))
x

In [None]:
# NumPy-style expressions on the Dask array only build a task graph;
# .compute() executes it on the cluster and returns the result.
symmetric = x + x.T
row_means = symmetric[::2, 5000:].mean(axis=1)  # every second row, columns 5000 onward

row_means.compute()  # watch the tasks run in parallel in the Dask dashboard

See https://docs.dask.org/en/latest/10-minutes-to-dask.html for an introduction to Dask.

#### 5. K-Means example


In [None]:
# Install Zarr and other dependencies

!pip install -q "dask-ml[complete]" zarr xarray gcsfs pandas

In [None]:
# Install the same packages on the workers (alternatively, bake them into the image via EXTRA_PIP_PACKAGES).
# PipInstall runs pip on the workers; this takes a minute or so.
from dask.distributed import PipInstall

plugin = PipInstall(packages=["dask-ml[complete]", "gcsfs", "zarr", "xarray"], pip_options=["--upgrade"])
# Client.register_plugin supersedes the deprecated Client.register_worker_plugin.
client.register_plugin(plugin)

In [None]:
# Plotting plus Dask-ML's synthetic-data generators and clustering estimators
import matplotlib.pyplot as plt

import dask_ml.cluster
import dask_ml.datasets

In [None]:
# Generate 100M points around 5 centers, split into chunks of 10M rows, as input for clustering.
# persist() keeps the generated data in the cluster's memory so the fit below reuses it.
X, y = dask_ml.datasets.make_blobs(
    n_samples=100_000_000,
    centers=5,
    chunks=10_000_000,
    random_state=0,
)
X = X.persist()
X

In [None]:
# Fit K-Means (k=5, matching the 5 generated centers) on the Dask cluster.
# This is K-Means clustering, not K-nearest-neighbours (KNN).
# init_max_iter and oversampling_factor tune the k-means|| initialization; random_state makes it reproducible.
km = dask_ml.cluster.KMeans(n_clusters=5, init_max_iter=2, oversampling_factor=10, random_state=0)
km.fit(X)

In [None]:
# Plot every 10,000th point colored by its cluster label.
# X and km.labels_ are lazy Dask arrays: compute both samples in a single pass instead of
# letting matplotlib convert (and so compute) each slice separately.
import dask

points_sample, labels_sample = dask.compute(X[::10000], km.labels_[::10000])

fig, ax = plt.subplots()
ax.scatter(points_sample[:, 0], points_sample[:, 1], marker='.', c=labels_sample, cmap='viridis', alpha=0.25)
ax.set(title="Dask-ML K-Means on synthetic blobs (every 10,000th point)", xlabel="feature 0", ylabel="feature 1");

#### 6. Read the Sentinel-2 10 m Zarr and classify it on the GKE Dask cluster


In [None]:
# Import dependencies and open the public Sentinel-2 10 m composite Zarr store on GCS
# (anonymous, read-only access).
import gcsfs
import xarray as xr
import zarr
from dask.distributed import Client, progress
import dask_ml.cluster
import matplotlib.pyplot as plt

fs = gcsfs.GCSFileSystem(token='anon', access='read_only')
# fs.get_mapper is fsspec's public key-value view of a bucket prefix; it replaces constructing
# gcsfs.mapping.GCSMap directly (check=False and create=False are its defaults).
gcsmap = fs.get_mapper("gs://cloud-geo-efm-public/s2-composite-10m/")

# Open lazily: array data is loaded by Dask on demand, not here.
ds = xr.open_zarr(gcsmap)
ds

In [None]:
# Crop the dataset to the Manasquan Inlet, NJ area (bbox includes a 500 m buffer), 2022-01-01 composite.
west, south, east, north = -74.09972442, 40.0838885, -74.02481765, 40.12451048
bbox = [west, south, east, north]

# A label-based slice returns an empty selection when its bounds run opposite to the coordinate's
# order, so flip the latitude bounds if lat is stored north-to-south.
if ds.indexes['lat'].is_monotonic_increasing:
    lat_bounds = slice(south, north)
else:
    lat_bounds = slice(north, south)

ds_small = ds.sel(
    lat=lat_bounds,
    lon=slice(west, east),
    time='2022-01-01',
)

ds_small

In [None]:
# persist() keeps the cropped data in distributed memory (compute() would pull it back into this process).
# to_array() stacks the data variables (presumably one per band) along a new "variable" dimension;
# stack() flattens lat/lon into a single "point" dimension, and transpose() reverses the dimension
# order so the result is (point, variable), i.e. the (samples, features) layout KMeans expects.
da = (
    ds_small
    .persist()
    .to_array()
    .stack(point=['lat', 'lon'])
    .transpose()
)
da

In [None]:
# Create 10 classes using Dask ML KNN on the GKE cluster using unsupervised classification

%%time
km = dask_ml.cluster.KMeans(n_clusters=10, init_max_iter=2, oversampling_factor=10)
km.fit(da)

In [None]:
# Attach each point's K-Means label as a `predicted_class` coordinate along "point",
# then unstack "point" to restore the original lat/lon grid.
da = da.assign_coords(predicted_class=('point', km.labels_)).unstack('point')
da

In [None]:
# For reference, plot band B8 of the cropped Sentinel-2 composite.
# Pass ax explicitly instead of relying on matplotlib's implicit current axes.
fig, ax = plt.subplots(figsize=(10, 5))
ds_small.B8.plot(ax=ax, x='lon', y='lat')
ax.set_title("Sentinel-2 10 m composite, band B8 (2022-01-01)")
plt.show()

In [None]:
# Plot the predicted K-Means class of every pixel on the lat/lon grid.
fig, ax = plt.subplots(figsize=(10, 5))
da.predicted_class.plot(ax=ax, add_colorbar=True, x='lon', y='lat', cmap='tab10_r')
# The clustering ran on the GKE Dask cluster over the Zarr store on GCS, not locally.
ax.set_title(f"K-Means (k={km.n_clusters}) with Dask-ML on GKE, Sentinel-2 Zarr from GCS")
plt.show()

#### 7. Clean up resources

In [None]:
# Uninstall Dask
!helm uninstall my-dask

In [None]:
# Delete the GKE Cluster
%%time

# ~3-4 min
!gcloud container clusters delete --quiet --zone=$ZONE $CLUSTER