In [1]:
import numpy as np
import xarray as xr
import os
from multiprocessing import Pool, cpu_count
from osgeo import gdal, ogr
import geopandas as gpd
from datacube.helpers import write_geotiff

#import custom functions
import sys
sys.path.append("../Scripts")
from dea_spatialtools import xr_rasterize

In [2]:
############
# User inputs
############

# How many CPUs should the job be distributed over?
# NOTE(review): `cpus` is not referenced by any cell visible in this notebook.
cpus = 4

# Directory holding the dcStats mosaic tifs (input rasters for clip_extent).
tiffs = "/g/data/r78/cb3058/dea-notebooks/vegetation_anomalies/results/NSW_NDVI_Climatologies_std/mosaics/"

# Shapefile used for clipping the extent, e.g. the NSW state polygon.
clip_shp = "/g/data/r78/cb3058/dea-notebooks/vegetation_anomalies/data/NSW_and_ACT.shp"

# Directory the clipped results are written to (currently the same directory as `tiffs`).
results = "/g/data/r78/cb3058/dea-notebooks/vegetation_anomalies/results/NSW_NDVI_Climatologies_std/mosaics/"



In [3]:
# script proper-----------------------------

def clip_extent(tif, mosaic_suffix='_mosaic.tif'):
    """Mask one GeoTIFF with the clipping polygon and write the result.

    The raster is read from the module-level ``tiffs`` directory, the
    polygons in ``clip_shp`` are rasterized onto the raster's grid with
    ``xr_rasterize``, pixels where that mask is falsy are blanked with
    ``DataArray.where`` and the result is written as a GeoTIFF into the
    module-level ``results`` directory.

    Parameters
    ----------
    tif : str
        File name (not a full path) of the raster inside ``tiffs``.
    mosaic_suffix : str, optional
        Suffix removed from ``tif`` to build the output name, so
        ``foo_mosaic.tif`` is written as ``foo.tif``. File names that do not
        end with this suffix are written as ``<stem>_clipped.tif`` instead of
        being truncated blindly, which also guarantees the input is never
        overwritten when ``tiffs`` and ``results`` are the same directory.
    """
    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    print("starting processing of " + tif)
    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

    # limiting the extent to the shapefile
    print('clipping extent to provided polygon')
    ds = xr.open_rasterio(os.path.join(tiffs, tif)).squeeze()

    # load shapefile and reproject it to EPSG:3577 before rasterizing
    gdf = gpd.read_file(clip_shp)
    gdf = gdf.to_crs(epsg=3577)

    # rasterize shapefile onto the grid of the raster
    mask = xr_rasterize(gdf=gdf,
                        da=ds)

    # blank out everything outside the shapefile polygons
    clipped_ds = ds.where(mask)

    # write_geotiff is handed a Dataset, so wrap the array and carry the
    # original raster attributes over to both the variable and the dataset
    clipped_ds = clipped_ds.to_dataset(name='data')
    clipped_ds['data'].attrs = ds.attrs
    clipped_ds.attrs = ds.attrs

    # build the output name from the input name
    if mosaic_suffix and tif.endswith(mosaic_suffix):
        stem = tif[:-len(mosaic_suffix)]
    else:
        stem = os.path.splitext(tif)[0] + '_clipped'

    write_geotiff(os.path.join(results, stem + '.tif'), clipped_ds)
    

In [4]:
# Alphabetically sorted names of everything found in the mosaics directory
list_of_tifs = sorted(os.listdir(tiffs))
print(list_of_tifs)

['.ipynb_checkpoints', 'ndvi_clim_std_AMJ_nsw_mosaic.tif', 'ndvi_clim_std_ASO_nsw_mosaic.tif', 'ndvi_clim_std_DJF_nsw.tif', 'ndvi_clim_std_FMA_nsw.tif', 'ndvi_clim_std_JAS_nsw.tif', 'ndvi_clim_std_JFM_nsw.tif', 'ndvi_clim_std_JJA_nsw.tif', 'ndvi_clim_std_MAM_nsw.tif', 'ndvi_clim_std_MJJ_nsw.tif', 'ndvi_clim_std_NDJ_nsw.tif', 'ndvi_clim_std_OND_nsw.tif', 'ndvi_clim_std_SON_nsw.tif']


