In [1]:
#!/usr/bin/env python
'''File name: ExtremeEvent-WeatherTyping.py
    Author: Andreas Prein
    E-mail: prein@ucar.edu
    Date created: 16.04.2018
    Date last modified: 16.04.2018

    ############################################################## 
    Purpos:
    Classifies weather patterns that cause precipitation extremes

    1) read in shape file for area under cosideration

    2) read in precipitation data from PRISM

    3) identify the N-days that had highest rainfall records

    4) read in ERA-Interim data for these days

    5) remove the 30-year mooving average from time series and
       normalize the variables

    5) run clustering algorithm on extreme WT patterns

    6) search for the extreme WT centroids in the full record


'''

'File name: ExtremeEvent-WeatherTyping.py\n    Author: Andreas Prein\n    E-mail: prein@ucar.edu\n    Date created: 16.04.2018\n    Date last modified: 16.04.2018\n\n    ############################################################## \n    Purpos:\n    Classifies weather patterns that cause precipitation extremes\n\n    1) read in shape file for area under cosideration\n\n    2) read in precipitation data from PRISM\n\n    3) identify the N-days that had highest rainfall records\n\n    4) read in ERA-Interim data for these days\n\n    5) remove the 30-year mooving average from time series and\n       normalize the variables\n\n    5) run clustering algorithm on extreme WT patterns\n\n    6) search for the extreme WT centroids in the full record\n\n\n'

In [2]:
import matplotlib
gui_env = ['TKAgg','GTKAgg','Qt4Agg','WXAgg']
for gui in gui_env:
    try:
        print( "testing", gui)
        matplotlib.use(gui,warn=False, force=True)
        from matplotlib import pyplot as plt
        break
    except:
        continue
print( "Using:",matplotlib.get_backend())


from dateutil import rrule
import datetime
import glob
from netCDF4 import Dataset
import sys, traceback
import dateutil.parser as dparser
import string
from pdb import set_trace as stop
import numpy as np
import numpy.ma as ma
import os
from mpl_toolkits import basemap
import pickle
import subprocess
import pandas as pd
from scipy import stats
import copy
import matplotlib.pyplot as plt
import matplotlib.colors as colors
import matplotlib as mpl
import pylab as plt
import random
import scipy.ndimage as ndimage
import scipy
import shapefile
import matplotlib.path as mplPath
from matplotlib.patches import Polygon as Polygon2
# Cluster specific modules
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.cluster.hierarchy import cophenet
from scipy.spatial.distance import pdist
from scipy.cluster.hierarchy import fcluster
from scipy.cluster.vq import kmeans2,vq, whiten
from scipy.ndimage import gaussian_filter
import seaborn as sns
# import metpy.calc as mpcalc
import shapefile as shp
import sys
from itertools import combinations 

from Functions_Extreme_WTs import XWT
from Functions_Extreme_WTs import MRR, MRD, perkins_skill

import warnings
warnings.filterwarnings("ignore")

import time
import dask
from dask.distributed import Client, progress
from dask import delayed

testing TKAgg


  


Using: TkAgg


In [3]:
# ###################################################
# This information comes from the setup file

from XWTs_search_ERA5 import rgdTime, iMonths, sPlotDir, sDataDir, sSubregionPR, rgsWTvars, VarsFullName,rgsWTfolders, rgrNrOfExtremes, WT_Domains, DomDegreeAdd, Annual_Cycle, SpatialSmoothing, Metrics, Dimensions, DW_Regions, FireObs
# from XWTs_search_ERA5_preselect import rgdTime, iMonths, sPlotDir, sDataDir, sSubregionPR, rgsWTvars, VarsFullName,rgsWTfolders, rgrNrOfExtremes, WT_Domains, DomDegreeAdd, Annual_Cycle, SpatialSmoothing, Metrics, Dimensions, DW_Regions, FireObs


rgsWTfolders=[rgsWTfolders[0]+str(rgsWTvars[va])+'/' for va in range(len(rgsWTvars))]

# create all possible combinations of variables
iVariables=range(len(VarsFullName))
Combinations1=np.array(list(combinations(iVariables, 1)))
Combinations2=np.squeeze(np.array(list(combinations(iVariables, 2))))
Combinations3=np.squeeze(np.array(list(combinations(iVariables, 3))))
Combinations4=np.squeeze(np.array(list(combinations(iVariables, 4))))
Combinations=list(Combinations1)+list(Combinations2)+list(Combinations3) #+list(Combinations4)
print('Number of var. combinations '+str(len(Combinations)))

