In [4]:
from pyEDM import *
import pandas as pd
import numpy as np
import matplotlib as mpl
from matplotlib import pyplot as plt
import os
import geopandas as gpd
from shapely.geometry import Polygon
import rasterio
import xarray as xr
from mpl_toolkits.axes_grid1 import make_axes_locatable
import fiona
import rasterio.mask
from descartes import PolygonPatch
from rasterio.plot import show
from tqdm import tqdm
import datetime
import random
from sklearn.preprocessing import StandardScaler
import scipy.stats
from datetime import datetime

In [34]:
from dask.distributed import Client, default_client

# Reuse the client already registered in this kernel, if any, instead of
# starting a second local cluster: re-running this cell otherwise hits
# "Port 8787 is already in use" and launches another 4-worker cluster.
try:
    client = default_client()
except ValueError:
    # default_client() raises ValueError when no client has been started yet.
    client = Client(n_workers=4)

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


In [33]:
from dask import delayed

In [5]:
# Input directories of the per-dekad anomaly rasters, one per variable.
# Each keeps its trailing slash: file names are appended by string concatenation.
precip_anom_dir, temp_anom_dir, ndvi_anom_dir = (
    '/home/rgreen/tana-spin/rgreen/DroughtEDM/Data/CHIRPS/Anomaly/',       # precipitation
    '/home/rgreen/tana-spin/rgreen/DroughtEDM/Data/LST/Anomaly/',          # land surface temperature
    '/home/rgreen/tana-spin/rgreen/DroughtEDM/Data/eMODIS_NDVI/Anomaly/',  # NDVI
)

In [40]:
# Output directory for the per-pixel Simplex prediction CSVs (the trailing
# slash matters: file names are appended by plain string concatenation).
prediction_tables = (
    '/home/rgreen/tana-spin/rgreen/DroughtEDM/Data/predictions_tables/'
)

In [38]:
# Output directory for the chunked Simplex CSVs (trailing slash required:
# file names are appended by plain string concatenation).
data_volumes = (
    '/home/rgreen/tana-spin/rgreen/DroughtEDM/Data/data_volumes/'
)

