# Stats Parallel

Notebook by Cascade Tuholske 2021.02.15 <br>
Trying to speed up 4_Event_Stats.py

In [1]:
#### Dependencies
import pandas as pd
import numpy as np
import xarray as xr
from random import random
from itertools import groupby
from operator import itemgetter
import geopandas as gpd 
import glob
from statistics import mean
import julian
import time 
import multiprocessing as mp 
from multiprocessing import Pool
import os
import sys
import matplotlib.pyplot as plt

In [2]:
# check data
path = '/home/cascade/projects/UrbanHeat/data/processed/PNAS-DATA-v2/'
fns = glob.glob(path+'*')

In [None]:
dfs = []
for fn in fns:
    dfs.append(pd.read_json(fn, orient = 'split'))

In [None]:
fns

In [None]:
for df in dfs: print(len(df))

In [None]:
dfs[0].head()

In [None]:
plt.hist(dfs[0]['duration'], bins = 50);
plt.yscale('log')
plt.title('Duration all events 40.6 >=1 day')
plt.xlabel('days')

In [None]:
tmax = dfs[0]['tmax'].to_list()
tmax_lst = [item for sublist in tmax for item in sublist]

In [None]:
plt.hist(tmax_lst, bins = 50);
plt.yscale('log')
plt.title('HImax >=40.6C, for events >=1 day');
plt.xlabel('HI')

In [None]:
df406 = dfs[0]
df406 = df406[df406['duration'] >= 2]

In [None]:
plt.hist(df406['duration'], bins = 50, color = 'red');
plt.yscale('log')
plt.title('Duration all events 40.6 >=2 day')
plt.xlabel('days')

In [None]:
tmax = df406['tmax'].to_list()
tmax_lst = [item for sublist in tmax for item in sublist]

plt.hist(tmax_lst, bins = 50);
plt.yscale('log')
plt.title('HImax >=40.6C, for events >=2 day');
plt.xlabel('HI')

In [None]:
def add_years(df):
    """ Function adds zero to people days for all missing years for each city 
    so that regressions aren't screwed up"""
    
    years = list(np.unique(df['year'])) # Get list of all years
    row_list = []
    counter = 0
    
    for city in list(np.unique(df['ID_HDC_G0'])):
        city_id = city # Get city Id 
        city_df = df.loc[df['ID_HDC_G0'] == city] # find the location
        city_years = list(np.unique(city_df['year'])) # figure out the number of years
        
        years_dif = list(set(years) - set(city_years)) # find the missing years
        
        #print(len(years_dif))
        if len(years_dif) > 0: # add in the missing years
            
            counter = counter + len(years_dif) # counter
            
            for year in years_dif: # add rows with dummy data and zeros
                row = []
                row.append(city) # city id
                row.append(year) # missing year
                row.append(0) # total days
                row.append('np.nan') # pop year
                row.append(float(df[(df['ID_HDC_G0'] == city)]['P'+str(1983)])) # pop 83
                row.append(float(df[(df['ID_HDC_G0'] == city)]['P'+str(2016)])) # pop 16
                row.append(0) # days
                row.append(0) # pdays 83
                row.append(0) # pdays diff
                
                row_list.append(row)
    
    df_new = pd.DataFrame(row_list, columns= df.columns) # merge the new rows into a df
    
    df_new = df.append(df_new) # add the rows back to the original data frame
    
    # Updated 2020.09.07 CPT - coef can be made for heat when p in 1983 is zero
    df_new = df_new[df_new['P1983'] > 0]
    
    return df_new

In [39]:
np.nan

nan

In [80]:
exp = pd.read_json(path+'HI461_1D_STATS.json', orient = 'split')

In [81]:
exp.head()