In [6]:
list_of_tifs[1:3]

['ndvi_clim_std_AMJ_nsw_mosaic.tif', 'ndvi_clim_std_ASO_nsw_mosaic.tif']

In [7]:
# Clip only entries 1 and 2 of the sorted listing, i.e. the two '*_mosaic.tif'
# files; entry 0 in the listing printed above is '.ipynb_checkpoints'.
for i in list_of_tifs[1:3]:
    clip_extent(i)

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
starting processing of ndvi_clim_std_AMJ_nsw_mosaic.tif
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
clipping extent to provided polygon
Rasterizing to match xarray.DataArray dimensions (40000, 48000) and projection system/CRS (e.g. +init=epsg:3577)
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
starting processing of ndvi_clim_std_ASO_nsw_mosaic.tif
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
clipping extent to provided polygon
Rasterizing to match xarray.DataArray dimensions (40000, 48000) and projection system/CRS (e.g. +init=epsg:3577)


### Info for parallel processing with Dask
1. If reading netcdf files make sure each worker has one thread
2. memory_limit is per worker not per cluster of workers
3. When launching multiple workers (needed when reading netcdfs) on the same node you have to supply memory limit, otherwise every worker will assume they have all the memory

Notes from Imam
- Consider the datatype: float32 is best. Perhaps prevent the load landsat function from changing nodata values.
- Need to 'scale' the reflectance values to 0-1, rather than 0-10,000.
- .to_netcdf() is a bottleneck; need to limit the number of workers hitting it (bigger chunks are better).
- Look into .to_zarr(), which is an experimental alternative to the netcdf writer that deals better with multiprocessing.


In [None]:
# Close a `client` left over from a previous run of this cell, if there is one.
# locals().get(...) is used so the lookup does not fail when no `client` exists yet.
client = locals().get('client', None)
if client is not None:
    client.close()
    del client

# NOTE(review): presumably a dask.distributed Client; `Client` is not imported
# in any cell visible here, so the import must be added before enabling this.
# client = Client(n_workers=6, threads_per_worker=1, memory_limit='4GB')
# client

### User Inputs

In [None]:
# If not using a polygon, enter the centre coordinates of your AOI below.
lat, lon = -34.294, 146.037
# Half-width of the AOI: the query spans lat/lon -/+ this value.
latLon_adjust = 0.025

# Start and end of the time range passed to the query.
start = '2013-12-01'
end = '2019-05-31'

# Only referenced by the commented-out query_from_shp calls in the visible cells.
shp_fpath = "/g/data1a/r78/cb3058/dea-notebooks/dcStats/data/spatial/MDB_plus_NSW.shp"
# Dask chunk length used for both the x and y dimensions.
chunk_size = 100

### Load data

In [None]:
# Alternative: build the query from a shapefile instead of a point + buffer.
# query = query_from_shp.query_from_shp(shp_fpath, start, end)

# Square AOI centred on (lat, lon), extending latLon_adjust in each direction.
query = {'lon': (lon - latLon_adjust, lon + latLon_adjust),
         'lat': (lat - latLon_adjust, lat + latLon_adjust),
        'time': (start, end)}

# NOTE(review): neither `datacube` nor `anomalies` is bound by the import cell
# visible at the top of this notebook (`from datacube.helpers import ...` does
# not bind `datacube`); both need importing for a Restart & Run All to work.
dc = datacube.Datacube(app='load_clearlandsat')

# Load the nir and red bands for the three sensors, chunked in x and y.
ds = anomalies.load_landsat(dc=dc, query=query, sensors=['ls5','ls7','ls8'], 
                           bands_of_interest=['nir', 'red'], lazy_load=True,
                           dask_chunks = {'x': chunk_size, 'y': chunk_size})
ds

In [None]:
# Rolling three-month windows: label -> calendar month numbers it contains.
# The last two windows wrap around the end of the year.
quarter = dict(
    JFM=[1, 2, 3],
    FMA=[2, 3, 4],
    MAM=[3, 4, 5],
    AMJ=[4, 5, 6],
    MJJ=[5, 6, 7],
    JJA=[6, 7, 8],
    JAS=[7, 8, 9],
    ASO=[8, 9, 10],
    SON=[9, 10, 11],
    OND=[10, 11, 12],
    NDJ=[11, 12, 1],
    DJF=[12, 1, 2],
)

