# Prepare downloads from GBIF for model

In [2]:
from dask.distributed import Client

# Address of the dask scheduler this notebook connects to.
SCHEDULER_ADDRESS = "tcp://127.0.0.1:50808"

client = Client(SCHEDULER_ADDRESS)
client

0,1
Client  Scheduler: tcp://127.0.0.1:50808  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 8.59 GB


In [5]:
import pandas as pd
from h3 import h3
import dask.dataframe as dd

In [32]:
# The raw GBIF download lives in a Google Cloud Storage bucket.
file_name = 'new-england-birds-2019-2019.csv'
f = 'gs://ebird-eod/' + file_name

In [33]:
%%time
# Only the columns needed further down are read from the tab-separated file.
cols = [
    'gbifID',
    'species',
    'eventDate',
    'decimalLatitude',
    'decimalLongitude',
    'stateProvince',
]
read_options = dict(sep='\t', usecols=cols, parse_dates=['eventDate'])

df = dd.read_csv(f, **read_options)

CPU times: user 92.1 ms, sys: 94.3 ms, total: 186 ms
Wall time: 1.14 s


In [34]:
df

Unnamed: 0_level_0,gbifID,species,stateProvince,decimalLatitude,decimalLongitude,eventDate
npartitions=120,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,int64,object,object,float64,float64,datetime64[ns]
,...,...,...,...,...,...
...,...,...,...,...,...,...
,...,...,...,...,...,...
,...,...,...,...,...,...


In [35]:
# US states grouped into the regions that can be selected for processing.
# The names are compared against the `stateProvince` column with an exact
# match, so a misspelling silently drops that state (was: 'Deleware').
regions = {
    'mid_atlantic': [
        'New York', 'New Jersey', 'Pennsylvania', 'Maryland', 'Delaware',
        'District of Columbia', 'Virginia', 'West Virginia',
    ],
    'new_england': [
        'Maine', 'New Hampshire', 'Vermont', 'Massachusetts', 'Connecticut',
        'Rhode Island',
    ],
    'southern_coast': ['North Carolina', 'South Carolina', 'Georgia', 'Florida'],
}

In [36]:
# Keep only the observations whose state belongs to the selected region.
states = regions['new_england']

mask = df.stateProvince.isin(states)
df = df[mask]

In [37]:
%%time
APERTURE_SIZE = 6
hex_col = f'hex{APERTURE_SIZE}'


def _point_to_hex(row):
    """Return the H3 cell id containing the row's latitude/longitude."""
    return h3.geo_to_h3(row.decimalLatitude, row.decimalLongitude, APERTURE_SIZE)


# Row-wise lookup of the hexagon containing each observation; `meta` tells
# dask the dtype of the resulting column without running the function.
df[hex_col] = df.apply(_point_to_hex, axis=1, meta=(None, 'object'))

CPU times: user 8.26 ms, sys: 3.31 ms, total: 11.6 ms
Wall time: 12.1 ms


In [14]:
df

Unnamed: 0_level_0,gbifID,species,stateProvince,decimalLatitude,decimalLongitude,eventDate,hex6
npartitions=120,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
,int64,object,object,float64,float64,datetime64[ns],object
,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...
,...,...,...,...,...,...,...


In [15]:
# Count observations per hexagon and per week. Group on `hex_col` rather than
# the literal 'hex6' so the aggregation keeps working when APERTURE_SIZE changes.
df = df.groupby([hex_col, pd.Grouper(freq='W', key='eventDate')])['gbifID'].count().reset_index()

In [16]:
%%time
# Wall-clock time recorded on GCP: 1min 59s
# Wall-clock time recorded on a local computer: 5min 24s

# Execute the lazily built pipeline and collect the aggregated counts.
result = df.compute()

CPU times: user 772 ms, sys: 49.5 ms, total: 822 ms
Wall time: 6min 17s


In [17]:
# type is now a dataframe that will fit in local RAM
type(result)

pandas.core.frame.DataFrame

In [38]:
result.shape

(154078, 3)

# apply smoothing

In [19]:
# Apply k-ring smoothing to the hexagon counts, one week at a time

In [20]:
# Modified From
# https://github.com/uber/h3-py-notebooks/blob/master/notebooks/unified_data_layers.ipynb

def kring_smoothing(df, hex_col, metric_col, k):
    """Spread each hexagon's metric evenly over its k-ring neighbourhood.

    Every hexagon in ``df`` is expanded to the cells returned by
    ``h3.k_ring(hexagon, k)``. Each of those cells receives the hexagon's
    metric divided by ``1 + 3 * k * (k + 1)`` (the number of cells in a
    complete k-ring), and contributions landing on the same cell are summed.

    Args:
        df: pandas DataFrame containing at least ``hex_col`` and ``metric_col``.
        hex_col: name of the column holding H3 cell ids.
        metric_col: name of the numeric column to smooth.
        k: ring distance passed to ``h3.k_ring``.

    Returns:
        A new DataFrame with one row per cell reached by any ring and the
        columns ``hex_col``, ``metric_col`` (smoothed value), ``lat`` and
        ``lng`` (first and second element of ``h3.h3_to_geo``). ``df`` itself
        is not modified.
    """
    # Nothing to spread: return an empty frame with the usual columns instead
    # of failing further down.
    if df.empty:
        return pd.DataFrame(columns=[hex_col, metric_col, 'lat', 'lng'])

    # One (source hexagon, ring cell) row per neighbour. Building the pairs
    # directly avoids constructing a pandas Series for every input row.
    pairs = [(origin, cell) for origin in df[hex_col] for cell in h3.k_ring(origin, k)]
    ring = pd.DataFrame(pairs, columns=[hex_col, 'hexk'])

    cells_per_ring = 1 + 3 * k * (k + 1)
    dfs = (ring
           .merge(df[[hex_col, metric_col]], on=hex_col)
           .fillna(0)
           .groupby(['hexk'])[[metric_col]].sum()
           .divide(cells_per_ring)
           .reset_index()
           .rename(index=str, columns={"hexk": hex_col}))

    # Look up each cell's coordinates once and split them into two columns.
    centres = [h3.h3_to_geo(cell) for cell in dfs[hex_col]]
    dfs['lat'] = [centre[0] for centre in centres]
    dfs['lng'] = [centre[1] for centre in centres]
    return dfs