print('Number of arguments:', len(sys.argv), 'arguments.')
iRegion=1 #int(sys.argv[1])
sRegion=DW_Regions[iRegion]
sSubregionPR=sSubregionPR+sRegion
print('---- Process Region '+sRegion+' ----')

# create nessesary directories
if not os.path.exists(sDataDir):
    os.makedirs(sDataDir)
if not os.path.exists(sPlotDir):
    os.makedirs(sPlotDir)
sRegion=sRegion.replace('/','-')

ss='-'
sMonths=ss.join([str(iMonths[ii]) for ii in range(len(iMonths))])
print('    Months: '+sMonths)

# ###################################################
# use setup to generate data
rgiYears=np.unique(rgdTime.year)
rgdTimeMM = pd.date_range(datetime.datetime(rgdTime.year[0], rgdTime.month[0], rgdTime.day[0],0), end=datetime.datetime(rgdTime.year[-1], rgdTime.month[-1], rgdTime.day[-1],0), freq='m')
AllMonths=rgdTime
YYYY_stamp=str(rgdTime.year[0])+'-'+str(rgdTime.year[-1])
rgiSeasonWT=np.isin(rgdTime.month, iMonths)
rgdTime=rgdTime[rgiSeasonWT]

SPLIT=np.where(rgdTime.year < rgiYears[int(np.round(len(rgiYears)/2))])[0][-1]
    
SkillScores_All=np.zeros((len(Combinations),len(rgrNrOfExtremes),len(WT_Domains),len(Annual_Cycle),len(SpatialSmoothing),2,len(Metrics))); SkillScores_All[:]=np.nan

Number of var. combinations 6017
Number of arguments: 3 arguments.
---- Process Region Central_Coast ----
    Months: 1-2-3-4-5-6-7-8-9-10-11-12


