In [1]:
from multiprocessing import Pool
from shapely.geometry import Point
import pandas as pd
import numpy as np
import geopandas as gpd
import xarray as xr

In [2]:
# Years of NOAA NARR data to process, newest first (2015 down to 2005).
data_years = [str(year) for year in range(2015, 2004, -1)]

# Where the weather data lives and how the raw netCDF files are named.
data_base_path = '../data/NOAA_weather_data/'
original_datafile_subdir = 'original_datafiles/'
NOAA_NARR_file_ext = 'nc'

# State boundaries shapefile used to cut out California.
US_states_shapefile = '../data/spatial_data/cb_2018_us_state_500k.shp'

# NOAA NARR variables to extract.
data_types = [
    'air.sfc',   # Surface air temperature
    'air.2m',    # Air temperature at 2 meters above surface
    'apcp',      # Accumulated precipitation
    'crain',     # Categorical rain at surface
    'rhum.2m',   # Relative humidity 2 meters above surface
    'dpt.2m',    # Dew point temperature 2 meters above surface
    'pres.sfc',  # Pressure at surface
    'uwnd.10m',  # u component of wind (positive = from west) 10 meters above surface
    'vwnd.10m',  # v component of wind (positive = from south) 10 meters above surface
    'veg',       # Vegetation at surface
]

# California bounding box coordinates (decimal degrees)
LAT_START, LAT_END = 31.52, 43.0
LON_START, LON_END = -125.48, -113.131

# Parallelization options
N_PROCESSES = 15
JOBS_PER_PROCESS = 1

In [3]:
def netCDF_to_df(filename):
    '''Reads a netCDF file into a pandas dataframe.

    Takes name of netCDF file, uses xarray to open it as
    a data set, converts the data set to a pandas dataframe
    and returns the dataframe. The data set is closed before
    returning so its file handle is released even if the
    conversion raises.'''
    # to_dataframe() builds the full dataframe inside the block,
    # so the data set is no longer needed once the block exits.
    with xr.open_dataset(filename) as ds:
        df = ds.to_dataframe()
    return df

def clean_NOAA_NARR_df(df, data_type):
    '''Tidies a raw NOAA NARR weather dataframe in place.

    Keeps only the first index level, renames the fourth
    column to data_type, drops the Lambert_Conformal column
    and removes every row containing a missing value.
    The same (mutated) dataframe object is returned.'''
    # The value column is identified by position, so look it up
    # before any column is removed.
    value_column = df.columns[3]

    # Index levels 1 and 2 are discarded; level 0 remains.
    df.index = df.index.droplevel([1, 2])

    df.rename(columns={value_column: data_type}, inplace=True)
    df.drop(columns=['Lambert_Conformal'], inplace=True)
    df.dropna(axis='index', inplace=True)
    return df

def spatial_filter_coarse(df):
    '''Coarse spatial filter for a dataframe with lat and lon
    columns. Returns the rows whose coordinates lie inside
    (or on the edge of) the California bounding box given by
    LAT_START, LAT_END, LON_START and LON_END.'''
    # between() is inclusive on both ends, matching >= and <=.
    lat_in_box = df['lat'].between(LAT_START, LAT_END)
    lon_in_box = df['lon'].between(LON_START, LON_END)
    return df.loc[lat_in_box & lon_in_box]

def load_california_polygon(shapefile):
    '''Loads US Census Bureau state borders shapefile,
    returns main landmass of California as a shapely
    polygon object.

    The California row is selected by its NAME attribute
    rather than by a fixed row label, and the main landmass
    is taken to be the largest part (by area) of the state's
    geometry.

    Raises KeyError if the shapefile has no feature named
    California.'''
    gdf = gpd.read_file(shapefile)
    california = gdf[gdf['NAME'] == 'California']
    if len(california) == 0:
        raise KeyError(f"No feature named 'California' found in {shapefile}")

    # Positional access: the row label depends on the order of
    # states in the file and should not be relied on.
    geometry = california['geometry'].iloc[0]

    # Multi-part geometries expose their parts through .geoms
    # (indexing the geometry directly is not supported by
    # Shapely 2.x); a single polygon is treated as its own only part.
    parts = list(getattr(geometry, 'geoms', [geometry]))

    # The mainland is far larger than any offshore island.
    return max(parts, key=lambda part: part.area)

def is_california_point(point):
    '''Takes a row-like object with lon and lat entries.
    Returns it unchanged when the location lies within the
    module-level california geometry, otherwise returns the
    module-level empty placeholder.'''
    location = Point(point['lon'], point['lat'])
    return point if location.within(california) else empty
    