Unnamed: 0,ID_HDC_G0,year,total_days,duration,avg_temp,avg_intensity,tot_intensity,event_dates,intensity,tmax,UID
0,11118,1983,4549,1,46.733751,0.633751,0.633751,[1983.02.16],[0.6337506312],[46.7337506312],UID-0
1,11118,1983,4549,1,47.26226,1.16226,1.16226,[1983.02.21],[1.1622601767],[47.2622601767],UID-1
2,11118,1983,4549,3,47.07947,0.97947,2.93841,"[1983.03.02, 1983.03.03, 1983.03.04]","[0.358618612, 0.5029550792, 2.0768362979]","[46.458618612, 46.6029550792, 48.1768362979]",UID-2
3,11118,1983,4549,9,48.647779,2.547779,22.930013,"[1983.03.09, 1983.03.10, 1983.03.11, 1983.03.1...","[1.3213212676000001, 2.3114130899, 4.636047639...","[47.4213212676, 48.4114130899, 50.736047639, 4...",UID-3
4,11118,1983,4549,38,52.665418,6.565418,249.485868,"[1983.03.19, 1983.03.20, 1983.03.21, 1983.03.2...","[1.0477397087, 2.7471206706, 3.9829210885, 4.0...","[47.1477397087, 48.8471206706, 50.0829210885, ...",UID-4


In [14]:
test = exp[:10000]

In [53]:
years = list(np.unique(df['year']))
row_list = []

for city in list(np.unique(df['ID_HDC_G0'])):
    city_id = city # Get city Id 
    city_df = df.loc[df['ID_HDC_G0'] == city] # find the location
    city_years = list(np.unique(city_df['year'])) # figure out the number of years

    years_dif = list(set(years) - set(city_years)) # find the missing years
    
    if len(years_dif) > 0: # add in the missing years
        for year in years_dif: # add rows with dummy data and zeros
            row = []
            row.append(city)
            row.append(year)
            row.append(0) # duration = 0 days
            row.append(np.nan) # population for that year is not needed
            row.append(df[(df['ID_HDC_G0'] == city)]['P1983'].values[0])
            row.append(df[(df['ID_HDC_G0'] == city)]['P1983'].values[0])
            row.append(0) # people_days = 0 days
            row.append(0) # people_days_heat = 0 days
            row.append(0) # people_days_pop = 0 days
            
            row_list.append(row) # append row list
    
df_new = pd.DataFrame(row_list, columns= df.columns) # merge the new rows into a df

df_new = df.append(df_new) # add the rows back to the original data frame

# Drop any city with zero people in 1983
df_new = df_new[df_new['P1983'] > 0]

return df_new

In [54]:
df_new

Unnamed: 0,ID_HDC_G0,year,duration,P,P1983,P2016,people_days,people_days_heat,people_days_pop
0,8716,1983,2,52960.861003,52960.861003,106160.041504,105921.722005,105921.722005,0.000000
1,8716,1984,2,55257.386597,52960.861003,106160.041504,110514.773193,105921.722005,4593.051188
2,8716,1985,3,57553.912191,52960.861003,106160.041504,172661.736572,158882.583008,13779.153564
3,8716,1986,3,59850.437785,52960.861003,106160.041504,179551.313354,158882.583008,20668.730347
4,8716,1987,7,62146.963379,52960.861003,106160.041504,435028.743652,370726.027018,64302.716634
...,...,...,...,...,...,...,...,...,...
25,11060,1993,0,,84561.272239,84561.272239,0.000000,0.000000,0.000000
26,11060,1997,0,,84561.272239,84561.272239,0.000000,0.000000,0.000000
27,11060,2007,0,,84561.272239,84561.272239,0.000000,0.000000,0.000000
28,11060,2008,0,,84561.272239,84561.272239,0.000000,0.000000,0.000000


In [64]:
set(np.unique(df_new[df_new['ID_HDC_G0'] == 12481]['year'])) - set(np.unique(test[test['ID_HDC_G0'] == 12481]['year']))

{1999}

In [61]:
set(np.unique(df_new[df_new['ID_HDC_G0'] == 12481]['year']))

34

In [None]:

""" Function adds zero to people days for all missing years for each city 
so that regressions aren't screwed up"""

years = list(np.unique(exp['year'])) # Get list of all years
row_list = []
counter = 0

