In [0]:
#@title Authored by Michael Evans. { display-mode: "form" }
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Introduction

This is an Earth Engine <> TensorFlow notebook demonstrating workflows to train an automated land cover change prediction DNN model.  Specifically, this notebook shows:

1.   Ingesting previously exported csv files into TFRecord format.
2.   Preparing the data for use in a TensorFlow model.
2.   Training and validating a simple model (Keras `Sequential` neural network) in TensorFlow.
3.   Making predictions on image data exported from Earth Engine in TFRecord format.
4.   Ingesting classified image data to Earth Engine in TFRecord format.

# Setup

## Install the Earth Engine client library

This only needs to be done once per notebook.

In [0]:
!pip install earthengine-api

Authenticate to Earth Engine

In [0]:
import ee
ee.Authenticate()
ee.Initialize()
# Test the earthengine command by getting help on upload.
!earthengine upload image -h

## Google Authentication

To read/write from a Google Cloud Storage bucket to which you have access, it's necessary to authenticate (as yourself).  You'll also need to authenticate as yourself with Earth Engine, so that you'll have access to your scripts, assets, etc.

In [0]:
from google.colab import auth, drive

auth.authenticate_user()

Mount Google Drive

In [0]:
ROOT = ('/content/drive')
drive.mount(ROOT)

Make sure python can reference our module library

In [0]:
import sys
sys.path.append('/content/drive/My Drive/repos/ACD_methods/EEcode/Python')
sys.path

## Set up Tensorflow