In [4]:
### CHECK IF DATA IS ALREADY PROCESSED ###
SaveStats=sDataDir+sRegion+'_'+YYYY_stamp+'-'+sMonths+'_'+FireObs+'_ERA5.npz'
if os.path.isfile(SaveStats) == 0:
    # ###################################################
    print('    Read the daily Fire data')
    
    
    ncid=Dataset('/glade/campaign/mmm/c3we/prein/observations/Fire-observations/Fire_GWIS/data_20020702/GriddedData/MODIS_BA_GLOBAL_1_2016_01_gridded.nc', mode='r') # open the netcdf file
    rgrLatPR=np.squeeze(ncid.variables['rlat'][:])
    rgrLonPR=np.squeeze(ncid.variables['rlon'][:])
    ncid.close()
    rgrGridCells=[(rgrLonPR.ravel()[ii],rgrLatPR.ravel()[ii]) for ii in range(len(rgrLonPR.ravel()))]
    rgrSRactP=np.zeros((rgrLonPR.shape[0]*rgrLonPR.shape[1]))
    sf = shp.Reader(sSubregionPR)
    from Functions_Extreme_WTs import read_shapefile
    df = read_shapefile(sf)
    for sf in range(df.shape[0]):
        ctr = df['coords'][sf]
        if len(ctr) > 10000:
            ctr=np.array(ctr)[::100,:] # carsen the shapefile accuracy
        else:
            ctr=np.array(ctr)
        grPRregion=mplPath.Path(ctr)
        TMP=np.array(grPRregion.contains_points(rgrGridCells))
        rgrSRactP[TMP == 1]=1

    rgrSRactP=np.reshape(rgrSRactP, (rgrLatPR.shape[0], rgrLatPR.shape[1]))
    rgiSrPR=np.array(np.where(rgrSRactP == True))
    iLatMaxP=rgiSrPR[0,:].max()+1
    iLatMinP=rgiSrPR[0,:].min()
    iLonMaxP=rgiSrPR[1,:].max()+1
    iLonMinP=rgiSrPR[1,:].min()
    rgrPRdata=np.zeros((sum(rgiSeasonWT),iLatMaxP-iLatMinP,iLonMaxP-iLonMinP))
        
    if FireObs == 'MODIS':
        for mo in range(len(rgdTimeMM)):
            rgiDD=np.where(((rgdTimeMM.year[mo] == rgdTime.year) & (rgdTime.month == rgdTimeMM.month[mo]) & np.isin(rgdTimeMM.month[mo], iMonths)))[0]
            if len(rgiDD) != 0:
                ncid=Dataset('/glade/campaign/mmm/c3we/prein/observations/Fire-observations/Fire_GWIS/data_20020702/GriddedData/MODIS_BA_GLOBAL_1_'+str(rgdTimeMM.year[mo])+'_'+str("%02d" % rgdTimeMM.month[mo])+'_gridded.nc', mode='r')
                rgrPRdata[rgiDD,:,:]=np.squeeze(ncid.variables['BurnedArea'][:,iLatMinP:iLatMaxP,iLonMinP:iLonMaxP])
                ncid.close()
        
        rgrPRdata[rgrPRdata<0] = np.nan
        rgiSRgridcells=rgrSRactP[iLatMinP:iLatMaxP,iLonMinP:iLonMaxP].astype('int')
        rgrPRdata=np.nanmean(rgrPRdata[:,(rgiSRgridcells==1)], axis=(1))

    elif FireObs == 'Parks':
        SheanDataDir='/glade/campaign/mmm/c3we/prein/observations/Fire-observations/Sean_Parks/SubRegion_DailyBurnedArea/'
        FILE=SheanDataDir+DW_Regions[iRegion]+'.txt'
        rgrPRrecords=np.array(pd.read_csv(FILE, delim_whitespace=True, header = None))[:,0]
        
        # rgiExtrTrain=ExtremeDays(rgrPRrecords,iNrOfExtremes,MinDistDD)
        # rgiExtremeDays=rgdTime[rgiExtrTrain]
        rgrPRdata=np.copy(rgrPRrecords)
        rgrPRdata[rgrPRdata<0] = np.nan

    print( '    Read the ERA-Interim data')
    # We read in ERA-Interim data for the largest region and cut it to fit smaller regions
    DomDelta=np.max(DomDegreeAdd)
    Wlon=ctr[:,0].min()
    Elon=ctr[:,0].max()
    Nlat=ctr[:,1].max()
    Slat=ctr[:,1].min()
    DomainWT=np.array([[Elon+DomDelta,Slat-DomDelta],
                       [Wlon-DomDelta,Slat-DomDelta],
                       [Wlon-DomDelta,Nlat+DomDelta],
                       [Elon+DomDelta,Nlat+DomDelta],
                       [Elon+DomDelta,Slat-DomDelta]])
    grWTregion=mplPath.Path(DomainWT)

    # ###################################################
    #         Read the ERA-Interim grid and data
    from Functions_Extreme_WTs import ReadERA5
    DailyVarsLargeDom=ReadERA5(grWTregion,      # shapefile with WTing region
                       rgdTime,         # time period for WTing
                       iMonths,         # list of months that should be considered
                       rgsWTfolders,    # directories containing WT files
                       rgsWTvars)       # netcdf variable names of WT variables

    # for CAPE and CIN replace NaN with 0
    VARSact=['CAPE','CIN']
    for vv in range(len(VARSact)):
        try:
            IND=VarsFullName.index(VARSact[vv])
            TMP=DailyVarsLargeDom[0][:,:,:,IND]; TMP[np.isnan(TMP)]=0
            DailyVarsLargeDom[0][:,:,:,IND]=TMP
        except:
            pass
    # ###################################################
    print('    Read the ERA-Interim data specific for the region')
    
    for re in range(len(WT_Domains)):
        print( '    ------')
        print( '    Domain '+WT_Domains[re])
        DeltaX=np.max(DomDegreeAdd)-DomDegreeAdd[re]
        if DeltaX != 0:
            DomainWT=np.array([[Elon+DomDegreeAdd[re],Slat-DomDegreeAdd[re]],
                       [Wlon-DomDegreeAdd[re],Slat-DomDegreeAdd[re]],
                       [Wlon-DomDegreeAdd[re],Nlat+DomDegreeAdd[re]],
                       [Elon+DomDegreeAdd[re],Nlat+DomDegreeAdd[re]],
                       [Elon+DomDegreeAdd[re],Slat-DomDegreeAdd[re]]])

            grWTregion=mplPath.Path(DomainWT)
            rgrGridCells=[(DailyVarsLargeDom[1].ravel()[ii],DailyVarsLargeDom[2].ravel()[ii]) for ii in range(len(DailyVarsLargeDom[1].ravel()))]
            rgrSRact=np.array(grWTregion.contains_points(rgrGridCells)); rgrSRact=np.reshape(rgrSRact, (DailyVarsLargeDom[1].shape[0], DailyVarsLargeDom[1].shape[1]))
            rgiSrWT=np.array(np.where(rgrSRact == True))
            iLatMax=rgiSrWT[0,:].max()
            iLatMin=rgiSrWT[0,:].min()
            iLonMax=rgiSrWT[1,:].max()
            iLonMin=rgiSrWT[1,:].min()

            DailyVars=DailyVarsLargeDom[0][:,iLatMin:iLatMax,iLonMin:iLonMax,:]
        else:
            DailyVars=DailyVarsLargeDom[0]
        # perform split sample statistic
        for ss in range(2):
            print('    Split Sample Nr. '+str(ss+1))
            if ss == 0:
                DailyVarsTrain=DailyVars[:SPLIT,:]
                DailyVarsEval=DailyVars[-SPLIT:,:]
                Ptrain=rgrPRdata[:SPLIT]
                Peval=rgrPRdata[-SPLIT:]
                TimeTrain=rgdTime[:SPLIT]
                TimeEval=rgdTime[-SPLIT:]
            else:
                DailyVarsTrain=DailyVars[-SPLIT:,:]
                DailyVarsEval=DailyVars[:SPLIT,:]
                Ptrain=rgrPRdata[-SPLIT:]
                Peval=rgrPRdata[:SPLIT]
                TimeTrain=rgdTime[-SPLIT:]
                TimeEval=rgdTime[:SPLIT]

            for ne in range(len(rgrNrOfExtremes)):
                DailyVarsAct=np.copy(DailyVarsTrain)
                print( '        '+str(rgrNrOfExtremes[ne])+' EXTREMES')
                iNrOfExtremes=rgrNrOfExtremes[ne]   # we consider the N highest rainfall extremes
                rgiSRgridcells=rgrSRactP[iLatMinP:iLatMaxP,iLonMinP:iLonMaxP].astype('int')
                rgrPRrecords=Ptrain
                rgrPReval=Peval
    
                # Test effect of spatial smoothing
                for sm in range(len(SpatialSmoothing)):
                    # annual cycle treatment
                    for ac in range(len(Annual_Cycle)):
                        print( '            Loop over variable permutations')
                        stop()
                        for va1 in range(len(Combinations)):
                            print('    process var '+str(va1)+' of '+str(len(Combinations)))
                            XWT_output=XWT(DailyVarsTrain[:,:,:,Combinations[va1]],
                                           DailyVarsEval[:,:,:,Combinations[va1]],
                                           rgrPRrecords,
                                           rgrPReval,
                                           TimeTrain,
                                           TimeEval,
                                           rgrNrOfExtremes[ne],
                                           SpatialSmoothing[sm],
                                           ClusterMeth='hdbscan')
                            if XWT_output != None:
                                SkillScores_All[va1, ne, re, ac, sm, ss, Metrics.index('PSS')]=XWT_output['grPSS'] # Perkins Skill Score
                                SkillScores_All[va1, ne, re, ac, sm, ss, Metrics.index('MRD')]=XWT_output['grMRD'] # Mean relative difference
                                SkillScores_All[va1, ne, re, ac, sm, ss, Metrics.index('MRR')]=XWT_output['grMRR'] # Mean Rank Ratio
                                SkillScores_All[va1, ne, re, ac, sm, ss, Metrics.index('APR')]=XWT_output['APR'] # Average precision-recall score
                                SkillScores_All[va1, ne, re, ac, sm, ss, Metrics.index('PEX')]=XWT_output['PEX'] # Percent of points excluded for ED larger than the 75 percentile
                                SkillScores_All[va1, ne, re, ac, sm, ss, Metrics.index('AUC')]=XWT_output['AUC'] # Area under the ROC curve
                            
                print( ' ')
    np.savez(SaveStats,
             SkillScores_All=SkillScores_All, 
             Combinations=Combinations, 
             rgsWTvars=VarsFullName,
             rgrNrOfExtremes=rgrNrOfExtremes,
             WT_Domains=WT_Domains,
             Annual_Cycle=Annual_Cycle,
             SpatialSmoothing=SpatialSmoothing,
             Metrics=Metrics,
             Dimensions=Dimensions)