for city in list(np.unique(exp['ID_HDC_G0'])):
    city_id = city # Get city Id 
    city_df = exp.loc[exp['ID_HDC_G0'] == city] # find the location
    city_years = list(np.unique(city_df['year'])) # figure out the number of years

    years_dif = list(set(years) - set(city_years)) # find the missing years

    #print(len(years_dif))
    if len(years_dif) > 0: # add in the missing years

        counter = counter + len(years_dif) # counter

        for year in years_dif: # add rows with dummy data and zeros
            row = []
            row.append(city) # city id
            row.append(year) # missing year
            row.append(0) # total days
            print(exp[(exp['ID_HDC_G0'] == city)]) # pop year
#            row.append(float(exp[(exp['ID_HDC_G0'] == city)]['P'+str(year)])) # pop year
#             row.append(float(df[(df['ID_HDC_G0'] == city)]['P'+str(1983)])) # pop 83
#             row.append(float(df[(df['ID_HDC_G0'] == city)]['P'+str(2016)])) # pop 16
#             row.append(0) # days
#             row.append(0) # pdays 83
#             row.append(0) # pdays diff

#             row_list.append(row)

# df_new = pd.DataFrame(row_list, columns= df.columns) # merge the new rows into a df

# df_new = df.append(df_new) # add the rows back to the original data frame

# # Updated 2020.09.07 CPT - coef can be made for heat when p in 1983 is zero
# df_new = df_new[df_new['P1983'] > 0]



In [None]:
out = add_years(exp)

In [69]:
df_in = path+'HI406_2D_STATS.json'

In [70]:
df = pd.read_json(df_in, orient = 'split')

In [71]:
df.head()

Unnamed: 0,ID_HDC_G0,year,duration,avg_temp,avg_intensity,tot_intensity,event_dates,intensity,tmax,UID
1,8716,1983,2,41.968549,1.368549,2.737099,"[1983.04.23, 1983.04.24]","[2.2036853194, 0.5334132134]","[42.8036853194, 41.1334132134]",UID-1
2,8716,1983,16,47.880512,7.280512,116.488197,"[1983.04.26, 1983.04.27, 1983.04.28, 1983.04.2...","[0.7365281444, 4.1637614766, 9.6917897375, 6.4...","[41.3365281444, 44.7637614766, 50.2917897375, ...",UID-2
3,8716,1983,51,51.438063,10.838063,552.741209,"[1983.05.13, 1983.05.14, 1983.05.15, 1983.05.1...","[3.4425009533, 7.2378939909, 10.2395632742, 9....","[44.0425009533, 47.8378939909, 50.8395632742, ...",UID-3
4,8716,1983,22,53.897499,13.297499,292.544988,"[1983.07.04, 1983.07.05, 1983.07.06, 1983.07.0...","[11.9686002231, 16.3046980407, 14.5550595174, ...","[52.5686002231, 56.9046980407, 55.1550595174, ...",UID-4
5,8716,1983,28,50.167647,9.567647,267.894128,"[1983.07.27, 1983.07.28, 1983.07.29, 1983.07.3...","[4.8134424765, 3.4528546549, 9.7354032993, 10....","[45.4134424765, 44.0528546549, 50.3354032993, ...",UID-5


In [78]:
df[(df['ID_HDC_G0'] == 8716) & (df['year'] == 2000)]['duration'].sum()

192

In [None]:
#### Step 1 - Function Loads all Tmax Data as an X-array
def read_data(dir_path, space_dim, time_dim):
    """ Function reads in all Tmax .csv files, joins them by date along the x-axis
    and returns the whole record as a x-array data array
    
    Args:   
        dir_path = path to .csv files 
        time_dim = name for time dim as a str ... use date :-)
        space_dim = col name for GHS-UCDB IDs as an str (ID_HDC_G0)
    """
    fn_list = sorted(glob.glob(dir_path+'*.csv'))
    df_out = pd.DataFrame()
    date_list = []

    # Open all Tmax files and concat into a df
    for i, fn in enumerate(fn_list):    
        # Open the CSV
        df = pd.read_csv(fn)

        # Get the city ids 
        if i == 1:
            df_id = df[space_dim]

        # get only the Tmax columns and concate date list 
        df_temp = df.iloc[:,3:] # get only temp columns
        date_list = date_list+list(df_temp.columns)

        # Drop cities w/ no temp record 
        df_temp_drop = df_temp.dropna()

        # Merge
        df_out = pd.concat([df_out, df_temp_drop], axis=1)
        print(df_out.shape)
    
    # make date into an array
    tmax_arr = df_out.to_numpy()

    # Make data into an xr.DataArray
    tmax_xr_da = xr.DataArray(tmax_arr, coords=[df_id, date_list], 
                             dims=[space_dim, time_dim])
    return tmax_xr_da