def compute(data, quarter):
    """Build one mean-NDVI layer per entry of ``quarter``.

    NDVI is computed from ``data.nir`` and ``data.red``. For every label in
    ``quarter`` the time steps whose month is listed under that label are
    averaged over ``time`` and the result is named ``ndvi_clim_mean_<label>``.
    All layers are merged into a single dataset that carries the attributes
    of ``data`` on the dataset and on each of its variables.
    """

    def attrs_reassign(da, dtype=np.float32):
        """Return ``da`` with the attributes of the input dataset attached."""
        return da.assign_attrs(**data.attrs)

    ndvi = (data.nir - data.red) / (data.nir + data.red)
    month_of_step = ndvi['time.month']

    def window_mean(label):
        # keep only the time steps that fall in this window, then average
        in_window = month_of_step.isin(quarter[label])
        return ndvi[in_window].mean(dim='time').rename('ndvi_clim_mean_' + label)

    q_clim_mean = xr.merge([window_mean(label) for label in quarter])
    q_clim_mean.attrs = data.attrs

    # push the attributes back onto every variable of the merged dataset
    return q_clim_mean.apply(attrs_reassign, keep_attrs=True)

In [None]:
x = compute(ds, quarter)

In [None]:
x.ndvi_clim_mean_AMJ

In [None]:
# NOTE(review): compute() already returns the output of xr.merge, so `x` is
# presumably a Dataset here; confirm this extra merge is needed (xr.merge is
# called with an iterable of objects everywhere else in this notebook).
x = xr.merge(x)


In [None]:
x

##### Calculate seasonal anomalies

In [None]:
def compute_climatology(data, stat='mean'):
    """Write a seasonal MSAVI climatology to a zarr store.

    ``data.nir`` and ``data.red`` are divided by 10000, MSAVI is computed from
    them, resampled to quarterly means (``QS-DEC``) and then reduced per
    ``time.season`` with the requested statistic.

    Parameters
    ----------
    data : object exposing ``nir`` and ``red``
        Input reflectance bands.
    stat : {'mean', 'std'}
        Statistic taken across years for each season. The result is written to
        ``results/masvi_climatology_<stat>.nc`` with ``to_zarr``.

    Raises
    ------
    ValueError
        If ``stat`` is not 'mean' or 'std' (previously this silently did
        nothing after running the whole calculation set-up).
    """
    # validate first so a typo fails immediately instead of being ignored
    if stat not in ('mean', 'std'):
        raise ValueError("stat must be 'mean' or 'std', got {!r}".format(stat))

    # Scale reflectance values to 0-1
    nir = data.nir / 10000
    red = data.red / 10000

    # calculate msavi
    msavi = (2*nir+1-((2*nir+1)**2 - 8*(nir-red))**0.5)/2
    msavi = msavi.astype('float32')  # convert to reduce memory

    # quarterly means, then one value per season across all years
    msavi = msavi.resample(time='QS-DEC').mean('time')
    by_season = msavi.groupby('time.season')
    if stat == 'mean':
        climatology = by_season.mean('time')
    else:
        climatology = by_season.std('time')

    # variable and store names are kept exactly as before (including the
    # 'masvi' spelling) because other cells open the store by this path
    climatology = climatology.rename('masvi_{}_climatology'.format(stat))
    climatology = climatology.to_dataset()
    climatology.to_zarr('results/masvi_climatology_{}.nc'.format(stat))


In [None]:
# compute_climatology writes its result itself via to_zarr, so no separate
# compute step is needed; the former bare `compute()` call was missing its
# required arguments and raised a TypeError.
compute_climatology(ds, stat='mean')

In [None]:
# Re-open the store written by compute_climatology(stat='mean'); despite the
# '.nc' extension it is opened (and was written) as zarr.
msavi = xr.open_zarr('results/masvi_climatology_mean.nc')
msavi