else:
    print('    Load: '+SaveStats)
    DATA=np.load(SaveStats)
    SkillScores_All=DATA['SkillScores_All']
    Combinations=DATA['Combinations']
    VarsFullName=DATA['rgsWTvars']
    rgrNrOfExtremes=DATA['rgrNrOfExtremes']
    WT_Domains=DATA['WT_Domains']
    Annual_Cycle=DATA['Annual_Cycle']
    SpatialSmoothing=DATA['SpatialSmoothing']
    Metrics=DATA['Metrics']
    Dimensions=DATA['Dimensions']

    Read the daily Fire data
    Read the ERA-Interim data
        Read ERA-5 year: 2001
        Read ERA-5 year: 2002
        Read ERA-5 year: 2003
        Read ERA-5 year: 2004
        Read ERA-5 year: 2005
        Read ERA-5 year: 2006
        Read ERA-5 year: 2007
        Read ERA-5 year: 2008
        Read ERA-5 year: 2009
        Read ERA-5 year: 2010
        Read ERA-5 year: 2011
        Read ERA-5 year: 2012
        Read ERA-5 year: 2013
        Read ERA-5 year: 2014
        Read ERA-5 year: 2015
        Read ERA-5 year: 2016
        Read ERA-5 year: 2017
        Read ERA-5 year: 2018
        Read ERA-5 year: 2019
    Read the ERA-Interim data specific for the region
    ------
    Domain S
    Split Sample Nr. 1
        4 EXTREMES
            Loop over variable permutations