The default public runtime already has the tensorflow libraries we need installed.  Before any operations from the TensorFlow API are used, import TensorFlow. Eager execution should be enabled by default as of TF v2.x[`tf.enable_eager_execution()` docs](https://www.tensorflow.org/api_docs/python/tf/enable_eager_execution). 

In [0]:
import tensorflow as tf

tf.executing_eagerly()
print(tf.__version__)

## Set up Folium

The default public runtime already has the Folium library we will use for visualization.  Import the library, check the version, and define the URL where Folium will look for Earth Engine generated map tiles.

In [0]:
import folium
print(folium.__version__)

# Define a method for displaying Earth Engine image tiles to a folium map.
def add_ee_layer(self, ee_image_object, vis_params, name):
  map_id_dict = ee.Image(ee_image_object).getMapId(vis_params)
  folium.raster_layers.TileLayer(
    tiles = map_id_dict['tile_fetcher'].url_format,
    attr = "Map Data © Google Earth Engine",
    name = name,
    overlay = True,
    control = True
  ).add_to(self)

# Add EE drawing method to folium.
folium.Map.add_ee_layer = add_ee_layer

# Get Training and Testing data from Earth Engine

We have previously exported csv files to Google Drive that contain per-pixel output from our change detection algorithms. This output consists of 6 metrics, and a field indicating whether that pixel is a true 'changed' pixel or not.

In [0]:
from os.path import join
DATA_DIR = 'My Drive/EarthEngine/ACD/IW/S2/S2_2'
# Number of records in dataset
SIZE = 4826575
BATCH = 10
NUMERIC_COLUMNS = ['cv_z', 'ndvi_z', 'nbr_z', 'ndwi_z', 'rcvmax_z', 'ndsi_z']
CATEGORY_COLUMNS = {'habitat':['shrub', 'forest', 'desert', 'grassland', 'wetland']}

FEATURE_COLUMNS = NUMERIC_COLUMNS + list(CATEGORY_COLUMNS.keys())
LABEL_COLUMN = 'disturbance'
LABELS = ['none', 'bare', 'residential', 'solar']
# LABEL_COLUMN = 'change'
# LABELS = [0, 1]
# factor for train/test split
FACTOR = 5
BUCKET = 'cvod-203614-mlengine'
PROJECT = 'ACD_methods'
PREDICT_DIR = 'data/predict'
LOG_DIR = 'drive/My Drive/Tensorflow/models/ACD_DNN'



In [0]:
def get_csv_dataset(file_path, batch, labels, features, **kwargs):
  """
  Construct a tfrecord dataset from a list of csv files
  Parameters:
    file_path (list<str>): string or list of strings specifying input files
    batch (int): batch size
    labels (str): field name containing labels
    features(list<str>): field name(s) containing feature values
  Returns:
    tf.data.Dataset
  """
  dataset = tf.data.experimental.make_csv_dataset(
      file_path,
      batch_size= batch,
      label_name= labels,
      select_columns = features + [labels],
      na_value="?",
      # one epoch of data initially because otherwise splitting runs infinitely
      num_epochs=1,
      ignore_errors=True,
      **kwargs)
  return dataset

In [0]:
# Specify the path to our csv data
csv_file_path = join(ROOT, DATA_DIR)

# Put all the csv files from our data directory into a list
rawData = tf.io.gfile.glob(csv_file_path + '/*.csv')

# Create a TFDataset
tfData = get_csv_dataset(rawData, BATCH, LABEL_COLUMN, FEATURE_COLUMNS)

# Inspect a batch of records
# # take creates a new dataset with n elements (batches)
# test = tfData.take(1)
# feats, labs = iter(test).next()
# # features are a dictionary of tensors
# print(feats)

In [0]:
def preprocess_record(features, labels):
  """
  Process input data by converting categorical features to lowercase and labels to one-hot
  Parameters:
    features (list<str>): column names of features to be used for prediction
    labels (str): column name containing labels
  Returns:
    tuple: dictionary of features and label tensor
  """
  # convert labels to one_hot tensor
  if labels.dtype.__eq__(tf.dtypes.string):
    # manually convert strings to one-hot tensor
    matches = tf.stack([tf.equal(labels, s) for s in LABELS], axis = -1)
    labels = tf.cast(matches, tf.float32)
  else:
    labels = tf.one_hot(labels, len(LABELS))
  # change all strings to lowercase
  features = {key:tf.strings.lower(feature) if key in CATEGORY_COLUMNS.keys() else feature for key, feature in features.items()}
  return features, labels

def train_test_split(dataset, factor):
  """
  Divide a tf.data.Dataset into train and test splits
  Parameters:
    dataset (tf.data.Dataset): dataset with features and labels to split
    factor (int): numerator for fraction of testing data (e.g. 5 = 1/5)
  Returns:
    tuple: two tf.data.Datasets
  """
  def is_test(x, y):
    return x % factor == 0

  def is_train(x, y):
    return not is_test(x, y)

  def recover(x,y):
    return y

  dataset = dataset.enumerate()
  test = dataset.filter(is_test).map(recover)
  train = dataset.filter(is_train).map(recover)
  return test, train



In [0]:
# process the data and shuffle once before splitting
tfData = tfData.map(preprocess_record).shuffle(SIZE, reshuffle_each_iteration = False)

# split into testing and training data
testData, trainData = train_test_split(tfData, FACTOR)

In [0]:
# Inspect our splits. These should be shuffled and batched
# take creates a new dataset with n elements (batches)
test = testData.take(1)
feats, labs = iter(test).next()
# features are a dictionary of tensors
print(labs)

In [0]:
print(feats['cv_z'].shape)

# Create the Keras model

Before we create the model, there's still a wee bit of pre-processing to get the data into the right input shape and a format that can be used with cross-entropy loss.  Specifically, Keras expects a list of inputs and a one-hot vector for the class. (See [the Keras loss function docs](https://keras.io/losses/), [the TensorFlow categorical identity docs](https://www.tensorflow.org/guide/feature_columns#categorical_identity_column) and [the `tf.one_hot` docs](https://www.tensorflow.org/api_docs/python/tf/one_hot) for details).  

Here we will use a simple neural network model with a 64 node hidden layer, a dropout layer and an output layer.  Once the dataset has been prepared, define the model, compile it, fit it to the training data.  See [the Keras `Sequential` model guide](https://keras.io/getting-started/sequential-model-guide/) for more details.

In [0]:
# Create feature columns for feature data as part of graph
numericColumns = [tf.feature_column.numeric_column(ft) for ft in NUMERIC_COLUMNS]
categoricalColumns = [tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_vocabulary_list(key, vocab)) for key, vocab in CATEGORY_COLUMNS.items() if key == 'habitat']
preprocessing_layer = tf.keras.layers.DenseFeatures(categoricalColumns + numericColumns)


In [0]:
print(preprocessing_layer)

In [0]:
from tensorflow import keras

# Define the layers in the model.
model = tf.keras.models.Sequential([
  preprocessing_layer,                                 
  tf.keras.layers.Dense(64, input_shape = (7,), activation=tf.nn.relu),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(len(LABELS), activation=tf.nn.softmax)
])

# Define metrics we want to monitor
fp = tf.keras.metrics.FalsePositives()
fn = tf.keras.metrics.FalseNegatives()

tensorboard = tf.keras.callbacks.TensorBoard(log_dir = LOG_DIR)

checkpoint = tf.keras.callbacks.ModelCheckpoint(
    join(LOG_DIR,'best_weights.hdf5'),
    monitor='val_accuracy',
    verbose=1,
    save_best_only=True,
    mode='max'
    )

# Compile the model with the specified loss function.
model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss='categorical_crossentropy',
              metrics=['accuracy', fp, fn])


