In [None]:
import ee
service_account = 'renosterveld-ee@ee-vegetation-gee4geo.iam.gserviceaccount.com'
credentials = ee.ServiceAccountCredentials(service_account, 'ee-vegetation-gee4geo-6309a79ef209.json')
ee.Initialize(credentials)

# EArth Engine docs:
# https://developers.google.com/earth-engine/guides
# https://gee-python-api.readthedocs.io/en/latest/index.html

In [None]:
import sys
import os
import json
from google.cloud import storage

from utils.globals import *
from utils.s2_ee import *
from utils.misc_ee import *
from utils.tf_data_utils import *
from eoflow.models import TempCNNModel

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # bucket_name = "your-bucket-name"
    # source_file_name = "local/path/to/file"
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(
        "File {} uploaded to {}.".format(
            source_file_name, destination_blob_name
        )
    )

## Preprocessing of satellite data and export to TFrecord
These steps require no local compute. They just call the earth engine API

In [None]:
#load area to predict
predict = ee.FeatureCollection(PREDICT_MASK)

#mask to our study area
mask_predict = predict \
  .map(lambda feature: feature.set('flag', ee.Number(1))) \
  .reduceToImage(['flag'],ee.Reducer.first())

#map to dates and create imagecol from results
#produces stack of images from yesterday to 6 months ago
ilist = pdates.map(predS2(predict,mask_predict))
imageCol_gaps = ee.ImageCollection(ilist)
imageCol_gaps = imageCol_gaps.select(allbands)
names = imageCol_gaps.first().bandNames()

#find first non-null value in timeseries
first_im = imageCol_gaps\
  .reduce(ee.Reducer.firstNonNull())\
  .rename(names)\
  .set({'system:index': 'first'})
first = ee.List([first_im])

#fill nulls
#then drop first image
imageCol = ee.ImageCollection(ee.List(imageCol_gaps.iterate(gap_fill, first)))\
  .filter(ee.Filter.neq('system:index', 'first'))

#timeseries length
tsLengthPred = imageCol.size().getInfo()
#18

#flatten to array bands
imageCol = imageCol.map(remclip(mask_predict))
imageCol_all = imageCol.toArrayPerBand()


#It may be necessary to save intermediate files to prevent exceeding ee capacity
#Just ignore this if this entire code block succesfully exports

#assetPath = 'users/glennwithtwons/s2_reno_predict'
#str_name = 'task_phase3_'
#task = ee.batch.Export.image.toAsset(image=imageCol_all,
#                                 region=poi,
#                                 description=str_name,
#                                 assetId=assetPath,
#                                 scale=10,
#                                 maxPixels=2000000000000)
#task.start()
#task.status()
#imageCol_all = ee.Image(assetPath)

#define bands to export and dim
patchdim = 64
fmap = dict(zip(list(allbands), np.repeat(tsLengthPred,len(allbands)).tolist()))

imageFilePrefix = PREDICT_IMG
# Specify patch and file dimensions.
imageExportFormatOptions = {
  'patchDimensions': [patchdim, patchdim],
  'maxFileSize': 104857600000,
  'compressed': True,
  'tensorDepths': fmap
}

# Export imagery in this region.
exportRegion =  poi

# Setup the task
imageTask = ee.batch.Export.image.toCloudStorage(
  image=imageCol_all,
  description='Image Export',
  fileNamePrefix=imageFilePrefix,
  bucket=outputBucket,
  maxPixels=200000000,
  scale=10,
  fileFormat='TFRecord',
  region=exportRegion,
  formatOptions=imageExportFormatOptions,
)

# Start the task
imageTask.start()

#Poll task status
#imageTask.status()

#this whole process can take 6-24 hours

# Predict
Here we read and preprocess the data exported by earth engine from a cloud storage bucket  
We then predict using a saved model  
  
These steps require a GPU to accelerate predicition. It takes about 12-24hrs to finish predictions

#### prepare data for model

In [None]:
#precalculated scaling factors used to standardize data
with open('data/max.json') as f:
  Max = json.load(f)

with open('data/min.json') as f:
  Min = json.load(f)

# Get a list of all the files in the output bucket.
filesList = !gsutil ls 'gs://'{outputBucket}
# Get only the files generated by the image export.
exportFilesList = [s for s in filesList if imageFilePrefix in s]

# Get the list of image files and the JSON mixer file.
imageFilesList = []
jsonFile = None
for f in exportFilesList:
  if f.endswith('.tfrecord.gz'):
    imageFilesList.append(f)
  elif f.endswith('.json'):
    jsonFile = f

#print(imageFilesList)
#print(jsonFile)