In [None]:
#### Step 2 Function finds all the Tmax Events and writes it to a dateframe w/ dates for each city
def tmax_days(xarray, Tthresh):
    """ Function finds all the tmax days in a year and sums total days per year 
    greater than a threshold within a year where Tmax > Tthresh for each city. Returns the total number of days,
    the dates, the tempatures, and the intensity (daily Tmax - Tthresh)
    
    Args: 
        xarray = an xarray object with dims = (space, times)
        Tthresh = int of temp threshold
    """
    
    # empty lists & df
    id_list = []
    date_list = []
    tmax_list = []
    intensity_list = []
    df_out = pd.DataFrame()
    
    # subset xarray
    out = xarray.where(xarray > Tthresh, drop = True)

    # start loop 
    for index, loc in enumerate(out.ID_HDC_G0):
        id_list.append(out.ID_HDC_G0.values[index]) # get IDS
        date_list.append(out.sel(ID_HDC_G0 = loc).dropna(dim = 'date').date.values) # get event dates
        
        # #CPT 2020.02.23 
        # dayTot_list.append(len(out.sel(ID_HDC_G0 = loc).dropna(dim = 'date').date.values)) # get event totals
        
        tmax_list.append(out.sel(ID_HDC_G0 = loc).dropna(dim = 'date').values) # get temp values
        intensity_list.append(out.sel(ID_HDC_G0 = loc).dropna(dim = 'date').values - Tthresh) # get severity

    # write to a data frame
    df_out['ID_HDC_G0'] = id_list
    # df_out['total_days'] = dayTot_list #CPT 2020.02.23
    df_out['dates'] = date_list
    df_out['tmax'] = tmax_list
    df_out['tmax_tntensity'] = intensity_list

    # return df_out
    return df_out


In [None]:
#### Step 3 Function splits the dataset into Tmax events (continuous days >Tmax) for each city
def jul_convert(dates):
    "Function turn days into julian datetime"
    jul_days = pd.to_datetime(dates).to_julian_date()
    
    return jul_days