In [21]:
# Give the aggregated columns names that describe what they now hold.
column_names = {'eventDate': 'week', 'gbifID': 'cnt'}
result = result.rename(columns=column_names)
result.head()

Unnamed: 0,hex6,week,cnt
0,862a04c9fffffff,2018-08-26,266
1,862a06427ffffff,2018-03-25,58
2,862a06427ffffff,2018-08-19,199
3,862a06437ffffff,2018-08-19,48
4,862a064a7ffffff,2019-07-14,60


In [22]:
%%time
# Smooth each week separately. A single groupby pass hands over every week's
# rows, instead of re-scanning the whole frame with a boolean mask per week.
# sort=False keeps the weeks in order of first appearance, which is the order
# result.week.unique() yields.
eows = result.week.unique()
k = 2

df_lst = [
    kring_smoothing(week_df, hex_col, 'cnt', k).assign(week=w)
    for w, week_df in result.groupby('week', sort=False)
]
    

CPU times: user 35.3 s, sys: 320 ms, total: 35.6 s
Wall time: 35.8 s


In [23]:
# Every per-week frame carries its own '0'..'n' index, so rebuild the index to
# avoid duplicate labels in the combined frame.
dfs = pd.concat(df_lst, ignore_index=True)

In [24]:
dfs['cnt'].sum()

11612607.000000004

In [27]:
# Save processed file

# Placeholder: set this to the output directory, keeping the trailing slash.
# It must be a string; a list would be rendered as "['file_path']" in the path.
file_path = 'file_path/'
# Strip the '.csv' extension so the output is '<name>_processed.csv' rather
# than '<name>.csv_processed.csv'.
stem = file_name[:-len('.csv')] if file_name.endswith('.csv') else file_name
f = f'{file_path}{stem}_processed.csv'
# dfs.to_csv(f)

## Combine CSVs

In [30]:
# Use Dask Instead?
import glob

In [None]:
# Collect every CSV under `file_path`. glob returns matches in arbitrary order,
# so sort them to make the row order of the combined frame reproducible.
f = f'{file_path}*.csv'
files = sorted(glob.glob(f))

# Read each file once and stack them under a fresh 0..n-1 index.
df = pd.concat([pd.read_csv(path) for path in files], ignore_index=True)

In [39]:
result.head()

Unnamed: 0,hex6,week,cnt
0,862a04c9fffffff,2018-08-26,266
1,862a06427ffffff,2018-03-25,58
2,862a06427ffffff,2018-08-19,199
3,862a06437ffffff,2018-08-19,48
4,862a064a7ffffff,2019-07-14,60


In [40]:
from dask import delayed # to allow parallel computation

In [47]:
%%time
# Build one lazy task per week first, then hand the whole batch to dask in a
# single compute() call. Calling .compute() inside the loop waits for each task
# before the next one is even created, so nothing runs in parallel.
from dask import compute


def _smooth_week(week_df, week, hex_col, k):
    """Smooth one week's counts and tag the result with its week."""
    smoothed = kring_smoothing(week_df, hex_col, 'cnt', k)
    # Item assignment is not possible on a Delayed object, so the week column
    # is added inside the task instead.
    smoothed['week'] = week
    return smoothed


eows = result.week.unique()
k = 2

tasks = [
    delayed(_smooth_week)(week_df, w, hex_col, k)
    for w, week_df in result.groupby('week', sort=False)
]

# df_lst holds concrete pandas DataFrames, one per week.
df_lst = list(compute(*tasks))

CPU times: user 730 ms, sys: 65.8 ms, total: 796 ms
Wall time: 45.1 s


In [45]:
# pd.concat only accepts pandas objects: every entry of df_lst must already be
# a computed DataFrame, not a dask Delayed.
dfs = pd.concat(df_lst)

TypeError: cannot concatenate object of type '<class 'dask.delayed.Delayed'>'; only Series and DataFrame objs are valid

In [46]:
%%time
import dask

# dask.compute evaluates all pending tasks in one batch and passes through
# entries that are already plain DataFrames, so this works whether df_lst holds
# Delayed objects or computed results.
dfs = pd.concat(list(dask.compute(*df_lst)), ignore_index=True)

CPU times: user 627 ms, sys: 107 ms, total: 734 ms
Wall time: 42.1 s