# Load the contents of the mixer file to a JSON object.
jsonText = !gsutil cat {jsonFile}
# Get a single string w/ newlines from the IPython.utils.text.SList
mixer = json.loads(jsonText.nlstr)
#print(mixer)

# Get relevant info from the JSON mixer file.
PATCH_WIDTH = mixer['patchDimensions'][0]
PATCH_HEIGHT = mixer['patchDimensions'][1]
PATCHES = mixer['totalPatches']
PATCH_DIMENSIONS_FLAT = [tsLengthPred,PATCH_WIDTH * PATCH_HEIGHT]

# Note that the tensors are in the shape of a patch, one patch for each band.
imageColumns = [
  tf.io.FixedLenFeature(shape=PATCH_DIMENSIONS_FLAT, dtype=tf.float32) 
    for k in allbands
]

# Parsing dictionary.
imageFeaturesDict = dict(zip(allbands, imageColumns))

# Note that you can make one dataset from many files by specifying a list.
imageDataset = tf.data.TFRecordDataset(imageFilesList, compression_type='GZIP')

# Parse the data into tensors, one long tensor per patch 
# then transpose
# Break our long tensors into many little ones.
# Turn the dictionary in each record into a tuple without a label.
# if standaerdize needed it goes second last

inDataset = imageDataset\
.map(lambda x: parse_image(x,imageFeaturesDict))\
.map(tp_image)\
.flat_map(
  lambda features: tf.data.Dataset.from_tensor_slices(features)
)\
.map(lambda x: standardize(x, Min,Max))\
.map(
  lambda dataDict: (tf.transpose(list(dataDict.values())), )
)

# Turn each patch into a batch.
inDataset = inDataset.batch(PATCH_WIDTH * PATCH_HEIGHT)

#### run model

In [None]:
# Run prediction in batches, with as many steps as there are patches.
model = tf.keras.models.load_model('save_test', compile=False)
predictions = model.predict(inDataset, steps=PATCHES, verbose=1)

# Note that the predictions come as a numpy array.  Check the first one.
#print(predictions[0])
pred_im = np.argmax(predictions,axis=1)
#np.histogram(pred_im, bins=[0, 1, 2])
outputImageFile = 'data/' + PREDICTIONS_IMG 
print('Writing to file ' + outputImageFile)

# Instantiate the writer.
writer = tf.io.TFRecordWriter(outputImageFile)

# Every patch-worth of predictions we'll dump an example into the output
# file with a single feature that holds our predictions. Since our predictions
# are already in the order of the exported data, the patches we create here
# will also be in the right order.
patch = [[], [], []]
curPatch = 1
for prediction in predictions:
  patch[0].append(tf.argmax(prediction))
  patch[1].append(prediction[0])
  patch[2].append(prediction[1])
  # Once we've seen a patches-worth of class_ids...
  if (len(patch[0]) == PATCH_WIDTH * PATCH_HEIGHT):
    print('Done with patch ' + str(curPatch) + ' of ' + str(PATCHES) + '...')
    # Create an example
    example = tf.train.Example(
      features=tf.train.Features(
        feature={
          'prediction': tf.train.Feature(
              int64_list=tf.train.Int64List(
                  value=patch[0])),
          'RenoProb': tf.train.Feature(
              float_list=tf.train.FloatList(
                  value=patch[1])),
          'TransProb': tf.train.Feature(
              float_list=tf.train.FloatList(
                  value=patch[2])),
        }
      )
    )
    # Write the example to the file and clear our patch array so it's ready for
    # another batch of class ids
    writer.write(example.SerializeToString())
    patch = [[], [], []]
    curPatch += 1

writer.close()

## Upload results to Earth Engine
We need to send the results back to Earth Engine to turn them into a geospatial raster
This does not require much compute - we are just uploading data

In [None]:
#upload reasults from local storage to cloud storage
#I am not sure this function works. I was getting permission errors
#This should be v quick

upload_blob(outputBucket_predict, outputImageFile, PREDICTIONS_IMG)

In [None]:
# Get a list of all the files in the output bucket
filesList = !gsutil ls 'gs://'{outputBucket_predict}
# Get only the files generated by the image export.
exportFilesList = [s for s in filesList if imageFilePrefix in s]

jsonFile = None
for f in exportFilesList:
  if f.endswith('.json'):
    jsonFile = f
    
print('Writing to ' + PREDICT_ASS)

# Start the upload
# calls earth engine from the command line
# this takes a while - 6-12 hours
# It is possible to poll the api to check the status of this task
# ee.batch.Task.list()
# ee.batch.Task(task_id).status

# !earthengine upload image --asset_id={PREDICT_ASS} {'gs://' + outputBucket_predict+ '/' +PREDICTIONS_IMG} {jsonFile}