def event_split(dates, ID_HDC_G0, intensity, tmax): #, total_days): #CPT 2020.02.23
    
    """ Searchs a list of dates and isolates sequential dates as a list, then calculates event stats.
    See comments in code for more details. 
    
    Args:
        dates: pandas.core.index as julian dates
        ID_HDC_G0: city ID as string
        intensity: numpy.ndarray of intensities values
        tmax: numpy.ndarray of intensities values of tmax values
        total_days: total number of tmax days in a year for a given city

    """

    # city id
    city_id = ID_HDC_G0
    # tot_days = total_days #CPT 2020.02.23
    
    # lists to fill
    city_id_list = []
    # tot_days_list = [] #CPT 2020.02.23
    event_dates_list = []
    dur_list = []
    intensity_list = []
    tmax_list = []
    avg_temp_list = []
    avg_int_list = []
    tot_int_list = []
    year_list = []
    
    # data frame out
    df_out = pd.DataFrame()
    
    # turn days into julian days
    jul_days = jul_convert(dates)
    
    # Counters to make sure we write the correct event dates to a list, don't want julian days in output
    counter = 0
    start = 0
    end = 0
    
    # Loop through dur list and isolate seq days, temps, and intensities
    for k, g in groupby(enumerate(jul_days.values), lambda x: x[1]-x[0]):
        
        seq = list(map(itemgetter(1), g)) # isolate seq. days
        dur = len(seq) # duration of each event
        
        counter = counter + dur # add duration to counter
        end = counter # end of current event
        
        event_dates = dates[start:end] # dates of tmax days during each event
        intense = intensity[start:end] # intensity of each day during event
        temp = tmax[start:end] # temp of each day during event
        avg_temp = mean(temp) # avg. temp during event
        avg_int = mean(intense) # avg. intensity during event
        tot_int = np.sum(intense) # total intensity during event 
        
        start = counter # reset start to current end (e.g. counter)
        year = event_dates[0].split('.')[0]
        
        # fill lists
        city_id_list.append(city_id)
        year_list.append(year)
        # tot_days_list.append(tot_days) #CPT 2020.02.23
        dur_list.append(dur)
        event_dates_list.append(event_dates)
        intensity_list.append(intense)
        tmax_list.append(temp)
        avg_temp_list.append(avg_temp)
        avg_int_list.append(avg_int)
        tot_int_list.append(tot_int)

    # write out as a dateframe
    df_out['ID_HDC_G0'] = city_id_list
    df_out['year'] = year_list
    # df_out['total_days'] = tot_days_list #CPT 2020.02.23
    df_out['duration'] = dur_list
    df_out['avg_temp'] = avg_temp_list
    df_out['avg_intensity'] = avg_int_list
    df_out['tot_intensity'] = tot_int_list
    df_out['event_dates'] = event_dates_list
    df_out['duration'] = dur_list
    df_out['intensity'] = intensity_list
    df_out['tmax'] = tmax_list

    return df_out

#### Step 4 function feeds output from function 3 into function 4
def tmax_stats(df_in):
    """ runs event_split functionon a dataframe to produce desired tmax stats

        NOTE - If you add arguments to event_split to make more states,
        be sure to update this function

        args:
            df: input dataframe
    """
    df_out = pd.DataFrame()

    # NOTE - If you add arguments to event_split to make more stats,
    # be sure to update this function

    for index, row in df_in.iterrows():
        dates = row['dates'] # Get event dates
        intensity = row['tmax_tntensity'] # Get intensity for each day
        tmax = row['tmax'] # Get tmax for each day
        ID_HDC_G0 = row['ID_HDC_G0'] # get city id
        # total_days = row['total_days'] # get total number of tmax days -- CPT 2020.02.23

        df = event_split(dates, ID_HDC_G0, intensity, tmax)# , total_days) #CPT 2020.02.23

        df_out = df_out.append(df)

    return df_out

#### Step 5 function threads it all together
def run_stats(dir_path, space_dim, time_dim, Tthresh, fn_out):
    
    """ Function ties all the Tmax Stats functions together and writes final stats for each Tmax 
    event to a .csv file. Returns results as a dataframe if needed
    
    Args:
        dir_path = path to .csv files 
        time_dim = name for time dim as a str ... use date :-)
        space_dim = col name for GHS-UCDB IDs as an str (ID_HDC_G0)
        Tthresh = float of temp threshold
        fn_out = file and path to write final csv
        
    """
    
    # read in data
    step1= read_data(dir_path, space_dim = space_dim, time_dim = time_dim)
    #step1_sub = step1[:,:10] # subset data for testing
    print('Stack x-array made')
    
    # Mask data based on Tmax threshold ... we're using 40.6C
    step2 = tmax_days(step1, Tthresh)
    print('Tmax masked')
    
    
    # Calculate stats
    step3 = tmax_stats(step2)
    print('Stats made')

    # Save file out
    step3.to_json(fn_out, orient = 'split')
    
    return step3

    print('done')

In [None]:
## Arges Needed 
DATA_IN = '/home/cascade/projects/UrbanHeat/data/interim/CHIRTS_DAILY/HI/' # output from avg temp
DATA_OUT = '/home/cascade/projects/UrbanHeat/data/interim/CHIRTS_DAILY/STATS/'
dir_path = DATA_IN 
space_dim = 'ID_HDC_G0'
time_dim = 'date'
Tthresh = 40.6
fn_out = DATA_OUT+'STATS_1DAY406.json'