In [None]:
x = xr.open_rasterio('msavi_2018_SON_anomalies.tif').squeeze()

In [None]:
x

In [None]:
x.plot(figsize=(15,15), vmin=-0.25, vmax=0.25, cmap='BrBG')

In [None]:
pwd

In [None]:
# NOTE(review): `b` is not defined in any cell visible in this notebook; this
# cell relies on leftover kernel state and will fail on a fresh kernel.
b.plot(x='x',y='y', col='time', col_wrap=2, vmin=-2.0,vmax=2.0,cmap='RdBu', figsize=(12,12))

In [None]:
b.mean(['x', 'y']).mean()#plot(figsize=(12,5))

In [None]:
# def compute_anomalies(data, output_dir):
#     msavi = xr.open_zarr('results/masvi.nc', chunks={'x': chunk_size, 'y': chunk_size})
#     #resample to quarterly and groupby seasons
#     msavi_seasonalMeans = msavi.resample(time='QS-DEC').mean('time')
#     msavi_seasonalMeans = msavi_seasonalMeans.groupby('time.season')
    
#     #import climatology
#     climatology_mean = xr.open_dataarray('results/masvi_climatology_mean_test.nc', chunks={'x': chunk_size, 'y': chunk_size})
#     climatology_std = xr.open_dataarray('results/masvi_climatology_std_test.nc', chunks={'x': chunk_size, 'y': chunk_size})
    
#     #calculate standardised anomalies
#     msavi_stand_anomalies = xr.apply_ufunc(lambda x, m, s: (x - m) / s,
#                                  msavi_seasonalMeans, climatology_mean, climatology_std,
#                                  dask='allowed')
    
#     #write out results (will compute now)
#     msavi_stand_anomalies.to_zarr(output_dir)
    
#     return msavi_stand_anomalies

### CUTS/ code for mosaicing results

In [None]:
import glob
import os
import subprocess

os.chdir('/g/data1a/r78/cb3058/dea-notebooks/vegetation_anomalies/results/northern_basins_SON/tiffs/')

mosaic_vrt = "msavi_NMDB_climatology_SON_mosaic.vrt"
mosaic_tif = "msavi_NMDB_climatology_SON_mosaic.tif"

# Collect the tiles explicitly: a bare "*.tif" also matches the mosaic written
# by a previous run of this cell, which would then be fed back in as a tile.
tile_tifs = sorted(name for name in glob.glob("*.tif") if name != mosaic_tif)
if not tile_tifs:
    raise FileNotFoundError("no input .tif tiles found in " + os.getcwd())

# check=True stops the cell if a GDAL command fails; os.system returned the
# exit status without raising, so gdal_translate ran even after a failed VRT build.
subprocess.run(["gdalbuildvrt", mosaic_vrt] + tile_tifs, check=True)
subprocess.run(["gdal_translate",
                "-co", "BIGTIFF=YES",
                "-co", "COMPRESS=DEFLATE",
                "-co", "ZLEVEL=9",
                "-co", "PREDICTOR=1",
                "-co", "TILED=YES",
                "-co", "BLOCKXSIZE=1024",
                "-co", "BLOCKYSIZE=1024",
                mosaic_vrt, mosaic_tif],
               check=True)

In [None]:
import geopandas as gpd

In [None]:
os.chdir('/g/data1a/r78/cb3058/dea-notebooks/vegetation_anomalies/')

In [None]:
x = gpd.read_file("data/tiles_not_run_DJF_NWQLD.shp")

In [None]:
len(x.label)

In [None]:
# Plain list of the values in the `label` column of the shapefile
a = list(x.label)

In [None]:
# Values present in `z` but not in `a`.
# NOTE(review): `z` is not defined in any cell visible in this notebook.
np.setdiff1d(z,a)

In [None]:
import numpy as np

In [None]:
times = ['1988-12-01', '1989-12-01', '1990-12-01']

