## ewf-wfp-02-01-02 - Aggregated Land Surface Temperature Time Series

Aggregated Land Surface Temperature Time Series

---

### <a name="service"></a>Service definition

In [None]:
# service descriptor: identifier plus the human-readable title and abstract
service = {
    'title': 'Aggregated Land Surface Temperature Time Series',
    'abstract': 'Aggregated Land Surface Temperature Time Series',
    'id': 'ewf-wfp-02-01-02',
}

### <a name="parameter"></a>Parameter Definition

In [None]:
# switch for the N=1 aggregation; the workflow enables it when 'value' is the string 'True'
N_1 = {
    'id': 'N_1',
    'value': 'False',
    'title': 'No Aggregation',
    'abstract': 'No aggregation',
}

In [None]:
# switch for the N=3 aggregation; the workflow enables it when 'value' is the string 'True'
N_3 = {
    'id': 'N_3',
    'value': 'True',
    'title': '30 Day Aggregation',
    'abstract': 'Get a 30 day aggregation',
}

In [None]:
# switch for the N=6 aggregation; the workflow enables it when 'value' is the string 'True'
# (the abstract previously said "30 day", copied from N_3)
N_6 = dict([('id', 'N_6'),
            ('value', 'False'),
            ('title', '60 Day Aggregation'),
            ('abstract', 'Get a 60 day aggregation')])

In [None]:
# switch for the N=9 aggregation; the workflow enables it when 'value' is the string 'True'
N_9 = {
    'id': 'N_9',
    'value': 'False',
    'title': '90 Day Aggregation',
    'abstract': 'Get a 90 day aggregation',
}

In [None]:
# switch for the N=12 aggregation; the workflow enables it when 'value' is the string 'True'
N_12 = {
    'id': 'N_12',
    'value': 'False',
    'title': '120 Day Aggregation',
    'abstract': 'Get a 120 day aggregation',
}

In [None]:
# switch for the N=15 aggregation; the workflow enables it when 'value' is the string 'True'
N_15 = {
    'id': 'N_15',
    'value': 'False',
    'title': '150 Day Aggregation',
    'abstract': 'Get a 150 day aggregation',
}

In [None]:
# switch for the N=18 aggregation; the workflow enables it when 'value' is the string 'True'
N_18 = {
    'id': 'N_18',
    'value': 'False',
    'title': '180 Day Aggregation',
    'abstract': 'Get a 180 day aggregation',
}

In [None]:
# switch for the N=27 aggregation; the workflow enables it when 'value' is the string 'True'
N_27 = {
    'id': 'N_27',
    'value': 'False',
    'title': '270 Day Aggregation',
    'abstract': 'Get a 270 day aggregation',
}

In [None]:
# switch for the N=36 aggregation; the workflow enables it when 'value' is the string 'True'
N_36 = {
    'id': 'N_36',
    'value': 'False',
    'title': '360 Day Aggregation',
    'abstract': 'Get a 360 day aggregation',
}

In [None]:
# WKT polygon of the area to process; its envelope is used to crop the products
regionOfInterest = {
    'id': 'regionOfInterest',
    'value': 'POLYGON((11.5030755518998 -11.1141633706909,41.0343255518998 -11.1141633706909,41.0343255518998 -34.9763656693858,11.5030755518998 -34.9763656693858,11.5030755518998 -11.1141633706909))',
    'title': 'WKT Polygon for the Region of Interest',
    'abstract': 'Set the value of WKT Polygon',
}

In [None]:
# label of the region; it becomes part of every output file name
nameOfRegion = {
    'id': 'nameOfRegion',
    'value': 'SouthernAfrica',
    'title': 'Name of Region',
    'abstract': 'Name of the region of interest',
    'minOccurs': '1',
}

In [None]:
# catalogue user name; combined with the apikey as 'user:apikey' credentials
user = {
    'id': 'user',
    'value': 'better-wfp-02-01-01',
    'title': 'user',
    'abstract': 'user to access catalog',
    'minOccurs': '1',
}