In [7]:
def generate_dekads(start, end):
    '''
    Generates the start timestamps of every dekad overlapping a date range.

    Each month is split into three dekads starting on days 1, 11 and 21; the
    third dekad absorbs days 21 through the end of the month.

    Parameters
    ----------
    start : datetime-like (pd.Timestamp, datetime.datetime or date string)
        Start date of the range (inclusive).
    end : datetime-like
        End date of the range (inclusive).

    Returns
    -------
    pd.DatetimeIndex
        Sorted, unique dekad start dates. The first entry precedes ``start``
        when ``start`` falls in the middle of a dekad.
    '''
    days = pd.date_range(start, end)
    day_of_month = np.asarray(days.day)
    # Which dekad of the month (0, 1 or 2) each date belongs to; days 21-31
    # are clipped into the third dekad.
    dekad_no = np.clip((day_of_month - 1) // 10, 0, 2)
    # Days elapsed since that dekad's first day (the 1st, 11th or 21st).
    offset = day_of_month - dekad_no * 10 - 1
    dekad_starts = days - pd.to_timedelta(offset, unit='D')
    # Consecutive dates map to non-decreasing dekad starts, so unique()
    # (order of first appearance) is already sorted.
    return dekad_starts.unique()

In [8]:
# Study period covered by the anomaly rasters: July 2002 through April 2019.
# pd.datetime was removed in pandas 2.0; pd.Timestamp is the supported
# constructor (and a datetime.datetime subclass).
start = pd.Timestamp(2002, 7, 1)
end = pd.Timestamp(2019, 4, 30)

dekads = generate_dekads(start, end)

In [9]:
def pixelwise_ts_table(in_dir, keyword):
    '''
    Stack a directory of single-band rasters into a pixel-by-time table.

    Files in ``in_dir`` whose name matches ``keyword`` (a regular expression
    searched anywhere in the name, as pandas ``Series.str.contains`` does)
    are read in sorted filename order. That order is assumed to be
    chronological. Band 1 of each raster is flattened row-major into one row.

    In the output DataFrame, columns are the pixels of the raster (read
    sequentially into a list), and rows are the same pixels over time
    (dekads going down).

    Args:
    - in_dir: path to the input directory
    - keyword: string that is unique to the environmental variable
        options: ['precip', 'temp', 'ndvi']

    Returns:
    - pd.DataFrame with one row per matching file and one column per pixel
      (empty when no file matches)
    '''
    import re

    files = sorted(name for name in os.listdir(in_dir) if re.search(keyword, name))

    pixelwise_TS = []
    for filename in tqdm(files):
        # rasterio replaces xr.open_rasterio (deprecated and removed in recent
        # xarray releases). Band 1 is read in full, and the `with` block
        # closes the dataset right away.
        with rasterio.open(os.path.join(in_dir, filename)) as src:
            band = src.read(1)
        pixelwise_TS.append(band.ravel().tolist())

    return pd.DataFrame(pixelwise_TS)

    

In [10]:
precip_table_anom = pixelwise_ts_table(precip_anom_dir, 'precip')
temp_table_anom = pixelwise_ts_table(temp_anom_dir, 'temp')
ndvi_table_anom = pixelwise_ts_table(ndvi_anom_dir, 'ndvi')

100%|██████████| 606/606 [00:09<00:00, 62.31it/s]
100%|██████████| 606/606 [00:09<00:00, 64.52it/s]
100%|██████████| 606/606 [00:08<00:00, 67.86it/s]


In [11]:
# Attach the metadata multi_pixelwise_simplex reads (``table.label``), plus the
# raster geometry needed to reshape per-pixel results back into an image.
# NOTE: these are ad-hoc attributes on the DataFrame objects; pandas does not
# carry them over to copies or slices (e.g. ``df.iloc[...]`` has no ``label``).
RASTER_ROWS, RASTER_COLS = 674, 583

for table, label in [(precip_table_anom, 'precip'),
                     (ndvi_table_anom, 'ndvi'),
                     (temp_table_anom, 'temp')]:
    table.label = label
    table.rs_rows = RASTER_ROWS
    table.rs_cols = RASTER_COLS
    # One row per raster read by pixelwise_ts_table (derived, not hard-coded).
    table.n_samples = len(table)
    # Each column is one pixel, so the flattened raster must fill the table.
    assert table.shape[1] == RASTER_ROWS * RASTER_COLS, (
        f'{label}: {table.shape[1]} pixel columns, expected '
        f'{RASTER_ROWS} x {RASTER_COLS}')

table_list_anom = [precip_table_anom, ndvi_table_anom, temp_table_anom]

In [16]:
def multi_pixelwise_simplex(tables, target_label, train, test, ed, pi,
                            starting_column=0, num_columns=None,
                            simplex_out=None, time_index=None, max_nans=20):
    '''
    Run a multivariate pyEDM Simplex forecast independently for every pixel.

    Steps for each pixel (a column position shared by all ``tables``):
    - Build a small frame with a ``Time`` column plus one column per table,
      named by ``table.label``.
    - If any variable has more than ``max_nans`` missing values (e.g. water),
      record an all-NaN result and move on.
    - Otherwise, replace the remaining missing values with that variable's
      same-calendar-month mean.
    - Run ``Simplex`` with every label as an embedding column and
      ``target_label`` as the target.

    Only the ``Predictions`` column of each Simplex run is kept.

    Args:
    - tables: list of tables from pixelwise_ts_table (one column per pixel,
      one row per time step); every table must carry a ``label`` attribute
      and all tables must share the same column layout
    - target_label: label of the table used as the target (e.g. 'ndvi')
    - train: portion of data library used to train (pyEDM ``lib`` string,
      range with a space between values, e.g. '1 606')
    - test: portion of data library used to test (pyEDM ``pred`` string)
    - ed: embedding dimension (E)
    - pi: prediction interval (Tp)
    - starting_column: position of the first pixel to process (default 0)
    - num_columns: number of pixels to process from ``starting_column``;
      None (default) processes every remaining pixel
    - simplex_out: optional DataFrame of earlier results. When non-empty, the
      new pixel columns are concatenated to its right and the combined frame
      is returned; the argument itself is not modified.
    - time_index: timestamps for the table rows; None (default) uses the
      notebook-level ``dekads``
    - max_nans: skip threshold for missing values per variable (default 20)

    Returns:
    - pd.DataFrame with one row per Simplex output step and one column per
      processed pixel, labelled by the pixel's absolute column position.
      Skipped pixels are all-NaN lists of length ``n_rows - ed + pi + 1``
      (presumably Simplex's output length when lib/pred span all rows;
      TODO confirm against pyEDM).
    '''
    if time_index is None:
        time_index = dekads
    labels = [table.label for table in tables]  # every table must have .label

    n_pixels = len(tables[0].columns)
    stop = n_pixels if num_columns is None else min(n_pixels, starting_column + num_columns)

    predictions_per_pixel = []
    for col_idx in tqdm(range(starting_column, stop)):
        pixel_df = _pixel_frame(tables, labels, col_idx, time_index)

        # Too many gaps to fill credibly: emit a NaN placeholder column.
        if pixel_df[labels].isnull().sum().max() > max_nans:
            predictions_per_pixel.append([np.nan] * (len(pixel_df) - ed + pi + 1))
            continue

        _fill_with_monthly_mean(pixel_df, labels)

        simplex_result = Simplex(dataFrame=pixel_df,
                                 lib=train,
                                 pred=test,
                                 E=ed, Tp=pi,
                                 columns=' '.join(labels),
                                 target=target_label,
                                 showPlot=False)
        # Only the predictions are returned, so the result's Time column is
        # not post-processed here.
        predictions_per_pixel.append(simplex_result.Predictions.tolist())

    result = pd.DataFrame(predictions_per_pixel).T
    result.columns = pd.RangeIndex(starting_column, stop)[:result.shape[1]]

    if simplex_out is not None and not simplex_out.empty:
        result = pd.concat([simplex_out, result], axis=1)
    return result


def _pixel_frame(tables, labels, col_idx, time_index):
    '''Return one pixel's series from every table as a Time-indexed frame.'''
    columns = {'Time': time_index}
    columns.update({label: table.iloc[:, col_idx]
                    for label, table in zip(labels, tables)})
    # 'Time' stays the first column as well (drop=False); pyEDM presumably
    # reads it as the time axis -- confirm against pyEDM docs.
    return pd.DataFrame(columns).set_index('Time', drop=False)


def _fill_with_monthly_mean(frame, labels):
    '''Fill NaNs of each label column, in place, with its same-month mean.'''
    # Calendar month of each row (all years pooled together).
    month = np.asarray(frame.index.month)
    for label in labels:
        column = frame[label]
        if column.isnull().any():
            # mean() skips NaN; a month with no valid values stays NaN.
            monthly_mean = column.groupby(month).transform('mean')
            # Assign through the frame: the chained form
            # `frame.col.loc[t] = ...` is silently lost under copy-on-write.
            frame[label] = column.fillna(monthly_mean)
    


In [None]:
ndvi_ea_simplex_ed6_pi1 = multi_pixelwise_simplex(
    table_list_anom, 'ndvi',
    train='1 606', test='1 606',  # pyEDM lib / pred row ranges
    ed=6, pi=1,
)

In [None]:
ndvi_ea_simplex_ed6_pi2 = multi_pixelwise_simplex(
    table_list_anom, 'ndvi',
    train='1 606', test='1 606',  # pyEDM lib / pred row ranges
    ed=6, pi=2,
)

In [22]:
ndvi_ea_simplex_ed6_pi3 = multi_pixelwise_simplex(
    table_list_anom, 'ndvi',
    train='1 606', test='1 606',  # pyEDM lib / pred row ranges
    ed=6, pi=3,
)

100%|██████████| 392942/392942 [4:39:29<00:00, 23.43it/s]   


In [24]:
ndvi_ea_simplex_ed6_pi4 = multi_pixelwise_simplex(
    table_list_anom, 'ndvi',
    train='1 606', test='1 606',  # pyEDM lib / pred row ranges
    ed=6, pi=4,
)

100%|██████████| 392942/392942 [4:40:31<00:00, 23.35it/s]   


In [None]:
ndvi_ea_simplex_ed6_pi5 = multi_pixelwise_simplex(
    table_list_anom, 'ndvi',
    train='1 606', test='1 606',  # pyEDM lib / pred row ranges
    ed=6, pi=5,
)

 18%|█▊        | 69965/392942 [57:48<4:48:55, 18.63it/s]IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

 96%|█████████▋| 378938/392942 [4:42:39<02:12, 105.97it/s]  

In [29]:
ndvi_ea_simplex_ed6_pi6 = multi_pixelwise_simplex(
    table_list_anom, 'ndvi',
    train='1 606', test='1 606',  # pyEDM lib / pred row ranges
    ed=6, pi=6,
)

  5%|▍         | 19046/392942 [16:16<4:56:46, 21.00it/s]IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

 70%|███████   | 275447/392942 [3:43:18<1:49:22, 17.90it/s] IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

 82%|████████▏ | 320363/392942 [4:09:32<1:04:40, 18.70it/s]IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variabl

In [None]:
ndvi_ea_simplex_ed6_pi7 = multi_pixelwise_simplex(
    table_list_anom, 'ndvi',
    train='1 606', test='1 606',  # pyEDM lib / pred row ranges
    ed=6, pi=7,
)

 10%|█         | 39833/392942 [32:46<5:01:09, 19.54it/s]

In [None]:
ndvi_ea_simplex_ed6_pi8 = multi_pixelwise_simplex(
    table_list_anom, 'ndvi',
    train='1 606', test='1 606',  # pyEDM lib / pred row ranges
    ed=6, pi=8,
)

In [36]:
# Inspect the pi=7 prediction table: rows are Simplex output steps, columns
# are pixels (all-NaN columns are pixels skipped for having too many gaps).
ndvi_ea_simplex_ed6_pi7

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,392932,392933,392934,392935,392936,392937,392938,392939,392940,392941
0,,,,,,,,,,,...,,,,,,,,,,
1,,,,,,,,,,,...,,,,,,,,,,
2,,,,,,,,,,,...,,,,,,,,,,
3,,,,,,,,,,,...,,,,,,,,,,
4,,,,,,,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
603,0.639055,-0.103220,0.260122,0.105711,-0.142640,0.274273,0.296313,0.440756,0.332630,0.158548,...,,,,,,,,,,
604,0.446178,-0.015378,0.395048,0.130652,0.161820,-0.074273,0.283897,0.362719,0.284274,0.074657,...,,,,,,,,,,
605,0.348569,0.162003,0.456837,0.262839,0.232147,0.237869,0.243394,0.380784,0.407004,0.444772,...,,,,,,,,,,
606,0.449923,0.379533,0.499245,0.442150,0.423183,0.399949,0.233216,0.070346,0.227777,0.823023,...,,,,,,,,,,


In [21]:
# Persist the pi=2 per-pixel predictions (rows: output steps, columns: pixels).
ndvi_ea_simplex_ed6_pi2.to_csv(f'{prediction_tables}ndvi_ea_simplex_ed6_pi2.csv')

In [23]:
# Persist the pi=3 per-pixel predictions (rows: output steps, columns: pixels).
ndvi_ea_simplex_ed6_pi3.to_csv(f'{prediction_tables}ndvi_ea_simplex_ed6_pi3.csv')

In [25]:
# Persist the pi=4 per-pixel predictions (rows: output steps, columns: pixels).
ndvi_ea_simplex_ed6_pi4.to_csv(f'{prediction_tables}ndvi_ea_simplex_ed6_pi4.csv')

In [28]:
# Persist the pi=5 per-pixel predictions (rows: output steps, columns: pixels).
ndvi_ea_simplex_ed6_pi5.to_csv(f'{prediction_tables}ndvi_ea_simplex_ed6_pi5.csv')

In [31]:
# Persist the pi=6 per-pixel predictions (rows: output steps, columns: pixels).
ndvi_ea_simplex_ed6_pi6.to_csv(f'{prediction_tables}ndvi_ea_simplex_ed6_pi6.csv')

In [None]:
# Persist the pi=7 per-pixel predictions (rows: output steps, columns: pixels).
ndvi_ea_simplex_ed6_pi7.to_csv(f'{prediction_tables}ndvi_ea_simplex_ed6_pi7.csv')

In [None]:
# Persist the pi=8 per-pixel predictions (rows: output steps, columns: pixels).
ndvi_ea_simplex_ed6_pi8.to_csv(f'{prediction_tables}ndvi_ea_simplex_ed6_pi8.csv')

In [None]:
# Process the pixels in chunks of `num_col_in_chunk` columns and write each
# chunk to its own CSV, so an interrupted run keeps the chunks already done.
num_col_in_chunk = 1000
n_pixels = len(table_list_anom[0].columns)  # 392942 for the 674 x 583 rasters

for starting_col in range(0, n_pixels, num_col_in_chunk):
    # Slice every table down to this chunk's columns. Ad-hoc DataFrame
    # attributes such as `label` are not carried over by .iloc, so re-attach
    # the one multi_pixelwise_simplex reads.
    chunk_tables = []
    for table in table_list_anom:
        chunk = table.iloc[:, starting_col:starting_col + num_col_in_chunk]
        chunk.label = table.label
        chunk_tables.append(chunk)

    ndvi_ea_simplex = multi_pixelwise_simplex(
        chunk_tables, 'ndvi', train='1 606', test='1 606', ed=6, pi=1)
    # Label columns with absolute pixel positions so chunks can be re-joined.
    ndvi_ea_simplex.columns = range(starting_col, starting_col + ndvi_ea_simplex.shape[1])
    ndvi_ea_simplex.to_csv(data_volumes+'ndvi_ea_simplex_ed6_pi1_0' + str(starting_col) + '.csv')

In [12]:
# Synthetic 300000 x 600 standard-normal frame for timing experiments
# (~1.4 GB of float64). Uses pd.DataFrame explicitly: the bare `DataFrame`
# name was presumably only supplied by the `from pyEDM import *` star import.
# The seeded generator makes the frame reproducible across runs.
rng = np.random.default_rng(42)
df = pd.DataFrame(rng.standard_normal((300000, 600)))

In [13]:
# Display the synthetic frame (pandas truncates the repr to head/tail rows).
df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,590,591,592,593,594,595,596,597,598,599
0,0.089729,0.453758,-0.227077,1.086760,0.429158,-0.267277,-0.586962,-0.291149,0.610582,-0.677323,...,0.954878,0.041511,-1.375701,-0.988672,1.035983,-1.124558,-0.239706,1.111770,0.625992,-0.826514
1,-2.356091,1.201864,0.747935,-0.620006,-0.803996,0.650603,0.854943,0.640250,-0.750479,0.867959,...,0.426309,2.774478,-0.953031,-0.399365,-0.363743,1.257300,1.981312,-0.435598,0.069492,0.832684
2,2.593216,0.897086,0.542531,0.407485,1.143427,0.145874,-1.135798,0.319879,-1.022585,-1.866101,...,1.302528,1.080571,-1.013649,-0.054470,-0.528769,0.980161,-0.199724,-0.105143,-1.204524,-0.476064
3,0.853982,-0.469263,-1.182209,0.437694,-1.529014,-1.468944,0.424029,-1.374118,-0.226055,2.710123,...,1.580888,-0.438513,-0.203751,-0.157685,1.415128,0.349609,0.349413,-2.277077,-0.177368,-0.863924
4,-0.028378,-0.124003,0.459031,0.986843,-1.041047,-0.589072,-1.330099,-0.115318,-0.552186,0.303430,...,0.694026,-2.879508,-0.531474,0.605278,0.402023,2.876967,-0.567868,0.123081,0.086449,-0.046321
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
299995,-1.692810,0.888814,-0.694874,1.251194,-1.756056,0.344140,0.427033,1.722951,-0.954262,1.133018,...,-0.838315,0.991573,0.358765,-0.249567,0.945794,1.935669,0.569591,0.133618,-0.790139,-1.621846
299996,1.396363,-0.244527,-1.287964,0.722180,0.403257,0.161471,0.387789,-0.516893,0.069343,0.483551,...,-0.174783,-2.105806,-0.358293,1.096476,-0.070699,1.501764,0.481007,1.599794,0.442363,-0.594189
299997,2.112641,1.274941,-1.335623,-0.524763,-0.606644,-0.333212,1.466827,1.328934,-0.218637,-0.170698,...,0.896251,-0.292122,0.942140,-1.558474,-0.535838,-0.536876,1.004867,0.184847,-0.154542,-1.700770
299998,-1.199137,0.551833,0.757334,-1.308611,-1.778348,0.226848,2.733152,1.054062,0.190829,-1.163540,...,1.363647,-0.470022,-0.325498,0.113593,1.652439,-0.808996,-0.031348,0.231624,1.744659,0.006439


In [15]:
def f2():
    # Stack ten copies of the global `df` row-wise; the original index is
    # repeated ten times. Note: the result needs 10x the memory of df.
    return pd.concat([df] * 10)

In [18]:
from dask.distributed import Client, default_client

# Reuse the client already registered in this kernel, if any, instead of
# starting another local cluster each time a Client cell is (re)run.
try:
    client = default_client()
except ValueError:
    # default_client() raises ValueError when no client has been started yet.
    client = Client(n_workers=4)

In [22]:
from time import sleep


def inc(x, delay=1):
    '''Return x + 1 after sleeping `delay` seconds (simulated slow task).'''
    sleep(delay)
    return x + 1


def add(x, y, delay=1):
    '''Return x + y after sleeping `delay` seconds (simulated slow task).'''
    sleep(delay)
    return x + y

In [23]:
# Eight small integers to push through the toy inc/add tasks.
data = list(range(1, 9))


In [24]:
%%time 
# Sequential code
# Baseline for the dask comparison below: each inc() call sleeps before
# returning, so this loop takes roughly one second per element of `data`
# (the recorded wall time is ~8 s for 8 items).

results = []
for x in data:
    y = inc(x)
    results.append(y)
    
total = sum(results)

CPU times: user 200 ms, sys: 92 ms, total: 292 ms
Wall time: 8.01 s


In [25]:
# Sum of inc(x) for x in data: 2 + 3 + ... + 9 = 44.
total

44

In [26]:
from dask import delayed

In [27]:
# Building the graph is instant: delayed() only records the calls, and
# nothing sleeps until .compute() is invoked.
x, y = (delayed(inc)(start) for start in (1, 2))
z = delayed(add)(x, y)

In [28]:
%%time
# This actually runs the computation. A dask.distributed Client was created
# earlier in this kernel and registers itself as the default scheduler, so the
# tasks presumably run on that cluster's workers rather than a local thread
# pool. inc(1) and inc(2) can run in parallel (~1 s) before add (~1 s),
# which explains the ~2 s wall time.

z.compute()

CPU times: user 48 ms, sys: 40 ms, total: 88 ms
Wall time: 2.02 s


5

In [32]:
# Computing again re-executes the whole graph (Delayed results are not
# cached between compute() calls); the result is inc(1) + inc(2) = 2 + 3 = 5.
z.compute()

5