In [0]:
# Fit the model to the training data.
# Don't forget to specify `steps_per_epoch` when calling `fit` on a dataset.
model.fit(
    # need to repeat our training data to cover multiple epochs
    x = trainData.repeat(),
    epochs = 5,
    steps_per_epoch = (SIZE*(FACTOR-1)/FACTOR)/BATCH,
    validation_data = testData,
    validation_steps = (SIZE/FACTOR)/BATCH,
    callbacks = [checkpoint, tensorboard]
    )

In [0]:
# Fit the model to initialize weights so we can restore from checkpoint
model.fit(
    # need to repeat our training data to cover multiple epochs
    x = trainData.repeat(),
    epochs = 1,
    steps_per_epoch = 10
    )

model.save(join(LOG_DIR, 'DNN_64.h5'))

In [0]:
#bring in the architecture and best weights from Drive
model = tf.keras.models.load_model(join(LOG_DIR, 'DNN_64.h5'))
# load pre-trained weights from best performing epoch
model.load_weights(join(LOG_DIR, 'best_weights.hdf5')) 

#lets see where were at
# evalMetrics = model.evaluate(x=testData, steps = (SIZE/FACTOR)/BATCH, verbose = 1)

## Check model accuracy on the test set

Now that we have a trained model, we can evaluate it using the test dataset.  To do that, read and prepare the test dataset in the same way as the training dataset.  Here we specify a batch sie of 1 so that each example in the test set is used exactly once to compute model accuracy.  For model steps, just specify a number larger than the test dataset size (ignore the warning).

# Prediction

Now that we have a trained model, let's make some predictions on images from GEE. First, we need to export an image to make predictions on

In [0]:
# Run the IW algorithm to generate an image with change metrics
import analyze, dictionaries

nlcd = ee.Image('USGS/NLCD/NLCD2016')

# give this aoi a name
testId = 'HughesMillCA'

# TODO: split up analyze functions so we don't need these
# grab the relevant dictionary of lda coefficients
dictionary = dictionaries.forest

aoi = ee.Geometry.Polygon(
        [[[-120.83127262496578, 39.10457008576222],
          [-120.83127262496578, 39.06952752960459],
          [-120.76518299483882, 39.06952752960459],
          [-120.76518299483882, 39.10457008576222]]], None, False);

doi = '2017-07-01' 

landcover = 'forest'

output = analyze.analyze_iw(
    ee.Feature(aoi, {'mode':landcover}),
     doi, dictionary, 0, testId)
iwImg = output[4]



In [0]:
# check the iwoutput on map
map = folium.Map(location=[39.08, -120.80])
map.add_ee_layer(iwImg, {'bands':['cv_z'], 'min':0, 'max': 50}, 'iwout')
map

## Export the imagery