In [None]:
# catalogue API key; left empty here and expected to be supplied at run time
apikey = {
    'id': 'apikey',
    'value': '',
    'title': 'apikey',
    'abstract': 'apikey to access catalog',
    'minOccurs': '1',
}

In [None]:
# start date parameter (ISO 8601 timestamp string)
startdate = {
    'id': 'startdate',
    'value': '2020-02-25T00:00Z',
    'title': 'Start date',
    'abstract': 'Start date',
}

In [None]:
# end date parameter (ISO 8601 timestamp string)
enddate = {
    'id': 'enddate',
    'value': '2020-04-15T23:59Z',
    'title': 'End date',
    'abstract': 'End date',
}

In [None]:
# start of the update window parameter (ISO 8601 timestamp string)
start_update = {
    'id': 'start_update',
    'value': '2020-05-15T00:00Z',
    'title': 'Start update',
    'abstract': 'Start update',
}

In [None]:
# end of the update window parameter (ISO 8601 timestamp string)
stop_update = {
    'id': 'stop_update',
    'value': '2020-05-25T00:00Z',
    'title': 'Stop update',
    'abstract': 'Stop update',
}

In [None]:
# maximum number of aggregations built per enabled N; '-1' means no limit
n2process = {
    'id': 'n2process',
    'value': '1',
    'title': 'n of aggs to process',
    'abstract': 'n of aggs to process (-1 to process everthing)',
}

### <a name="runtime"></a>Runtime parameter definition

**Input identifiers**

This is the MODIS stack of products' identifiers

In [None]:
# identifiers of the input products; each one matches the `uid` query value of
# the corresponding entry in input_references
# NOTE(review): not referenced by the code visible in this notebook - confirm it is still needed
input_identifiers = ('75C6ECAC64921C9042A194AB0F950039F870DED1',
                    '7AF1D92ABBE578C1E8AD44E282930B22DDFDCF8C',
                    'F2940F0C127A530B2EFF744CD4B8EF764FC6B14D')

**Input references**

This is the MODIS stack catalogue references

In [None]:
# catalogue search URLs, one per input product; get_input_metadata queries each
# of them and keeps the first search result
input_references = ('https://catalog.terradue.com/better-wfp-02-01-01/search?format=atom&uid=75C6ECAC64921C9042A194AB0F950039F870DED1',
                    'https://catalog.terradue.com/better-wfp-02-01-01/search?format=atom&uid=7AF1D92ABBE578C1E8AD44E282930B22DDFDCF8C',
                    'https://catalog.terradue.com/better-wfp-02-01-01/search?format=atom&uid=F2940F0C127A530B2EFF744CD4B8EF764FC6B14D')

**Data path**

This path defines where the data is staged-in. 

In [None]:
# local folder holding the staged-in products; the file name of each catalogue
# enclosure is joined onto this path to locate the product on disk
data_path = "/workspace/data/modis/ewf-wfp-02-01-01"

**Aux folders**

In [None]:
# folder for the output files; with '' no folder is created and the output
# paths are relative to the current working directory
output_folder = ''

In [None]:
# scratch folder for the cropped products; created before processing and
# emptied and removed in the last cell
temp_folder = 'temp'

#### Import Modules

In [None]:
import os
import shutil
import sys
import string
import numpy as np
from osgeo import gdal, ogr, osr
from shapely.wkt import loads
import pandas as pd
import datetime

import cioppy
# shared cioppy client, used below for the catalogue searches and for logging
ciop = cioppy.Cioppy()

#### Auxiliary vars

In [None]:
# when True, the "Check results" cell plots the last computed averages with matplotlib
check_results = False

#### Auxiliary methods

