# LANDSAT and Ensemble Learning Models

[Ensemble Learning Models (Elm)](https://github.com/ContinuumIO/elm) was developed for a 2016 NASA SBIR Phase I.  Elm provides large data machine learning tools for satellite imagery and climate data.

 * Using the AWS S3 LANDSAT data
 * Using GeoTiff metadata
 * Feature engineering with `elm.pipeline.Pipeline`
 * Fitting / predicting with `distributed`

```
conda env create --name ds-35
source activate ds-35
conda install -c elm/label/dev -c elm -c conda-forge -c ioam -c conda-forge -c scitools/label/dev python=3.5 elm earthio pyarrow fastparquet
conda remove bokeh ; conda install bokeh
jupyter notebook
```

In [None]:
%matplotlib inline
import glob
import os
import re
import sys
from urllib.request import urlopen

from bokeh.models import WMTSTileSource
from cartopy import crs
from collections import defaultdict, OrderedDict
from dask.diagnostics import ProgressBar
from dask.distributed import Client
from earthio import load_array, load_tif_meta, BandSpec, ElmStore
from earthio.landsat_util import landsat_metadata
from earthio.s3_landsat_util import SceneDownloader
from elm.model_selection.kmeans import kmeans_aic, kmeans_model_averaging
from elm.pipeline import Pipeline, steps
from holoviews.operation import decimate
from holoviews.operation.datashader import aggregate, shade, datashade, dynspread
from matplotlib.cm import get_cmap
from pyproj import Proj, transform
from sklearn.cluster import MiniBatchKMeans
from sklearn.decomposition import PCA
import dask
import dask.dataframe as dd
import datashader as ds
import datashader.transfer_functions as tf
import dill
import geoviews as gv
import holoviews as hv
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import rasterio as rio
import requests
import xarray as xr

hv.notebook_extension('bokeh')
decimate.max_samples = 1000
dynspread.max_px = 20
dynspread.threshold = 0.5

## S3 LANDSAT downloader
See [this example scene from the AWS S3 LANDSAT store](http://landsat-pds.s3.amazonaws.com/L8/015/033/LC80150332013207LGN00/index.html)

This example uses `SceneDownloader` to find scenes meeting spatial or cloud cover criteria.

In [None]:
s3_download = SceneDownloader(s3_tif_dir='data')

In [None]:
?SceneDownloader

## GeoTiff options

Use `elm.readers.BandSpec` to control:

 * Resolution
 * Naming of the bands
 * Where to find each band's GeoTiff based on file name match

In [None]:
BUF_X_SIZE, BUF_Y_SIZE = 600, 600 # Set to 800, 800 for 800 by 800 pix decimation
BAND_SPECS = [BandSpec(search_key='name',
                       search_value='B{}.TIF'.format(band),
                       name='band_{}'.format(band),
                       buf_xsize=BUF_X_SIZE,
                       buf_ysize=BUF_Y_SIZE) for band in range(1, 8)]

## Create `distributed.Client`

 * Defaults to creation of local scheduler / workers
 * Can point to remote scheduler / workers

In [None]:
scheduler = os.environ.get('DASK_SCHEDULER', '172.31.98.124:8786')
client = Client(scheduler)

In [None]:
import dask.dataframe as dd
cens = dd.io.parquet.read_parquet('data/census.snappy.parq', )
cens = cens.persist()
cens.columns, cens[['easting', 'northing']].min().compute()

In [None]:
FT_2_M = 0.3048    
def convert_coords(X, y=None, sample_weight=None, metric=True, **kwargs):
    landsat = Proj(**X.band_1.meta['crs'])  
    web_mercator = Proj(init='epsg:3857')             # Mercator projection EPSG code
    scale = 1.0 if metric else FT_2_M
    xx, yy = transform(landsat, web_mercator, X.band_1.x.values * scale, X.band_1.y.values * scale)
    for band in X.band_order:
        b = getattr(X, band)
        b.x.values[:] = xx
        b.y.values[:] = yy
    return (X, y, sample_weight)
convert_coords_step = steps.ModifySample(convert_coords)

## Get corresponding population

In [None]:
cens = dd.io.parquet.read_parquet('data/census.snappy.parq')
cens = cens.persist()

In [None]:
def binning_population(X, y=None, sample_weight=None, **kwargs):
    xx, yy = X.band_1.x.values, X.band_1.y.values
    subset = cens[(cens.easting <= np.max(xx))&
              (cens.easting >= np.min(xx))& 
              (cens.northing <= np.max(yy))&
              (cens.northing >= np.min(yy))]
    people_counts = None
    X_resamp = {}
    h, w = X.band_1.shape
    for b in range(1, 8):
        band = 'band_' + str(b)
        band_existing = getattr(X, band)
        img = hv.Image(band_existing, vdims=[band])
        if people_counts is None:
            people_counts = aggregate(hv.Points(subset), x_range=img.range(0), y_range=img.range(1), width=w, height=h, dynamic=False)
        aggregate(img, aggregator=ds.mean(band), width=w, height=h, dynamic=False)
        band_resamp = aggregate(img, aggregator=ds.mean(band), width=w, height=h, dynamic=False)
        X_resamp[band] = getattr(band_resamp.data, band)
    X_resamp = xr.Dataset(X_resamp, attrs=X.attrs)
    y = people_counts.data.Count.values.ravel()
    return (X_resamp, y, None)

bin_pop = steps.ModifySample(binning_population)

## Finding a cloud free image

(For a given LANDSAT row / path and month)

In [None]:
clear_image = s3_download.lowest_cloud_cover_image(row=33, path=15, months=tuple(range(1,13)))
clear_image

In [None]:
download_url = clear_image.download_url.values[0]
download_url

## LANDSAT `sampler` function
 * Uses `elm.readers.load_array` with `band_specs` argument
 * Adds MTL file metadata with `elm.readers.landsat_util.landsat_metadata`

In [None]:
def sampler(download_url, **kwargs):
    local_files = s3_download.download_all_bands(download_url)
    this_sample_dir = os.path.dirname(local_files[0])
    X = load_array(this_sample_dir, band_specs=BAND_SPECS)
    X.attrs.update(vars(landsat_metadata([f for f in local_files if f.endswith('.txt')][0])))
    y = sample_weight = None
    return (X, y, sample_weight)

## Convert digital numbers to radiance or reflectance

Generalize the example given in the plot above to allow TOA radiance or reflectance for any band:

In [None]:
from functools import partial
def toa_rad_or_reflect(X, y=None, sample_weight=None,**kw):
    rad_or_reflect = kw['rad_or_reflect']
    for band in X.data_vars:
        num = band.split('_')[-1]
        add = getattr(X, '{}_ADD_BAND_{}'.format(rad_or_reflect, num))
        mult = getattr(X, '{}_MULT_BAND_{}'.format(rad_or_reflect, num))
        band_arr = getattr(X, band)
        band_arr.values[:] = band_arr.values * mult + add
        if rad_or_reflect == 'REFLECTANCE':
            band_arr.values = band_arr.values / np.sin(X.SUN_ELEVATION * (np.pi / 180.))
    return (X, y, sample_weight)
toa_radiance = partial(toa_rad_or_reflect, rad_or_reflect='RADIANCE')
toa_reflectance = partial(toa_rad_or_reflect, rad_or_reflect='REFLECTANCE')

## Set `NaN` values for no-data regions

In [None]:
def set_nans(X, y=None, sample_weight=None, **kwargs):
    xx = X.copy(deep=True)
    for band in xx.data_vars:
        band_arr = getattr(xx, band)
        band_arr.values = band_arr.values.astype(np.float32)
        band_arr.values[band_arr.values <= 1] = np.NaN
        band_arr.values[band_arr.values == 2**16] = np.NaN
    return (xx, y, sample_weight)

## `elm.pipeline.steps.ModifySample`
 * Use custom functions in an `elm.pipeline.Pipeline` of transformations

In [None]:
set_nans_step = steps.ModifySample(set_nans)
reflectance_step = steps.ModifySample(toa_reflectance)

## Normalized differences between bands

Normalized differences between band reflectances may be helpful in feature engineering to differentiate water, urban areas and forests.

 * NDWI - Normalized Difference Water Index
 * NDVI - Normalized Difference Vegetation Index
 * NDSI - Normalized Difference Soil Index
 * NBR - Normalized Burn Ratio

In [None]:
normalized_diffs = {'ndwi': ('band_4', 'band_5'),
                    'ndvi': ('band_5', 'band_4'),
                    'ndsi': ('band_2', 'band_6'),
                    'nbr':  ('band_4', 'band_7'),
                 }
normed_diffs_step = steps.NormedBandsDiff(spec=normalized_diffs)

In [None]:
ProgressBar().register()

hv.notebook_extension('bokeh', width=95)

%opts Overlay [width=800 height=455 xaxis=None yaxis=None show_grid=False] 
%opts RGB     [width=800 height=455 xaxis=None yaxis=None show_grid=False] 
%opts Shape (fill_color=None line_width=1.5) [apply_ranges=False] 
%opts Points [apply_ranges=False] WMTS (alpha=0.5) NdOverlay [tools=['tap']]
color_key = {'w':'blue',  'b':'green', 'a':'red',   'h':'orange',   'o':'saddlebrown'}
races     = {'w':'White', 'b':'Black', 'a':'Asian', 'h':'Hispanic', 'o':'Other'}

color_points = hv.NdOverlay({races[k]: gv.Points([0,0], crs=crs.PlateCarree(),
                                 label=races[k])(style=dict(color=v))
                             for k, v in color_key.items()})

## Selecting bands for learning
The following function could allow hyperparameterization to control which bands and normalized differences become input features to machine learning.

In [None]:
NORMALIZED_DIFFS = ('nbr', 'ndsi', 'ndwi', 'ndvi')
DEFAULT_BANDS = [band_spec.name for band_spec in BAND_SPECS]
def choose_bands(X, y=None, sample_weight=None, **kwargs):
    new = {}
    bands = kwargs.get('bands', DEFAULT_BANDS)
    include_normed_diffs = kwargs.get('include_normed_diffs', True)
    for band in bands:
        data_arr = getattr(X, band)
        new[band] = data_arr
    if include_normed_diffs:
        for diff in NORMALIZED_DIFFS:
            new[diff] = getattr(X, diff)
    ks = list(new)
    es = ElmStore({k: new[k] for k in ks}, add_canvas=False)
    for band in es.data_vars:
        es[band].attrs['canvas'] = data_arr.canvas
    es.attrs.update(X.attrs)
    print('Chose', es.data_vars)
    return (es, y, sample_weight)

## Using `elm.pipeline.steps` for preprocessing
The next cell allows a custom function to be used in a `Pipeline`:

In [None]:
choose_bands_step = steps.ModifySample(choose_bands,
                              bands=DEFAULT_BANDS,
                              include_normed_diffs=True)

These steps flatten rasters to columns and remove no-data pixels:

In [None]:
flat = steps.Flatten()
drop_na = steps.DropNaRows()

These steps using `sklearn.preprocessing.StandardScaler` to normalize data and `PCA` to reduce dimensionality.

In [None]:
standardize = steps.StandardScaler()
pca = steps.Transform(PCA(n_components=5))

In [None]:
download_url = clear_image.download_url.values[0]

X, y, _ = sampler(download_url)
assert y is None
Xnew, y, _ = convert_coords_step.fit_transform(X, y)
assert y is None
Xnew, y, _ = bin_pop.fit_transform(Xnew, y)
assert y is not None
Xnew, y, _ = set_nans_step.fit_transform(Xnew, y)
assert y is not None
assert y.size == Xnew.band_1.values.size
Xnew, y, _ = reflectance_step.fit_transform(Xnew, y)
assert y is not None
Xnew, y, _ = normed_diffs_step.fit_transform(Xnew, y)
assert y is not None
Xnew, y, _ = choose_bands_step.fit_transform(Xnew, y)
assert y is not None
Xnew, y, _ = flat.fit_transform(Xnew, y)
assert y is not None
assert y.size == Xnew.flat.values.shape[0]
Xnew, y, _ = drop_na.fit_transform(Xnew, y)
assert y.size == Xnew.flat.values.shape[0] # TODO these assertions would be a good unit test


In [None]:
Xnew.flat.shape, y.shape

In [None]:
dataset = gv.Dataset(cens, kdims=['easting', 'northing'], vdims=['race'])

xx, yy = X.band_1.x.values, X.band_1.y.values
#x_range, y_range = ((-13884029.0, -7453303.5), (2818291.5, 6335972.0)) # Continental USA
x_range, y_range = ((np.min(xx), np.max(xx)), (np.min(yy), np.max(yy))) # Chesapeake Bay region LANDSAT 15 / 033
shade_defaults = dict(x_range=x_range, y_range=y_range, x_sampling=10, y_sampling=10, width=800, height=455)

shaded = datashade(hv.Points(dataset),  cmap=color_key, aggregator=ds.count_cat('race'), **shade_defaults)
shaded

##  `scikit-learn` estimator

The final step in `Pipeline` is a `scikit-learn` estimator.

## Controlling ensemble initialization

Starting with a group of `8` `Pipeline` instances with varying PCA and K-Means parameters.

In [None]:
from sklearn.linear_model import LinearRegression
reg = LinearRegression()
def scoring(model, X, y=None, sample_weight=None, **kwargs):
    return model._estimator.score(X, y)
reg_pipe = Pipeline([
                 ('convert_coords', convert_coords_step),
                 ('set_nans', set_nans_step),
                 ('population', bin_pop),
                 ('reflect', reflectance_step),
                 ('normed_diffs', normed_diffs_step),
                 ('choose', choose_bands_step),
                 ('flat', flat),
                 ('drop_na', drop_na),
                 ('standard', standardize),
                 ('pca', pca),
                 ('est', reg)],
                scoring=scoring,
                scoring_kwargs=dict(score_weights=[1]))

## Run `fit_ensemble`
 * Control number of fitting generations
 * Control model selection
 * Control ensemble initialization

In [None]:
client

In [None]:
X, _, _ = sampler(download_url)
fitted = reg_pipe.fit_ensemble(X=X, client=client)

## `Pipeline.predict_many`
 * Predicts for one or more samples and one or more ensemble members
 * Uses `distributed` for parallelism
 * Can return xarray data structure or serialize it
 * By default, reshapes 1-D predictions to 2-D spatial arrays

In [None]:
preds = reg_pipe.predict_many(X=X, client=client)

## Next Steps - Hierarchical Modeling

Notice in the predictions plotted above, most ensemble members arrived at similar clustering systems, but:

* The clusters were named differently in each model (i.e. cluster #1 is not the same in every ensemble member).
* The models differed in the water region of the image (Chesapeake Bay) with some models finding two in-water clusters and other models finding one

Future development with `elm` will automate the following cells' steps of predicting based on an ensemble of predictions.  The steps are to:

* Flatten all predictions
* Use a categorical to binary encoder
* Predict with K-Means based on the ensemble members' encoded predictions

In [None]:
from sklearn.preprocessing import OneHotEncoder
def sampler_layer_2(preds):
    # This will be simplified in Hierarchical modeling / vote count tasks
    predicts = []
    for p in preds:
        flat, _, _ = steps.Flatten().fit_transform(p.copy(deep=True))
        no_na, _, _ = steps.DropNaRows().fit_transform(flat)
        predicts.append(no_na.flat.values[:,0])
    transformed = OneHotEncoder().fit_transform(np.array(predicts).T).todense()
    Xnew = ElmStore({'flat': xr.DataArray(transformed, 
                                          coords=[('space', no_na.space), 
                                                  ('band', np.arange(transformed.shape[1]))],
                                         dims=('space','band'))},
                    attrs=no_na.attrs)
    return Xnew
X_layer_2 = sampler_layer_2(preds)

## Pick a number of clusters to use (randomly)

## Fit and predict based on ensemble of predictions

In [None]:
pipe_level_2.fit_ensemble(X=X_layer_2, ngen=1, init_ensemble_size=1)
preds2 = pipe_level_2.predict_many(X=X_layer_2, y=y)
len(preds2)

## Plot prediction from hierarchical model

This shows some of the Phase II idea of hierarchical models (models on predictions from ensembles).

In [None]:
# TODO Legend
%opts Image [width=800 height=600]
%opts Layout [tabs=True]
best = preds2[0]
hv.Image(best, kdims=['x', 'y'])

In [None]:
#img = hv.Image(X.band_1)
#agg = aggregate(hv.Points(subset), target=img, dynamic=False)

#ds = xr.Dataset({'Population': agg.data.Count, 'Band_1': X.band_1})
#df = ds.to_dataframe()
#ds = xr.Dataset({'Population': agg.data.Count, 'Band_1': agg2.data.band_1})

In [None]:
X.band_order, X_resamp.band_order

In [None]:
X_resamp.band_1

In [None]:
population_y = people_counts.data.Count.values

In [None]:
population_y.shape

In [None]:
download_url