## WFP-01-03-02 CHIRPS Rainfall Estimates (RFE) - Aggregations

This application generates Rainfall Estimates (RFE) aggregations from CHIRPS RFE data at 5 km resolution, compared to a reference period.

### <a name="objective"></a>Objective

The objective of this code is to compute, from the daily data:

- the sum of the daily values over the past N days, derived every 10 days (N = 10, 30, 60, 90, 120, 150, 180, 270, 365 days);
- the count of daily values above 1 mm over the past N days, derived every 10 days (N = 30, 60, 90 days);
- the longest sequence of daily values below 2 mm ("dry spell") within the last N days, derived every 10 days (N = 30, 60, 90 days).


### <a name="service"></a>Service definition

In [1]:
# Service descriptor of this application: display title, abstract and identifier.
# NOTE(review): 'abstract' still holds the placeholder text 'TBD'.
service = dict([('title', 'CHIRPS Rainfall Estimates (RFE) - Aggregations'),
                ('abstract', 'TBD'),
                ('id', 'wfp-01-03-02')])

### <a name="runtime"></a>Runtime parameter definition

**Input references**

These are the CHIRPS stack catalogue references.

In [2]:
# Catalogue search URL of the CHIRPS product to process. A single string is
# accepted here: the workflow wraps it into a one-element list before use.
input_references = 'https://catalog.terradue.com/chirps/search?format=atom&uid=chirps-v2.0.2017.03.10'

In [3]:
# Runtime switch for the 10-day aggregation: the workflow enables it only when
# 'value' is exactly the string 'True'.
N_10 = dict([('id', 'N_10'),
                          ('value', 'True'),
                          ('title', '10 Day Aggregation'),
                          ('abstract', 'Get a 10 day aggregation')])

In [4]:
# Runtime switch for the 30-day aggregation: the workflow enables it only when
# 'value' is exactly the string 'True'.
N_30 = dict([('id', 'N_30'),
                          ('value', 'True'),
                          ('title', '30 Day Aggregation'),
                          ('abstract', 'Get a 30 day aggregation')])

In [5]:
# Runtime switch for the 60-day aggregation: the workflow enables it only when
# 'value' is exactly the string 'True'.
N_60 = dict([('id', 'N_60'),
                          ('value', 'False'),
                          ('title', '60 Day Aggregation'),
                          ('abstract', 'Get a 60 day aggregation')])

In [6]:
# Runtime switch for the 90-day aggregation: the workflow enables it only when
# 'value' is exactly the string 'True'.
N_90 = dict([('id', 'N_90'),
                          ('value', 'False'),
                          ('title', '90 Day Aggregation'),
                          ('abstract', 'Get a 90 day aggregation')])

In [7]:
# Runtime switch for the 120-day aggregation: the workflow enables it only when
# 'value' is exactly the string 'True'.
N_120 = dict([('id', 'N_120'),
                          ('value', 'False'),
                          ('title', '120 Day Aggregation'),
                          ('abstract', 'Get a 120 day aggregation')])

In [8]:
# Runtime switch for the 150-day aggregation: the workflow enables it only when
# 'value' is exactly the string 'True'.
N_150 = dict([('id', 'N_150'),
                          ('value', 'False'),
                          ('title', '150 Day Aggregation'),
                          ('abstract', 'Get a 150 day aggregation')])

In [9]:
# Runtime switch for the 180-day aggregation: the workflow enables it only when
# 'value' is exactly the string 'True'.
N_180 = dict([('id', 'N_180'),
                          ('value', 'False'),
                          ('title', '180 Day Aggregation'),
                          ('abstract', 'Get a 180 day aggregation')])

In [10]:
# Runtime switch for the 270-day aggregation: the workflow enables it only when
# 'value' is exactly the string 'True'.
N_270 = dict([('id', 'N_270'),
                          ('value', 'False'),
                          ('title', '270 Day Aggregation'),
                          ('abstract', 'Get a 270 day aggregation')])