In [None]:
def rm_cfolder(folder):
    """Empty `folder` without deleting the folder itself.

    Used to clean the temporary folder. Regular files are unlinked and
    sub-directories are removed recursively. Removal is best effort: an
    entry that cannot be deleted has its error printed and is skipped.
    """
    for entry_name in os.listdir(folder):
        entry_path = os.path.join(folder, entry_name)
        try:
            if os.path.isfile(entry_path):
                os.unlink(entry_path)
                continue
            if os.path.isdir(entry_path):
                shutil.rmtree(entry_path)
        except Exception as err:
            print(err)
            
            
def get_input_metadata(input_refs):
    """Fetch the catalogue metadata of every input reference.

    Each reference is searched through the shared `ciop` client with the
    credentials taken from the `user` and `apikey` parameters. Only the
    first result of each search is kept, since every reference addresses a
    single product by identifier.

    Returns a pandas DataFrame with one row per reference, whose
    'startdate' and 'enddate' columns are converted to datetimes.
    """
    def _first_hit(reference):
        # credentials are sent as 'user:apikey'
        credentials = '{}:{}'.format(user['value'], apikey['value'])
        hits = ciop.search(end_point=reference,
                           params=[],
                           output_fields='self,identifier,startdate,enclosure,title,startdate,enddate,wkt',
                           creds=credentials)
        return hits[0]

    input_metadata = pd.DataFrame.from_dict([_first_hit(reference) for reference in input_refs])

    for date_column in ('startdate', 'enddate'):
        input_metadata[date_column] = pd.to_datetime(input_metadata[date_column])

    return input_metadata


def get_formatted_date(date_str):
    """Format a date as 'YYYY-MM-DDT00:00:00Z'.

    Despite its name, `date_str` must be a date or datetime object, not a
    string. The time part of the result is always midnight, whatever time
    the input carries.
    """
    # datetime.datetime inherits strftime from datetime.date
    return datetime.date.strftime(date_str, '%Y-%m-%dT00:00:00Z')


def write_properties_file(output_name, first_date, last_date, region_of_interest):
    """Write the `<output_name>.properties` file describing one output.

    The file holds three entries: `title`, `date` (a 'first/last' interval,
    each formatted by get_formatted_date) and `geometry`.

    output_name        -- path of the output file; '.properties' is appended
    first_date         -- date/datetime object for the start of the period
    last_date          -- date/datetime object for the end of the period
    region_of_interest -- geometry text written verbatim (WKT in this workflow)
    """
    title = 'Output %s' % output_name

    first_date = get_formatted_date(first_date)
    last_date = get_formatted_date(last_date)

    # text mode: the entries are str, which a binary-mode ('wb') file rejects
    # with a TypeError on Python 3
    with open(output_name + '.properties', 'w') as properties_file:
        properties_file.write('title=%s\n' % title)
        properties_file.write('date=%s/%s\n' % (first_date, last_date))
        properties_file.write('geometry=%s' % (region_of_interest))
            