You can also export imagery using TFRecord format.  Specifically, export whatever imagery you want to be classified by the trained model into the output Cloud Storage bucket.

In [0]:
def doExport(image, out_image_base, directory, region):
  """
  Export a GEE image as TFRecord.  Block until complete.
  Parameters:
    image (ee.Image): image to be exported
    out_image_base (str): output image base filename
    directory (str): google cloud directory for image export
    region (ee.Geometry): bounding area
  """
  # Specify patch and file dimensions.
  imageExportFormatOptions = {
    'patchDimensions': [256, 256],
    'maxFileSize': 104857600,
    'compressed': True
  }

  task = ee.batch.Export.image.toCloudStorage(
    image = image,
    description = out_image_base,
    fileNamePrefix = join(directory, out_image_base),
    bucket = BUCKET,
    scale = 10,
    fileFormat = 'TFRecord',
    region = region,
    formatOptions = imageExportFormatOptions,
  )
  task.start()

  # Block until the task completes.
  print('Running image export to Cloud Storage...')
  import time
  while task.active():
    time.sleep(30)

  # Error condition
  if task.status()['state'] != 'COMPLETED':
    print('Error with image export.')
  else:
    print('Image export completed.')

In [0]:
# Start the task.
doExport(iwImg, testId, join(PROJECT, PREDICT_DIR), aoi)

## Use the trained model to classify an image from Earth Engine

Now it's time to classify the image that was exported from Earth Engine.  If the exported image is large, it will be split into multiple TFRecord files in its destination folder.  There will also be a JSON sidecar file called "the mixer" that describes the format and georeferencing of the image.  Here we will find the image files and the mixer file, getting some info out of the mixer that will be useful during model inference.

Use `gsutil` to locate the files of interest in the output Cloud Storage bucket.  Check to make sure your image export task finished before running the following.

In [0]:
# Get a list of all the files in the output bucket.
def get_pred_files(imageBase, inDir):
  """
  Retrieve TFRecord image files and json mixer exported from GEE
  Parameters:
    imageBase (str): base filename for images to return
    inDir (str): directory containing image and mixer files
  Returns:
    tuple: list of image filenames, json
  """
  
  filesList = !gsutil ls {inDir}
  print(filesList)
  # Get only the files generated by the image export.
  exportFilesList = [s for s in filesList if imageBase in s]

  # Get the list of image files and the JSON mixer file.
  imageFilesList = []
  jsonFile = None
  for f in exportFilesList:
    if f.endswith('.tfrecord.gz'):
      imageFilesList.append(f)
    elif f.endswith('.json'):
      jsonFile = f

  # Make sure the files are in the right order.
  imageFilesList.sort()

  # pprint(imageFilesList)
  # print(jsonFile)

  import json
  # Load the contents of the mixer file to a JSON object.
  jsonText = !gsutil cat {jsonFile}
  # Get a single string w/ newlines from the IPython.utils.text.SList
  mixer = json.loads(jsonText.nlstr)
  print(mixer)
  return imageFilesList, mixer
  

## Read the image files into a dataset

You can feed the list of files (`imageFilesList`) directly to the `TFRecordDataset` constructor to make a combined dataset on which to perform inference.  The input needs to be preprocessed differently than the training and testing.  Mainly, this is because the pixels are written into records as patches, we need to read the patches in as one big tensor (one patch for each band), then flatten them into lots of little tensors.

In [0]:
import numpy as np