In [11]:
# Runtime switch for the 365-day aggregation: the workflow enables it only when
# 'value' is exactly the string 'True'.
N_365 = dict([('id', 'N_365'),
                          ('value', 'False'),
                          ('title', '365 Day Aggregation'),
                          ('abstract', 'Get a 365 day aggregation')])

In [12]:
# Region of interest as a WKT polygon. Its 'value' is handed unchanged to
# crop_image for every product that is aggregated.
# NOTE(review): the coordinate order expected by crop_image is not visible
# here -- confirm against aux_functions.
regionOfInterest = dict([('id', 'regionOfInterest'),
                          ('value', 'POLYGON((-30 -10, 20 -10, 20 40, -30 40, -30 -10))'),
                          ('title', 'WKT Polygon for the Region of Interest'),
                          ('abstract', 'Set the value of WKT Polygon')])

**Data path**

This path defines where the data is staged in.

In [13]:
# Root folder where the CHIRPS data is staged in.
data_path = "/workspace/data/chirps-2.0/"
# Sub-folder of the staging root, presumably holding the decompressed products.
unzipped_chirps_path = data_path + "unzipped_chirps/"

### <a name="workflow"></a>Workflow

#### Import the packages required for processing the data

In [14]:
from osgeo import gdal, ogr, osr
from geopandas import GeoDataFrame
import gzip
import cioppy
import shutil
import sys
import numpy as np
import pandas as pd
import math
import re
import os
sys.path.append(os.getcwd())
sys.path.append('/application/notebook/libexec/')
from aux_functions import matrix_sum, mask_matrix, crop_image, write_output_image
print(sys.path)

['', '/opt/anaconda/lib/python27.zip', '/opt/anaconda/lib/python2.7', '/opt/anaconda/lib/python2.7/plat-linux2', '/opt/anaconda/lib/python2.7/lib-tk', '/opt/anaconda/lib/python2.7/lib-old', '/opt/anaconda/lib/python2.7/lib-dynload', '/opt/anaconda/lib/python2.7/site-packages/Glymur-0.8.6-py2.7.egg', '/opt/anaconda/lib/python2.7/site-packages/click_plugins-1.0.3-py2.7.egg', '/opt/anaconda/lib/python2.7/site-packages/setuptools-23.0.0-py2.7.egg', '/opt/anaconda/lib/python2.7/site-packages', '/opt/anaconda/lib/python2.7/site-packages/PIL', '/opt/anaconda/lib/python2.7/site-packages/IPython/extensions', '/home/rirr/.ipython', '/workspace/wfp-01-03-02/src/main/app-resources/notebook/libexec', '/application/notebook/libexec/']


In [15]:
def get_info(row, search_params):
    """Fetch the catalogue metadata of one product as a pandas Series.

    The catalogue at ``row['catalogue_url']`` is queried through the
    module-level ``ciop`` client for the ``identifier``, ``startdate`` and
    ``enclosure`` fields; only the first result is kept.

    Args:
        row: mapping-like object exposing a ``'catalogue_url'`` entry.
        search_params: search parameters forwarded unchanged to
            ``ciop.search`` as ``params``.

    Returns:
        pandas.Series built from the first search result, with its
        ``'startdate'`` entry converted by ``pd.to_datetime``.
    """
    results = ciop.search(end_point=row['catalogue_url'],
                          params=search_params,
                          output_fields='identifier,startdate,enclosure',
                          model='GeoTime')
    # Only the first hit is used; indexing fails if the search returns nothing.
    first_hit = results[0]

    entry = pd.Series(first_hit)
    entry['startdate'] = pd.to_datetime(entry['startdate'])
    return entry

In [16]:
# A single catalogue URL is accepted as well as a list of URLs (one per daily
# product): normalise to a list.
if isinstance(input_references, str):
    input_references = [input_references]

ciop = cioppy.Cioppy()