def crop_image(input_image, polygon_wkt, output_path, product_type=None):
    """Crop `input_image` to the envelope of `polygon_wkt` and save it as GeoTIFF.

    input_image  -- product to crop: a remote ftp/http URL (opened through
                    /vsigzip//vsicurl/), a NetCDF file (any path containing
                    '.nc') or any other GDAL-readable file such as a GeoTIFF
    polygon_wkt  -- WKT geometry; only its bounding box is used
    output_path  -- destination file; its directory is created when missing
    product_type -- NetCDF subdataset name, required for NetCDF inputs only

    The output keeps the source pixel size and no-data value, with the
    bounds interpreted as EPSG:4326 and aligned to the target pixel grid.

    Raises RuntimeError when the input cannot be opened.
    """
    # create the destination directory when the path has one and it is missing
    crop_directory = os.path.dirname(output_path)
    if crop_directory != '' and not os.path.exists(crop_directory):
        os.makedirs(crop_directory)

    dataset = None

    if input_image.startswith('ftp://') or input_image.startswith('http'):
        try:
            dataset = gdal.Open('/vsigzip//vsicurl/%s' % input_image)
        except Exception as e:
            print(e)
    elif '.nc' in input_image:
        dataset = gdal.Open('NETCDF:' + input_image + ':' + product_type)
    else: # .tif
        dataset = gdal.Open(input_image)

    # gdal.Open returns None on failure unless GDAL exceptions are enabled;
    # fail with an explicit message instead of an AttributeError further down
    if dataset is None:
        raise RuntimeError('Could not open %s' % input_image)

    no_data_value = dataset.GetRasterBand(1).GetNoDataValue()
    geo_t = dataset.GetGeoTransform()

    # OGR envelopes are (minX, maxX, minY, maxY); gdal.Warp expects
    # (minX, minY, maxX, maxY)
    polygon_ogr = ogr.CreateGeometryFromWkt(polygon_wkt)
    envelope = polygon_ogr.GetEnvelope()
    bounds = [envelope[0], envelope[2], envelope[1], envelope[3]]

    gdal.Warp(output_path, dataset, format="GTiff", outputBoundsSRS='EPSG:4326', outputBounds=bounds, srcNodata=no_data_value, dstNodata=no_data_value, xRes=geo_t[1], yRes=-geo_t[5], targetAlignedPixels=True)
    

    
def write_output_image(filepath, output_matrix, image_format, data_format, mask=None, output_projection=None, output_geotransform=None, no_data_value=None):
    """Write a 2-D matrix to `filepath` as a raster.

    filepath            -- destination file
    output_matrix       -- 2-D array written to band 1
    image_format        -- GDAL driver name (e.g. 'GTiff')
    data_format         -- GDAL data type of the bands (e.g. gdal.GDT_UInt16)
    mask                -- optional 2-D array written to a second band; None
                           or a scalar 0 means "no mask". With a mask and a
                           no-data value, cells where mask > 0 are set to the
                           no-data value in `output_matrix` (in place).
    output_projection   -- optional projection assigned to the raster
    output_geotransform -- optional geotransform; when given, the file is
                           finally re-warped as GeoTIFF at that pixel size,
                           aligned to the target pixel grid
    no_data_value       -- optional no-data value set on band 1

    Raises RuntimeError when the driver cannot create the file.
    """
    driver = gdal.GetDriverByName(image_format)
    out_rows = np.size(output_matrix, 0)
    out_columns = np.size(output_matrix, 1)

    # a scalar 0 is the "no mask computed" placeholder; compare by value, not
    # by identity, and only for scalars so that array masks are never compared
    has_mask = mask is not None and not (np.ndim(mask) == 0 and mask == 0)
    band_count = 2 if has_mask else 1

    output = driver.Create(filepath, out_columns, out_rows, band_count, data_format)
    if output is None:
        raise RuntimeError('Could not create %s' % filepath)

    if has_mask:
        mask_band = output.GetRasterBand(2)
        mask_band.WriteArray(mask)
        if no_data_value is not None:
            output_matrix[mask > 0] = no_data_value

    if output_projection is not None:
        output.SetProjection(output_projection)
    if output_geotransform is not None:
        output.SetGeoTransform(output_geotransform)

    raster_band = output.GetRasterBand(1)
    if no_data_value is not None:
        raster_band.SetNoDataValue(no_data_value)
    raster_band.WriteArray(output_matrix)

    # the alignment step needs the pixel size, so it only runs when a
    # geotransform was supplied (indexing None would raise a TypeError)
    if output_geotransform is not None:
        gdal.Warp(filepath, output, format="GTiff", outputBoundsSRS='EPSG:4326', xRes=output_geotransform[1], yRes=-output_geotransform[5], targetAlignedPixels=True)



    