def timesListDCLoad(times, arrs=None):
    """Load one dataset per entry of ``times``, all on the grid of the first.

    The first entry is loaded for the AOI given by the module-level ``lat``,
    ``lon`` and ``latLon_adjust``; every following entry is loaded with
    ``like=`` the first result so that all datasets share its grid.

    Parameters
    ----------
    times : sequence
        Time queries, one per load.
    arrs : ignored
        Kept only so existing two-argument calls keep working; a fresh list
        is always returned (the original body overwrote this argument too).

    Returns
    -------
    list
        The loaded datasets, in the order of ``times`` (empty if ``times`` is
        empty).
    """
    arrs = []
    if not times:
        return arrs

    # one Datacube connection is enough for every load
    dc = datacube.Datacube(app='load_clearlandsat')

    # NOTE(review): no `product` is given to dc.load here - confirm which
    # product(s) this is meant to load.
    # The query must be expanded into keyword arguments; passed positionally
    # the dict would land in dc.load's first parameter instead.
    ds1 = dc.load(lon=(lon - latLon_adjust, lon + latLon_adjust),
                  lat=(lat - latLon_adjust, lat + latLon_adjust),
                  time=times[0])
    arrs.append(ds1)

    for t in times[1:]:
        # with `like` the spatial extent comes from ds1, so only time is passed
        ds = dc.load(like=ds1, time=t)
        arrs.append(ds)

    return arrs

In [None]:
def msavi_func(nir, red):
    """Return MSAVI computed element-wise from the nir and red inputs."""
    linear_term = 2*nir+1
    discriminant = linear_term**2 - 8*(nir-red)
    return (linear_term - np.sqrt(discriminant)) / 2

def msavi_ufunc(ds):
    """Apply ``msavi_func`` to ``ds.nir`` and ``ds.red`` through xr.apply_ufunc."""
    ufunc_options = {
        'dask': 'parallelized',
        'output_dtypes': [float],
    }
    return xr.apply_ufunc(msavi_func, ds.nir, ds.red, **ufunc_options)

# NOTE(review): `ds_mo` is not defined in any cell visible in this notebook.
msavi = msavi_ufunc(ds_mo).compute()

In [None]:
# Mean MSAVI for each calendar month across the whole record
climatology = msavi.groupby('time.month').mean('time')

# Departure of every time step from the mean of its calendar month.
# NOTE(review): this rebinds `anomalies`, which the load cell earlier in the
# notebook uses as a module (`anomalies.load_landsat`); re-running that cell
# after this one will fail unless the variable is renamed.
anomalies = msavi.groupby('time.month') - climatology