In [0]:
# Get relevant info from the JSON mixer file.
def make_pred_dataset(imageBase, inDir, features, habType):
  """
  This needs to return a tuple <dictionary of prediction features, blank 'labels'>
  """
  imageFiles, mixer = get_pred_files(imageBase, inDir)
  patch_width = mixer['patchDimensions'][0]
  patch_height = mixer['patchDimensions'][1]
  patches = mixer['totalPatches']
  patch_dimensions_flat = [patch_width * patch_height, 1]

  # get the index of the habitat column
  featList = features
  hab = featList.index('habitat')
  hab = featList.pop(hab)
  print(hab)
  # Note that the tensors are in the shape of a patch, one patch for each band.
  imageColumns = [
    tf.io.FixedLenFeature(shape=patch_dimensions_flat, dtype=tf.float32) 
      for k in featList
  ]

  # Parsing dictionary.
  imageFeaturesDict = dict(zip(featList, imageColumns))

  # Note that you can make one dataset from many files by specifying a list.
  imageDataset = tf.data.TFRecordDataset(imageFiles, compression_type='GZIP')

  # Parsing function.
  # Each element in the output is a dictionary of fixedlenfeatures of size (65536, 1)
  # There will be as many elements as patches
  def parse_image(example_proto):
    parsed = tf.io.parse_single_example(example_proto, imageFeaturesDict)
    # add habitat tensor to dictionary
    parsed.update({'habitat':np.full(patch_dimensions_flat, habType)})
    return parsed

  # Parse the data into tensors, one long tensor per patch.
  imageDataset = imageDataset.map(parse_image, num_parallel_calls=5)

  # Break our long tensors into many little ones. Only necessary if we want to do calculations with the data(?)
  # creates tensors for each feature of shape (1, )
  imageDataset = imageDataset.flat_map(
  # slice each tensor along first dimension
    lambda x: tf.data.Dataset.from_tensor_slices(x)
  )

  # # Add additional features (NDVI).
  # # imageDataset = imageDataset.map(
  # #   # Add NDVI to a feature that doesn't have a label.
  # #   lambda features: addNDVI(features, None)[0]
  # # )

  # Turn the dictionary in each record into a tuple with a dummy label.
  # imageDataset = imageDataset.map(
  #   # The model expects a tuple of (dictionary, label).
  #   # lambda dataDict: (dataDict, )
  #   # add dimension with 'list' then transpose from (6, 1) to (1, 6)
  #   # this operation destroys the dictionary
  #   lambda dataDict: (tf.transpose(list(dataDict.values())), )
  # )

  # Turn each patch into a batch.
  # This creates element tensors of shape (65536, 1, 6)
  imageDataset = imageDataset.batch(patch_width * patch_height)
  # imageDataset = imageDataset.batch(1)
  return imageDataset, patches

## Generate predictions for the image pixels

To get predictions in each pixel, run the image dataset through the trained model using `model.predict()`.  Print the first prediction to see that the output is a list of the three class probabilities for each pixel.  Running all predictions might take a while.

In [0]:
FEATURE_COLUMNS.append('habitat')
FEATURE_COLUMNS

In [0]:
# Run prediction in batches, with as many steps as there are patches.
def make_predictions(imgBase, inDir, model, features, habType):
  imageDataset, patches = make_pred_dataset(imgBase, inDir, features, habType) 
  predictions = model.predict(imageDataset, steps = patches, verbose = 1)
  return predictions

# Note that the predictions come as a numpy array.  Check the first one.
predImgDir = join('gs://', BUCKET, PROJECT, PREDICT_DIR)
predictions = make_predictions(testId, predImgDir, model, FEATURE_COLUMNS, 'forest')

## Write the predictions to a TFRecord file

Now that there's a list of class probabilities in `predictions`, it's time to write them back into a file, optionally including a class label which is simply the index of the maximum probability.  We'll write directly from TensorFlow to a file in the output Cloud Storage bucket.

Iterate over the list, compute class label and write the class and the probabilities in patches.  Specifically, we need to write the pixels into the file as patches in the same order they came out.  The records are written as serialized `tf.train.Example` protos.  This might take a while.

In [0]:
outputImageFile = join('gs://', BUCKET, PROJECT, PREDICT_DIR, 'output', testId + '.TFRecord')
outputImageFile
PATCH_WIDTH = 256
PATCH_HEIGHT = 256
PATCHES = 2

In [0]:
# Don't run
# Instantiate the writer.
writer = tf.io.TFRecordWriter(outputImageFile)