def calc_max_matrix(mat1, mat2, no_data_value=None):
    """Return the element-wise maximum of `mat1` and `mat2`.

    Either operand may be a plain int (used as the initial running value).
    When `no_data_value` is given, cells equal to it are first replaced by
    0 IN PLACE in every array operand, so callers see the modified arrays.
    """
    if no_data_value is not None:
        for operand in (mat1, mat2):
            if isinstance(operand, int):
                continue
            operand[operand == no_data_value] = 0

    first_is_larger = mat1 > mat2
    return np.where(first_is_larger, mat1, mat2)


def matrix_sum(mat1, mat2, no_data_value=None):
    """Add two matrices, forcing the result to 0 wherever either operand is 0.

    When `no_data_value` is given, cells equal to it are first replaced by
    0 IN PLACE in every non-int operand, so a no-data cell in either input
    yields 0 in the sum.
    """
    if no_data_value is not None:
        for operand in (mat1, mat2):
            if isinstance(operand, int):
                continue
            operand[operand == no_data_value] = 0

    msum = mat1 + mat2

    # a zero in either operand wipes the corresponding cell of the sum
    for operand in (mat1, mat2):
        msum[operand == 0] = 0

    return msum


def calc_average(matrix_list, n_matrix, no_data_value=None):
    """Average the first `n_matrix` matrices of `matrix_list`.

    The matrices are accumulated with matrix_sum (so a zero or no-data
    cell in any of them zeroes the total) and the total is divided by
    `n_matrix`. Returns 0 for an empty list. `matrix_list` must hold at
    least `n_matrix` entries, otherwise an IndexError is raised.
    """
    if not matrix_list:
        return 0

    total = matrix_list[0]
    # entries are fetched lazily, one per accumulation step
    remaining = (matrix_list[position] for position in range(1, n_matrix))
    for matrix in remaining:
        total = matrix_sum(total, matrix, no_data_value)

    divisor = n_matrix * 1.00
    return np.divide(total, divisor)

def get_matrix_list(image_list):
    """Read band 1 of every image in `image_list` and return the arrays as a list."""
    def _read_first_band(image_path):
        raster = gdal.Open(image_path)
        band_values = raster.GetRasterBand(1).ReadAsArray()
        # drop the reference so GDAL closes the file
        raster = None
        return band_values

    return [_read_first_band(image_path) for image_path in image_list]



def calc_aggregations(product_list, N_value, region_of_interest, product_type, is_it_to_crop=True):
    """Compute the per-pixel maximum and average over a list of products.

    product_list       -- paths/URLs of the products to aggregate
    N_value            -- number of products the average is computed over
    region_of_interest -- WKT polygon passed to crop_image
    product_type       -- passed through to crop_image (NetCDF subdataset)
    is_it_to_crop      -- when True each product is cropped into temp_folder
                          first; when False the product is read as is

    A product that raises while being cropped or read is reported on stdout
    and skipped.

    Returns a tuple (max_values, averages, mask_no_data_value, geo_transform,
    projection, no_data_value). The last three come from the last product
    that was read; max_values and mask_no_data_value stay the int 0 when no
    product (or no no-data value) was available.
    """

    # running values start as the int 0 and become arrays with the first product
    mask_no_data_value = 0
    max_values = 0
    averages = 0
    temp_list = []
    no_data_value = None
    
    
    geo_transform = None
    projection = None
    no_data_value = None
    
    print(type(product_list))
    
    for product_url in product_list:
        
        # file name of the product: last '/'-separated component
        product = product_url.split('/')[-1]
        print(product)
        
        cropped_product_path = os.path.join(temp_folder, 'crop_' + product)

        try:
            
            if is_it_to_crop:
                crop_image(product_url, region_of_interest, cropped_product_path, product_type)
            else:
                # no crop: read the product itself
                cropped_product_path = product_url

            
            # Read band 1 as an array, together with its georeferencing
            dataset = gdal.Open(cropped_product_path)
            product_array = dataset.GetRasterBand(1).ReadAsArray()
            no_data_value = dataset.GetRasterBand(1).GetNoDataValue()
            print(no_data_value)
            geo_transform = dataset.GetGeoTransform()
            projection = dataset.GetProjection()
            ## Create mask of no_data_values (1 where the product has no data)
            if no_data_value is not None:
                if isinstance(mask_no_data_value, int):
                    # first product: the mask is still the int placeholder
                    mask_no_data_value = np.where(product_array == no_data_value, 1, 0)
                else:
                    temp_mask = np.where(product_array == no_data_value, 1, 0)
                    # NOTE(review): matrix_sum zeroes every cell where either
                    # operand is 0, so the merged mask stays non-zero only where
                    # all products so far have no data - confirm this is intended
                    mask_no_data_value = matrix_sum(mask_no_data_value, temp_mask)
            # calc_max_matrix also replaces no-data cells of product_array by 0
            # in place, so the array stored below already has them zeroed
            max_values = calc_max_matrix(max_values, product_array, no_data_value)
            temp_list.append(product_array)
            # drop the reference so GDAL closes the file
            dataset = None
            
        except Exception as e:
            print('Error processing the product ' + product + ': ' + str(e))
            
        # remove the temporary crop, whether or not the product was processed
        if os.path.exists(cropped_product_path) and is_it_to_crop:
            os.remove(cropped_product_path)
    
    # NOTE(review): calc_average indexes temp_list up to N_value; if a product
    # was skipped above the list is shorter and this raises IndexError - confirm
    averages = calc_average(temp_list, N_value, no_data_value)
    
    return max_values, averages, mask_no_data_value, geo_transform, projection, no_data_value