In [None]:
    #Functions for weighting months to help with seasonal climatology
    # Days per month for each calendar name. Index 0 is a placeholder so the
    # lists can be indexed directly with a 1-based month number (see get_dpm).
    # NOTE(review): leap_year also accepts 'julian', which has no entry here.
    dpm = {'noleap': [0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31],
           '365_day': [0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31],
           'standard': [0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31],
           'gregorian': [0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31],
           'proleptic_gregorian': [0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31],
           'all_leap': [0, 31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31],
           '366_day': [0, 31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31],
           '360_day': [0, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30]}

    def leap_year(year, calendar='standard'):
        """Determine if year is a leap year.

        Parameters
        ----------
        year : int
            Calendar year.
        calendar : str
            Calendar name. Only 'standard', 'gregorian', 'proleptic_gregorian'
            and 'julian' can yield True; any other name returns False.

        Returns
        -------
        bool
            True when ``year`` has a leap day in ``calendar``.
        """
        leap = False
        if ((calendar in ['standard', 'gregorian',
                          'proleptic_gregorian', 'julian']) and
                (year % 4 == 0)):
            leap = True
            # century years are leap years only when divisible by 400
            century_exception = (year % 100 == 0) and (year % 400 != 0)
            if calendar == 'proleptic_gregorian' and century_exception:
                leap = False
            elif (calendar in ['standard', 'gregorian'] and
                  century_exception and year >= 1583):
                # The mixed calendar applies the century rule only from the
                # Gregorian era on; the test used to be `year < 1583`, which
                # inverted it and made 1900 and 2100 leap years.
                leap = False
        return leap

    def get_dpm(time, calendar='standard'):
        """
        Return an array with the number of days in the month of each entry of `time`.

        Parameters
        ----------
        time : object with `month` and `year` sequences and a length
            The time steps to look up (season_mean passes a time index).
        calendar : str
            Key into the module-level `dpm` table.

        Returns
        -------
        numpy.ndarray of int
            Days in the month of every entry, in the same order as `time`.
        """
        # builtin int: the `np.int` alias no longer exists in current NumPy
        month_length = np.zeros(len(time), dtype=int)

        cal_days = dpm[calendar]

        for i, (month, year) in enumerate(zip(time.month, time.year)):
            month_length[i] = cal_days[month]
            # the leap day belongs to February only; it used to be added to
            # every month of a leap year
            if month == 2 and leap_year(year, calendar=calendar):
                month_length[i] += 1
        return month_length

    def season_mean(ds, calendar='standard'):
        """Season means of `ds`, weighting each time step by its month length.

        Each time step is weighted by the number of days in its month (from
        get_dpm) divided by the total number of days of all time steps that
        fall in the same `time.season` group, so the weights of every season
        sum to one.

        Parameters
        ----------
        ds : xarray object with a `time` coordinate
            Data to average.
        calendar : str
            Calendar name forwarded to get_dpm.

        Returns
        -------
        xarray object
            `ds` reduced over `time`, with one entry per season.
        """
        # Make a DataArray with the number of days in each month, size = len(time)
        month_length = xr.DataArray(get_dpm(ds.time.to_index(), calendar=calendar),
                                    coords=[ds.time], name='month_length')

        # Calculate the weights by grouping by 'time.season'
        weights = month_length.groupby('time.season') / month_length.groupby('time.season').sum()

        # Test that the sum of the weights for each season is 1.0. The expected
        # array is sized from the data so records that do not cover all four
        # seasons no longer fail on a shape mismatch.
        weight_sums = weights.groupby('time.season').sum().values
        np.testing.assert_allclose(weight_sums, np.ones_like(weight_sums))

        # Calculate the weighted average
        return (ds * weights).groupby('time.season').sum(dim='time')

    #calculate the seasonal climatology
#     msavi_seasonalClimatology = season_mean(msavi)

In [None]:
def load_chunked_nbarx(dc, sensor, query, product='nbart', bands_of_interest='', filter_pq=True, dask_chunks={}, like=None):
    """Load one sensor's data, optionally masked by pixel quality.

    Parameters
    ----------
    dc : object with a ``load`` method
        Datacube instance used for both the data and the PQ load.
    sensor : str
        Sensor prefix; the products loaded are ``<sensor>_<product>_albers``
        and ``<sensor>_pq_albers``.
    query : dict
        Keyword arguments expanded into ``dc.load`` when ``like`` is None.
        Not used when ``like`` is given.
    product : str
        Product part of the product name.
    bands_of_interest : list or ''
        Passed as ``measurements`` when truthy; otherwise no ``measurements``
        argument is given.
    filter_pq : bool
        When True, load the PQ product, keep only pixels that pass the mask
        built with ``masking.make_mask`` and then apply
        ``masking.mask_invalid_data``.
    dask_chunks : dict
        Forwarded unchanged to every ``dc.load`` call.
    like : object or None
        When given, it is passed as ``like=`` to ``dc.load`` in place of
        ``query``.

    Returns
    -------
    The loaded dataset, or None when the data load returned no variables.
    (The no-data case used to return ``(None, None, None)`` although the
    success case returns a single object.)
    """
    product_name = '{}_{}_albers'.format(sensor, product)
    mask_product = '{}_{}_albers'.format(sensor, 'pq')

    # The extent comes either from the query or from the `like` object;
    # everything else is identical for the two cases.
    if like is None:
        extent_kwargs = dict(query)
    else:
        extent_kwargs = {'like': like}

    # Only name the measurements when bands of interest were given
    measurement_kwargs = {'measurements': bands_of_interest} if bands_of_interest else {}

    ds = dc.load(product=product_name, group_by='solar_day', dask_chunks=dask_chunks,
                 **measurement_kwargs, **extent_kwargs)

    # Nothing to do if the call returned no data
    if not ds.variables:
        return None

    # If pixel quality filtering is enabled, extract PQ data to use as mask
    if filter_pq:
        sensor_pq = dc.load(product=mask_product, fuse_func=ga_pq_fuser,
                            group_by='solar_day', dask_chunks=dask_chunks,
                            **extent_kwargs)

        # If PQ call returns data, use to mask input data
        if sensor_pq.variables:
            good_quality = masking.make_mask(sensor_pq.pixelquality,
                                             cloud_acca='no_cloud',
                                             cloud_shadow_acca='no_cloud_shadow',
                                             cloud_shadow_fmask='no_cloud_shadow',
                                             cloud_fmask='no_cloud',
                                             blue_saturated=False,
                                             green_saturated=False,
                                             red_saturated=False,
                                             nir_saturated=False,
                                             swir1_saturated=False,
                                             swir2_saturated=False,
                                             contiguous=True)

            # Apply mask to preserve only good data
            ds = ds.where(good_quality)

        # Replace nodata values with nans.
        # NOTE(review): as before, this only happens when filter_pq is True;
        # confirm that skipping it for filter_pq=False is intended.
        ds = masking.mask_invalid_data(ds)

    return ds