In [None]:
# read in data
step1 = read_data(dir_path, space_dim = space_dim, time_dim = time_dim)

In [None]:
# Mask data based on Tmax threshold ... we're using 40.6C
step2 = tmax_days(step1, Tthresh)
print('Tmax masked')

In [None]:
step2.head()

In [None]:
# this is how to split the df

len(np.array_split(step2, 3)[2])

In [None]:
fn_list = glob.glob(DATA_OUT+'HI406_tmp/*STAT*')
fn_list[0]

In [None]:
df_list = []
for fn in fn_list:
    df_list.append(pd.read_json(fn, orient = 'split'))

df_out = pd.concat(df_list)
df_out.to_json(fn_out, orient = 'split')

In [None]:
df_out

In [None]:
done = pd.read_json(DATA_OUT+'HI406_STATS.json', orient = 'split')

# Is the data the same?
cpt feb 2021

In [None]:
new = pd.read_json('/home/cascade/projects/UrbanHeat/data/processed/PNAS-DATA-v2/HI406_STATS.json', orient = 'split')

In [None]:
old = pd.read_csv('/home/cascade/projects/UrbanHeat/data/processed/PNAS-DATA-v1/AllDATA-GHS-ERA5-HI406.csv')

In [None]:
new.shape

In [None]:
old.shape

In [None]:
old[(old['ID_HDC_G0'] == 3091) & (old['year'] == 1983)]

In [None]:
new[(new['ID_HDC_G0'] == 3091) & (new['year'] == 1983)]

# Old Code

In [None]:
cpu = 20+2 # - 2
n = int(len(step2)/ cpu)  #chunk row size
list_df = [step2 [i:i+n] for i in range(0,step2.shape[0],n)]
print(len(list_df))

In [None]:
len(step2)

In [None]:
step2

In [None]:
# write them out
for i, df in enumerate(list_df):
    df.to_json(DATA_OUT+'tmp/tmp_'+str(i)+'.json', orient = 'split')

In [None]:
#os.mkdir(DATA_OUT+'temp')

In [None]:
fns_list = glob.glob(DATA_OUT+'tmp/*.json')

In [None]:
fns_list

In [None]:
fn = fns_list[0]
df = pd.read_json(fn, orient = 'split')

In [None]:
step3 = tmax_stats(df)

In [None]:
def max_stats_run(fn):
    
    """ runs max_stats on a fn (.json) and writes on .json)
    Args:
        fn = file name
    """
    
    # open df
    df = pd.read_json(fn, orient = 'split')
    i = fn.split('temp_')[1].split('.json')[0]
    
    # make small for testing 
    df = df.iloc[0:4,:]
    
    # Calculate stats
    step3 = max_stats(df)

    # write file
    fn_out = DATA_OUT+'temp/'+'STAT_'+str(i)+'.json'
    step3.to_json(fn_out, orient = 'split')
    print('done', i)



In [None]:
for fn in fns_list:
    tbd(fn)

In [None]:
test = glob.glob(DATA_OUT+'temp/STAT*')

In [None]:
test

In [None]:
import Event_Stats_Funcs

In [None]:
test = '/home/cascade/projects/UrbanHeat/data/interim/CHIRTS_DAILY/STATS/HI406_temp/HI406_2.json'

In [None]:
data = test.split('_temp/')[1].split('_')[0]
i = test.split(data+'_temp/')[1].split(data+'_')[1]
DATA_OUT = test.split(data+'_temp')[0]

In [None]:
DATA_OUT

In [None]:
glob.glob('/home/cascade/projects/UrbanHeat/data/interim/CHIRTS_DAILY/STATS/HI406_temp/*')

In [None]:
fn = '/home/cascade/projects/UrbanHeat/data/interim/CHIRTS_DAILY/STATS/HI406_temp/HI406_STAT_6.json'
test = pd.read_json(fn, orient = 'split')
test.head()