def write_outputs(product_name, roi_name, first_date, last_date, averages, max_values, mask_no_data_value, image_format, product_count, projection, geo_transform, no_data_value):
    """Write the averages and max-values rasters and return their file names.

    Both names follow
    `<product_name>_<roi_name>_N<product_count>_<kind>_<first_date>_<last_date>.tif`
    with <kind> being 'averages' then 'maxvalues'. Both rasters are written
    as GDT_UInt16 through write_output_image.
    """
    name_head = product_name + '_' + roi_name + '_N' + str(product_count)
    name_tail = first_date + '_' + last_date + '.tif'

    # the received mask is deliberately ignored for now: no mask band is written
    mask_no_data_value = None # Temp

    filenames = []
    for kind_label, matrix in (('_averages_', averages), ('_maxvalues_', max_values)):
        filenames.append(name_head + kind_label + name_tail)
        write_output_image(filenames[-1], matrix, image_format, gdal.GDT_UInt16, mask_no_data_value, projection, geo_transform, no_data_value)

    return filenames

#### Auxiliary folders

In [None]:
# the output folder is optional: it is only created when a name is configured
if len(output_folder) > 0 and not os.path.isdir(output_folder):
    os.mkdir(output_folder)

# the temp folder is always needed
temp_folder_missing = not os.path.isdir(temp_folder)
if temp_folder_missing:
    os.mkdir(temp_folder)

#### Load metadata from catalog

In [None]:
# report progress through the cioppy logger
message = 'Loading metadata from catalog' 
ciop.log('INFO', message)

# one catalogue search per input reference, collected into a DataFrame
prods = get_input_metadata (input_references)
# bare expression: shows the DataFrame as the notebook cell output
prods

#### Workflow

In [None]:
# newest products first, so the first window of each N holds the most recent ones
prods = prods.sort_values(by='startdate', ascending=False)

# N time steps: window sizes and, for each, whether its parameter is 'True'
nvalues = [1, 3, 6, 9, 12, 15, 18, 27, 36]
nlist = [param['value'] == 'True'
         for param in (N_1, N_3, N_6, N_9, N_12, N_15, N_18, N_27, N_36)]

num2process = int(n2process['value'])