gpd_data = GeoDataFrame(input_references,
                        columns=['catalogue_url'])

region_of_interest = regionOfInterest['value']

# The product date is embedded in each URL as YYYY.MM.DD, so sorting the URLs
# orders the products by date (assumes all URLs share the same prefix).
gpd_data = gpd_data.sort_values(by='catalogue_url')

# Raw string: '\d' and '\.' are regex escapes, not Python string escapes.
date_pattern = r'\d{4}\.\d{2}\.\d{2}'
start_date = re.findall(date_pattern, gpd_data.iloc[0]['catalogue_url'])[0].replace('.', '-')
end_date = re.findall(date_pattern, gpd_data.iloc[-1]['catalogue_url'])[0].replace('.', '-')
print(start_date)
print(end_date)
print(len(input_references))

# Time window and result count sent with every catalogue query.
search_params = dict([('start', start_date),
                      ('stop', end_date),
                      ('count', len(input_references))])

# One catalogue lookup per reference; the result has the columns
# 'enclosure', 'identifier' and 'startdate'.
gpd_final = gpd_data.apply(lambda row: get_info(row, search_params), axis=1)
gpd_final.head()

2017-03-10
2017-03-10
1


Unnamed: 0,enclosure,identifier,startdate
0,ftp://anonymous@ftp.chg.ucsb.edu/pub/org/chg/p...,chirps-v2.0.2017.03.10,2017-03-10


In [None]:
def calc_aggregations(product_list, N_value, region_of_interest):
    """Aggregate a sequence of daily CHIRPS rasters cropped to a region.

    Every product is cropped with ``crop_image``, read through GDAL and folded
    into running aggregates. A product that fails to process is reported on
    stdout and skipped; the remaining products are still aggregated.

    Args:
        product_list: iterable of product URLs. The dry-spell result depends
            on the iteration order, so the products are expected in
            chronological order.
        N_value: aggregation length in days. The count of days above 1 mm and
            the dry spell are only computed when it is 30, 60 or 90.
        region_of_interest: region handed unchanged to ``crop_image``.

    Returns:
        Tuple ``(sum_result, count_above_one, max_sequence,
        mask_no_data_value, projection, geo_transform)``. Aggregates that were
        never updated keep their initial value ``0``; ``projection`` and
        ``geo_transform`` stay ``None`` when no product could be read.
    """
    mask_no_data_value = 0
    sum_result = 0
    count_above_one = 0
    max_sequence = 0
    # Per-pixel length of the dry run that is currently in progress.
    running_dry = 0
    projection = None
    geo_transform = None
    # Counts and dry spells are only produced for these aggregation lengths.
    with_counts = N_value in (30, 60, 90)
    print(type(product_list))
    for chirp_product_url in product_list:
        # Name of the uncompressed product: last URL segment without '.gz'.
        chirp_product = (chirp_product_url.split('/')[-1]).split('.gz')[0]
        print(chirp_product)
        cropped_product_path = 'crop_' + chirp_product
        dataset = None
        try:
            crop_image(chirp_product_url, region_of_interest, cropped_product_path)
            # Read the cropped GeoTIFF as an array.
            dataset = gdal.Open(cropped_product_path)
            product_array = dataset.GetRasterBand(1).ReadAsArray()
            # NOTE(review): the raster minimum is used as the no-data value.
            # If a crop contains no no-data pixel, its smallest real value is
            # treated as no-data -- confirm whether the band's declared
            # no-data value should be used instead.
            no_data_value = dataset.GetRasterBand(1).ComputeRasterMinMax()[0]
            geo_transform = dataset.GetGeoTransform()
            projection = dataset.GetProjection()

            # Accumulate, per pixel, how many products were no-data.
            no_data_pixels = np.where(product_array == no_data_value, 1, 0)
            if isinstance(mask_no_data_value, int):
                mask_no_data_value = no_data_pixels
            else:
                mask_no_data_value = matrix_sum(mask_no_data_value, no_data_pixels)

            # Running sum of the daily values.
            sum_result = matrix_sum(sum_result, product_array, no_data_value)

            if with_counts:
                # Running count of days flagged by mask_matrix for threshold 1
                # (presumably 1 where the daily value is above 1 mm).
                regions_above_one = mask_matrix(product_array, 1, True, no_data_value)
                count_above_one = matrix_sum(count_above_one, regions_above_one)

                # Dry spell: extend the run in progress with today's flags
                # (presumably 1 where the daily value is below 2 mm).
                regions_below_two = mask_matrix(product_array, 2, False, no_data_value)
                running_dry = matrix_sum(running_dry, regions_below_two)
                if isinstance(max_sequence, int):
                    # Own array: max_sequence must not alias running_dry,
                    # which is reset in place below.
                    max_sequence = np.zeros_like(running_dry)
                # A flag of 0 ends the run: record its length, then restart.
                run_ended = regions_below_two == 0
                max_sequence[run_ended] = np.maximum(max_sequence[run_ended], running_dry[run_ended])
                running_dry[run_ended] = 0
        except Exception as e:
            # Best effort: report the failing product and carry on.
            print('Error processing the product ' + chirp_product + ': ' + str(e))
        finally:
            # Release the GDAL dataset before deleting the temporary crop.
            dataset = None
            if os.path.exists(cropped_product_path):
                os.remove(cropped_product_path)

    # A dry run still in progress after the last product was never closed by a
    # wet day, so it has not been recorded yet: fold it in now.
    if not isinstance(max_sequence, int):
        max_sequence = np.maximum(max_sequence, running_dry)

    return sum_result, count_above_one, max_sequence, mask_no_data_value, projection, geo_transform