# Every patch-worth of predictions we'll dump an example into the output
# file with a single feature that holds our predictions. Since our predictions
# are already in the order of the exported data, the patches we create here
# will also be in the right order.
patch = [[], [], [], [], []]
curPatch = 1
for prediction in predictions:
  patch[0].append(tf.argmax(prediction, 0))
  patch[1].append(prediction[0])
  patch[2].append(prediction[1])
  patch[3].append(prediction[2])
  patch[4].append(prediction[3])
  # Once we've seen a patches-worth of class_ids...
  if (len(patch[0]) == PATCH_WIDTH * PATCH_HEIGHT):
    print('Done with patch ' + str(curPatch) + ' of ' + str(PATCHES) + '...')
    # Create an example
    # TODO: use dict comprehension to make this generalizeable based on labels
    example = tf.train.Example(
      features=tf.train.Features(
        feature={
          'prediction': tf.train.Feature(
              int64_list=tf.train.Int64List(
                  value=patch[0])),
          'noneProb': tf.train.Feature(
              float_list=tf.train.FloatList(
                  value=patch[1])),
          'bareProb': tf.train.Feature(
              float_list=tf.train.FloatList(
                  value=patch[2])),
          'resProb': tf.train.Feature(
              float_list=tf.train.FloatList(
                  value=patch[3])),
          'solarProb': tf.train.Feature(
              float_list=tf.train.FloatList(
                  value=patch[4]))
        }
      )
    )
    # Write the example to the file and clear our patch array so it's ready for
    # another batch of class ids
    writer.write(example.SerializeToString())
    patch = [[], [], [], [], []]
    curPatch += 1

writer.close()

# Upload the classifications to an Earth Engine asset

## Verify the existence of the predictions file

At this stage, there should be a predictions TFRecord file sitting in the output Cloud Storage bucket.  Use the `gsutil` command to verify that the predictions image (and associated mixer JSON) exist and have non-zero size.

In [0]:
!gsutil ls -l {outputImageFile}

## Upload the classified image to Earth Engine

Upload the image to Earth Engine directly from the Cloud Storage bucket with the [`earthengine` command](https://developers.google.com/earth-engine/command_line#upload).  Provide both the image TFRecord file and the JSON file as arguments to `earthengine upload`.  (You can use the `nclinton` version for now.)

In [0]:
!gsutil ls gs://cvod-203614-mlengine/ACD_methods/data/predict/*.json

In [0]:
USER_NAME = 'defendersofwildlifeGIS'
outputAssetID = 'users/' + USER_NAME + '/' + testId
jsonFile = 'gs://cvod-203614-mlengine/ACD_methods/data/predict/HughesMillCA-mixer.json'

In [0]:
#@title Don't run

# Start the upload.

!earthengine upload image --asset_id={outputAssetID} {outputImageFile} {jsonFile}

## Check the status of the asset ingestion

You can also use the Earth Engine API to check the status of your asset upload.  It might take a while.  The upload of the image is an asset ingestion task.  

In [0]:
#@title Don't run
ee.batch.Task.list()

## View the ingested asset

Display the vector of class probabilities as an RGB image with colors corresponding to the probability of bare, vegetation, water in a pixel.  Also display the winning class using the same color palette.

In [0]:
predictionsImage = ee.Image(outputAssetID)

predictionVis = {
  'bands': 'prediction',
  'min': 0,
  'max': 2,
  'palette': ['red', 'green', 'blue']
}
probabilityVis = {'bands': ['bareProb', 'vegProb', 'waterProb']}

predictionMapid = predictionsImage.getMapId(predictionVis)
probabilityMapid = predictionsImage.getMapId(probabilityVis)

map = folium.Map(location=[38., -122.5])
folium.TileLayer(
  tiles=EE_TILES.format(**predictionMapid),
  attr='Google Earth Engine',
  overlay=True,
  name='prediction',
).add_to(map)
folium.TileLayer(
  tiles=EE_TILES.format(**probabilityMapid),
  attr='Google Earth Engine',
  overlay=True,
  name='probability',
).add_to(map)
map.add_child(folium.LayerControl())
map