# py dict to group references by N.
# For each enabled N there is a list of sub-DataFrames, one per aggregation,
# each holding N consecutive products
input_references_by_N = {}

for N in (size for enabled, size in zip(nlist, nvalues) if enabled):

    # one window per start index that still leaves N consecutive products
    input_references_bins = [prods.iloc[first:first + N]
                             for first in range(len(prods) - N + 1)]

    # -1 keeps every window, any other value keeps at most that many
    if num2process != -1:
        input_references_bins = input_references_bins[:max(num2process, 0)]

    input_references_by_N[N] = input_references_bins

In [None]:
# one '\nN<size>: <count>' line per enabled N, appended to the header
per_n_lines = ['\nN' + str(nk) + ': ' + str(len(bins))
               for nk, bins in input_references_by_N.items()]
message = 'Number of aggs to process per N:' + ''.join(per_n_lines)

ciop.log('INFO', message)

Check whether the products need to be cropped

In [None]:
polygon_wkt = regionOfInterest['value']

# (almost) whole-world rectangle used as the reference for "no crop needed"
global_wkt = 'POLYGON((-179.999 89.999, 179.999 89.999, 179.999 -89.999, -179.999 -89.999, -179.999 89.999))'

polygon_ogr, global_ogr = [ogr.CreateGeometryFromWkt(wkt) for wkt in (polygon_wkt, global_wkt)]

# cropping is skipped only when the region of interest contains the global rectangle
covers_whole_world = polygon_ogr.Contains(global_ogr)
is_it_to_crop = not covers_whole_world

is_it_to_crop

Compute Aggregations

In [None]:
region_of_interest = regionOfInterest['value']
name_of_region = nameOfRegion['value']

for nk, input_references_bins in input_references_by_N.items():

    for input_metada in input_references_bins:

        # the aggregation period runs from the earliest to the latest product start date
        start_dates = input_metada['startdate']
        first_date = start_dates.min().strftime('%Y-%m-%d')
        last_date = start_dates.max().strftime('%Y-%m-%d')

        print(first_date)
        print(last_date)

        message = 'Computing aggregations'
        ciop.log('INFO', message)

        # works only if N and number of given images match
        if nk != len(input_metada):
            print("N value and number of images don't match")
            continue

        N = nk

        # staged-in file of each product: data_path plus the enclosure file
        # name, without its query string
        file_list = []
        for enclosure in input_metada['enclosure']:
            enclosure_name = os.path.basename(enclosure).split('?')[0]
            file_list.append(os.path.join(data_path, enclosure_name))

        max_values, averages, no_value, geo_transform, projection, no_data = calc_aggregations(file_list, N, region_of_interest, None, is_it_to_crop)

        message = 'Exporting result as geotiff'
        ciop.log('INFO', message)

        product_path_name = os.path.join(output_folder, 'LST')

        filenames = write_outputs(product_path_name, name_of_region, first_date, last_date, averages, max_values, no_value, 'GTiff', N, projection, geo_transform, no_data)

        message = 'Writing properties file'
        ciop.log('INFO', message)

        # the properties writer expects date objects, not the formatted strings
        period_start = datetime.datetime.strptime(first_date, "%Y-%m-%d").date()
        period_end = datetime.datetime.strptime(last_date, "%Y-%m-%d").date()

        for output_name in filenames:
            write_properties_file(output_name, period_start, period_end, region_of_interest)



#### Check results

In [None]:
# optional visual check: plot the averages of the last aggregation computed above
if check_results:
    # imported here so matplotlib is only required when the check is enabled
    import matplotlib
    import matplotlib.pyplot as plt

    fig = plt.figure()
    plt.imshow(averages)
    plt.show()

#### Remove temporary files and folders

In [None]:
# report progress through the cioppy logger
message = 'Removing temporary files' 
ciop.log('INFO', message)

# empty the temp folder first: os.rmdir only removes empty directories
rm_cfolder(temp_folder)

os.rmdir(temp_folder)