def keep_california_points(df):
    '''Applies is_california_point to every row of the
    dataframe and returns the result. Used as the worker
    function handed to parallelize.'''
    return df.apply(is_california_point, axis='columns')

In [4]:
def parse_NOAA_NARR_weather_data(data_year):
    '''Converts one year of raw NOAA NARR netCDF files to CSV.

    For every entry in data_types, reads the matching netCDF
    file for data_year, cleans it, keeps only the rows inside
    the California bounding box and writes the result to a
    <year>_california_box_<data type>.csv file under
    data_base_path. Returns nothing.'''
    for variable in data_types:
        netcdf_path = f'{data_base_path}{original_datafile_subdir}{variable}.{data_year}.{NOAA_NARR_file_ext}'
        csv_path = f'{data_base_path}{data_year}_california_box_{variable}.csv'

        # read -> clean -> coarse bounding box filter
        boxed = spatial_filter_coarse(
            clean_NOAA_NARR_df(netCDF_to_df(netcdf_path), variable)
        )
        boxed.to_csv(csv_path, header=True)
    
def parallelize(function, df, n_processes, jobs_per_process):
    '''Runs a function over a dataframe in parallel.

    Takes a picklable function, a dataframe, the number of
    worker processes and the number of jobs per process.
    Splits the dataframe row-wise into
    n_processes * jobs_per_process nearly equal chunks, runs
    function on each chunk in a pool of n_processes worker
    processes, then concatenates the per-chunk results in
    their original order and returns them.'''
    n_chunks = n_processes * jobs_per_process

    # Split row positions rather than the dataframe itself:
    # passing a dataframe straight to np.array_split is
    # deprecated in recent pandas releases.
    row_groups = np.array_split(np.arange(len(df)), n_chunks)
    df_split = [df.iloc[rows] for rows in row_groups]

    # pd.concat consumes the lazy imap iterator inside the block,
    # so every chunk is finished before the pool is torn down on
    # exit. No further close()/join() is needed afterwards.
    with Pool(n_processes) as pool:
        result = pd.concat(pool.imap(function, df_split))

    return result

In [5]:
# Three processes is the max we can handle without running out of memory (47G available)
N_PARSE_PROCESSES = 3

# pool.map blocks until every year has been parsed, and leaving the
# `with` block shuts the pool down, so no explicit close()/join()
# is required afterwards.
with Pool(N_PARSE_PROCESSES) as pool:
    pool.map(parse_NOAA_NARR_weather_data, data_years)

  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,


  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,


  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,


  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,
  use_cftime=use_cftime,


In [None]:
california = load_california_polygon(US_states_shapefile)

# empty series to be returned in place of a point
# which is outside of californa
empty = pd.Series([np.nan, np.nan])
empty.index = ['lon', 'lat']

for data_year in data_years:
    for data_type in data_types:
        input_file = f'{data_base_path}{data_year}_california_box_{data_type}.csv'
        output_file = f'{data_base_path}{data_year}_california_only_{data_type}.csv'

        df = pd.read_csv(input_file)
        %time df = parallelize(keep_california_points, df, N_PROCESSES, JOBS_PER_PROCESS).dropna()

        df.to_csv(output_file, header=True, index=False)

CPU times: user 1.45 s, sys: 522 ms, total: 1.97 s
Wall time: 9min 57s
CPU times: user 1.52 s, sys: 505 ms, total: 2.03 s
Wall time: 9min 58s
CPU times: user 1.46 s, sys: 644 ms, total: 2.11 s
Wall time: 10min 2s
CPU times: user 1.45 s, sys: 730 ms, total: 2.18 s
Wall time: 9min 58s
CPU times: user 1.48 s, sys: 741 ms, total: 2.22 s
Wall time: 9min 54s
CPU times: user 1.51 s, sys: 621 ms, total: 2.13 s
Wall time: 9min 55s
CPU times: user 1.48 s, sys: 687 ms, total: 2.17 s
Wall time: 9min 54s
CPU times: user 1.58 s, sys: 724 ms, total: 2.3 s
Wall time: 9min 53s
CPU times: user 1.53 s, sys: 894 ms, total: 2.42 s
Wall time: 9min 56s
CPU times: user 1.05 s, sys: 740 ms, total: 1.79 s
Wall time: 7min 3s
CPU times: user 1.46 s, sys: 765 ms, total: 2.22 s
Wall time: 9min 54s
CPU times: user 1.48 s, sys: 799 ms, total: 2.28 s
Wall time: 9min 55s
CPU times: user 1.52 s, sys: 870 ms, total: 2.39 s
Wall time: 9min 59s
CPU times: user 1.45 s, sys: 889 ms, total: 2.34 s
Wall time: 9min 58s
CPU time