In [None]:
def write_outputs(first_date, last_date, sum_result, count_above_one, max_sequence, mask_no_data_value, image_format, product_count, projection, geo_transform):
    """Write the aggregation arrays of one interval through ``write_output_image``.

    The total is always written. The count of days above 1 mm and the dry
    spell are written only when ``product_count`` is 30, 60 or 90. File names
    follow ``CHIRPSv2.N<product_count>.<layer>.<first_date>.<last_date>.tif``.

    Args:
        first_date, last_date: interval bounds as strings, used in the file names.
        sum_result, count_above_one, max_sequence: arrays to write.
        mask_no_data_value: no-data mask forwarded with every array.
        image_format: output format forwarded to ``write_output_image``.
        product_count: aggregation length in days.
        projection, geo_transform: georeferencing forwarded with every array.
    """
    name_prefix = 'CHIRPSv2.N' + str(product_count)
    name_suffix = '.' + first_date + '.' + last_date + '.tif'

    layers = [('.days_total', sum_result)]
    if product_count in (30, 60, 90):
        layers.append(('.count_above_one', count_above_one))
        layers.append(('.dry_spell', max_sequence))

    for layer_tag, layer_array in layers:
        write_output_image(name_prefix + layer_tag + name_suffix, layer_array, image_format, projection, geo_transform, mask_no_data_value)

In [None]:
# Day-of-month bounds of the three 10-day intervals of a month; the last one
# runs to the end of the month whatever its length.
DEKAD_DAY_RANGES = [(1, 10), (11, 20), (21, 31)]


def _aggregate_interval(interval_data, n_days, roi):
    """Aggregate the products of one interval and write the output images.

    Args:
        interval_data: non-empty rows of the product table (columns
            'startdate' and 'enclosure') that belong to the interval, in
            chronological order.
        n_days: aggregation length in days.
        roi: region of interest forwarded to calc_aggregations.

    Returns:
        The tuple returned by calc_aggregations.
    """
    first_date = interval_data['startdate'].min().strftime('%Y.%m.%d')
    last_date = interval_data['startdate'].max().strftime('%Y.%m.%d')
    print(first_date)
    print(last_date)
    results = calc_aggregations(interval_data['enclosure'].tolist(), n_days, roi)
    daily_sum, count_above_one, longest_sequence, no_value, projection, geo_transform = results
    write_outputs(first_date, last_date, daily_sum, count_above_one, longest_sequence, no_value, 'GTiff', n_days, projection, geo_transform)
    return results