> <ipython-input-4-3874470b6e79>(147)<module>()
-> for va1 in range(len(Combinations)):


(Pdb)  exit


BdbQuit: 

In [5]:
from ncar_jobqueue import NCARCluster
from dask.distributed import Client

ModuleNotFoundError: No module named 'ncar_jobqueue'

In [None]:
# Casper
NUMNODES = 3
num_processes = 9
num_threads = 18
MEM = "200GB"

cluster = NCARCluster(cores=num_threads, 
                      processes=num_processes, 
                      memory=MEM, walltime="02:00:00", 
                      project="p66770001")

In [None]:
cluster.scale(NUMNODES * num_processes)

In [None]:
client = Client(cluster)

In [None]:
cluster

In [None]:
client

In [108]:
# client = Client(threads_per_worker=1, n_workers=4)

from distributed import LocalCluster, Client
client = Client(LocalCluster(processes=True, n_workers=10, threads_per_worker=10))
client

0,1
Client  Scheduler: tcp://127.0.0.1:38409  Dashboard: http://127.0.0.1:45783/status,Cluster  Workers: 10  Cores: 100  Memory: 322.12 GB


In [109]:
%%time
results = []
for va1 in range(len(Combinations))[:3]:
    print('    process var '+str(va1)+' of '+str(len(Combinations)))
    XWT_output=dask.delayed(XWT)(DailyVarsTrain[:,:,:,Combinations[va1]],
                   DailyVarsEval[:,:,:,Combinations[va1]],
                   rgrPRrecords,
                   rgrPReval,
                   TimeTrain,
                   TimeEval,
                   rgrNrOfExtremes[ne],
                   SpatialSmoothing[sm],
                   ClusterMeth='hdbscan')
    results.append(XWT_output)

    process var 0 of 6017
    process var 1 of 6017
    process var 2 of 6017
CPU times: user 4.71 s, sys: 2.01 s, total: 6.72 s
Wall time: 8.49 s


In [114]:
%%time
XWT_output = dask.compute(*results)

CPU times: user 9 s, sys: 3.74 s, total: 12.7 s
Wall time: 23.7 s


In [113]:
results = dask.optimize(results)

In [76]:
%%time
for va1 in range(len(Combinations))[:1]:
    print('    process var '+str(va1)+' of '+str(len(Combinations)))
    XWT_output=XWT(DailyVarsTrain[:,:,:,Combinations[va1]],
                   DailyVarsEval[:,:,:,Combinations[va1]],
                   rgrPRrecords,
                   rgrPReval,
                   TimeTrain,
                   TimeEval,
                   rgrNrOfExtremes[ne],
                   SpatialSmoothing[sm],
                   ClusterMeth='hdbscan')

    process var 0 of 6017
CPU times: user 2.48 s, sys: 951 ms, total: 3.43 s
Wall time: 3.5 s


In [6]:
XWT_output

{'grClustersFin': (array([[ 1.2556458 ,  2.47910029,  4.60599936, ...,  1.00021202,
          -1.90511102, -2.36943394]]),
  array([0, 0, 0, 0])),
 'grEucledianDist': array([ 970.94947101,  970.8806865 ,  970.86067471, ..., 1024.7699964 ,
         956.16181874,  969.59714641]),
 'EucledianDistAllWTs': array([[ 970.94947101],
        [ 970.8806865 ],
        [ 970.86067471],
        ...,
        [1024.7699964 ],
        [ 956.16181874],
        [ 969.59714641]]),
 'grCorrelatio': array([-8.39356386e-02, -2.39383891e-02, -1.59223551e-16, ...,
         8.96943211e-02,  6.14423914e-02, -2.73932990e-16]),
 'grCorrelatioAllWTs': array([[-8.39356386e-02],
        [-2.39383891e-02],
        [-1.59223551e-16],
        ...,
        [ 8.96943211e-02],
        [ 6.14423914e-02],
        [-2.73932990e-16]]),
 'grPSS': 0.08328767123287673,
 'grMRD': 0.9925291538238525,
 'grMRR': 1.0863723608445297,
 'APR': 0.0012891610439048637,
 'AUC': 0.45681381957773515,
 'PEX': 14.050944946589972,
 'grExluded': 