def load_aligned_nbarx(dc, query, sensors, product='nbart', bands_of_interest='', filter_pq=True, dask_chunks={}):
    """Load several sensors onto the grid of the first one and combine them.

    The first sensor in ``sensors`` is loaded with ``query``; every other
    sensor is loaded with ``like=`` that first result. All results are
    concatenated along ``time`` and sorted by ``time``.

    Parameters
    ----------
    dc, query, product, bands_of_interest, filter_pq, dask_chunks
        Forwarded to ``load_chunked_nbarx``.
    sensors : sequence of str
        Sensor prefixes; the first one defines the reference grid.

    Returns
    -------
    The combined, time-sorted dataset.
    """
    print('loading ' + sensors[0])

    # list for adding loaded data to
    filtered_sensors = []

    # load the first sensor data and append to list (query is passed by
    # keyword: positionally it landed in the `sensor` parameter)
    ds1 = load_chunked_nbarx(dc, sensor=sensors[0], query=query, product=product,
                             bands_of_interest=bands_of_interest,
                             filter_pq=filter_pq, dask_chunks=dask_chunks, like=None)
    filtered_sensors.append(ds1)

    # now load the other sensors using the first sensor as the 'like' parameter
    # and append; each iteration loads and stores its OWN sensor (the loop used
    # to reload sensors[0] and append ds1 again)
    for sensor in sensors[1:]:
        print("\r", 'loading sensor ' + sensor, end='')
        ds = load_chunked_nbarx(dc, sensor=sensor, query=query, product=product,
                                bands_of_interest=bands_of_interest,
                                filter_pq=filter_pq, dask_chunks=dask_chunks, like=ds1)
        filtered_sensors.append(ds)

    # load_chunked_nbarx signals "no data" with None (or a tuple of Nones);
    # leave those out so one empty sensor does not break the concatenation
    filtered_sensors = [loaded for loaded in filtered_sensors
                        if loaded is not None and not isinstance(loaded, tuple)]

    # Concatenate all sensors into one big xarray dataset, and then sort by time
    print(', concatenating & sorting sensors')
    combined_ds = xr.concat(filtered_sensors, dim='time')
    combined_ds = combined_ds.sortby('time')
    # Return combined dataset
    return combined_ds

In [None]:
# Square AOI centred on (lat, lon), extending latLon_adjust in each direction.
query = {'lon': (lon - latLon_adjust, lon + latLon_adjust),
         'lat': (lat - latLon_adjust, lat + latLon_adjust),
        'time': (start, end)}

# Alternative: build the query from a shapefile instead.
# query = query_from_shp.query_from_shp(shp_fpath, start, end)

# NOTE(review): `datacube` is not bound by the import cell visible at the top
# of this notebook; it needs importing for this cell to run on a fresh kernel.
dc = datacube.Datacube(app='load_clearlandsat')
# Load nir and red for both sensors, chunked in x and y, as one dataset.
ds = load_aligned_nbarx(dc=dc, sensors=['ls7', 'ls8'], query=query, bands_of_interest=['nir', 'red'], 
                        filter_pq=True, dask_chunks = {'x': chunk_size, 'y': chunk_size})
ds