# The N_* parameters carry their flag as the string 'True'/'False'.
nlist = [N_10['value'], N_30['value'], N_60['value'], N_90['value'], N_120['value'], N_150['value'], N_180['value'], N_270['value'], N_365['value']]
nvalue = [10, 30, 60, 90, 120, 150, 180, 270, 365]
nlist = [n == 'True' for n in nlist]

product_years = gpd_final['startdate'].dt.year.unique()
for _year in product_years:
    # Aggregations are computed separately for each calendar year.
    products_data = gpd_final[gpd_final['startdate'].dt.year == _year]
    L = len(products_data.index.values)
    product_months = products_data['startdate'].dt.month
    product_days = products_data['startdate'].dt.day
    months_of_products = product_months.unique()
    for N in [days for days, enabled in zip(nvalue, nlist) if enabled]:
        # Floor division: an N-day aggregation spans N // 30 calendar months
        # (0 for the 10-day aggregation).
        n_months = N // 30
        if n_months > len(months_of_products):
            print('Not enough products available for a aggregation of ' + str(N))
            # nvalue is ascending, so longer aggregations cannot fit either.
            break
        if N == 10:
            # Three 10-day intervals per available month.
            interval_number = 0
            for month in months_of_products:
                for first_day, last_day in DEKAD_DAY_RANGES:
                    interval_number += 1
                    print('Interval ' + str(interval_number) + ' of N=10')
                    interval_data = products_data[(product_months == month) & (product_days >= first_day) & (product_days <= last_day)]
                    if interval_data.empty:
                        # Missing days must not abort the remaining intervals.
                        print('No products available for this interval, skipping it')
                        continue
                    daily_sum, count_above_one, longest_sequence, no_value, projection, geo_transform = _aggregate_interval(interval_data, N, region_of_interest)
        else:
            # Number of N-day intervals supported by the products of the year.
            n_intervals = int(round((L*1.00)/N))
            # i indexes the first month of each interval, so it advances by
            # n_months and must leave room for a full interval of months.
            stop_index = min(n_intervals * n_months, len(months_of_products) - n_months + 1)
            for i in range(0, stop_index, n_months):
                print('Interval ' + str(i+1) + ' of N ' + str(N))
                print(months_of_products[i])
                first_month = months_of_products[i]
                last_month = first_month + n_months - 1
                if not (product_months == last_month).any():
                    # Gap in the available months: the interval is incomplete.
                    print('No products available for the last month of this interval, skipping it')
                    continue
                interval_data = products_data[(product_months >= first_month) & (product_months <= last_month)]
                daily_sum, count_above_one, longest_sequence, no_value, projection, geo_transform = _aggregate_interval(interval_data, N, region_of_interest)

In [None]:
# Disabled diagnostic plots of the last computed arrays (daily_sum,
# count_above_one, no_value and longest_sequence). The code is wrapped in a
# string literal, so none of it is executed.
'''
from matplotlib import pyplot
%matplotlib inline


fig0 = pyplot.figure()
ax0 = fig0.add_subplot(111)
cax0 = ax0.matshow(daily_sum)
fig0.colorbar(cax0)

fig1 = pyplot.figure()
ax1 = fig1.add_subplot(111)
cax1 = ax1.matshow(count_above_one)
fig1.colorbar(cax1)


fig = pyplot.figure()
ax = fig.add_subplot(111)
cax = ax.matshow(no_value)
fig.colorbar(cax)


fig2 = pyplot.figure()
ax2 = fig2.add_subplot(111)
cax2 = ax2.matshow(longest_sequence)
fig2.colorbar(cax2)

pyplot.show()
'''