In [None]:
##############################################################################################################
####
####   Pour Point Rst
####   By Cascade Tuholske June 2020
####
##############################################################################################################

In [1]:
#### Dependencies 
##############################################################################################################
from rasterstats import zonal_stats, gen_zonal_stats
import numpy as np
import pandas as pd
import geopandas as gpd
import rasterio
import time
import multiprocessing as mp 
from multiprocessing import Pool
from glob import glob

In [None]:
# Masked effluent-N rasters: base, treated, septic and open variants.
# NOTE(review): the next cell reassigns effluent_rsts to the unmasked files.
DATA_IN = '/home/cascade/projects/wastewater/data/'
effluent_rsts = [DATA_IN + 'interim/effluent_N' + variant + '_mask.tif'
                 for variant in ('', '_treated', '_septic', '_open')]

In [3]:
# effluent rsts
DATA_IN = '/home/cascade/projects/wastewater/data/'
effluent_rsts = [DATA_IN+'interim/effluent_N.tif', DATA_IN+'interim/effluent_N_treated.tif', 
                 DATA_IN+'interim/effluent_N_septic.tif', DATA_IN+'interim/effluent_N_open.tif']

# Get one raster (the output paths are prototyped on the first one)
rst = effluent_rsts[0]

# Files and Paths: 'data' is the raster's file stem with anything from 'mask64'
# onward removed; DATA_IN is re-derived from the part of the path before 'interim/'.
data = rst.split('interim/')[1].split('.')[0].split('mask64')[0]
DATA_IN = rst.split('interim/')[0]
path_out = 'processed/N_effluent_output/'
fn_out = DATA_IN+path_out+data
print('fn_out', fn_out)

# Open countries (in this cell only its CRS is used)
countries_fn = DATA_IN+'interim/world_vector.shp' 
countries = gpd.read_file(countries_fn)

# Watershed basin shapefiles (note: a list of .shp paths, not a directory)
basins_dir = glob(DATA_IN+'interim/basins_crs/*59004.shp')

# Stack all the watershed polys with one pd.concat. DataFrame.append was removed
# in pandas 2.0 and re-copied the growing frame on every loop pass. The empty
# frame goes first so the column order is unchanged and the columns exist even
# when the glob matches no files.
columns = ['ID','GRIDCODE','inspect','area','PNTPOLYCNT','basin_id','MWa_in_km2','geometry']
basin_frames = [pd.DataFrame(gpd.read_file(shp_fn)) for shp_fn in basins_dir]
watersheds = pd.concat([pd.DataFrame(columns = columns)] + basin_frames, sort = False)

watersheds = gpd.GeoDataFrame(watersheds) # to geo data frame
watersheds.crs = countries.crs # Set crs label (assignment only, no reprojection)

# Drop inland watersheds w/ pourpoints file: keep only basins whose pour point
# has 'land-ocean' == 0 (inner merge on basin_id).
inland_pp_fn = DATA_IN+'interim/pourpoints_inland.shp'
inland_pp = gpd.read_file(inland_pp_fn)
inland_pp = inland_pp[inland_pp['land-ocean'] == 0] # drop inland watersheds 
inland_pp = inland_pp['basin_id']
watersheds = watersheds.merge(inland_pp, on = 'basin_id', how = 'inner') # merge in




























In [None]:
# Root data directory and the inland-watersheds mask raster inside it.
DATA_IN = '/home/cascade/projects/wastewater/data/'
MASK_FN = '{}interim/inlandwatersheds_mask.tif'.format(DATA_IN)

In [None]:
def run_zonal(rst_fn, polys, fn_out, geog='', crs=None): 
    """Run zonal stats (sum, count) of a raster over polygons and save .shp and .csv.

    Only use watersheds that have had the inland watersheds dropped.

    Args:
        rst_fn = file name/path of raster to run zonal stats on
        polys = polygons to summarise, e.g. the countries or watersheds
            GeoDataFrame (anything rasterstats.zonal_stats accepts as vectors)
        fn_out = path prefix for the output shp and csv files
        geog = label appended to fn_out, e.g. 'countries' or 'watersheds';
            the default '' writes to fn_out as-is
        crs = CRS for the output frame; defaults to polys.crs when polys has one
            (previously the global `watersheds.crs` was used regardless of polys)

    Writes fn_out+geog+'.shp' and fn_out+geog+'.csv'. The zonal 'sum' column is
    renamed 'effluent' and its missing values are filled with 0.
    """
    if crs is None:
        # Take the CRS from the polygons actually summarised, not a global.
        crs = getattr(polys, 'crs', None)

    # Run Zonal Stats
    zs_feats = zonal_stats(polys, rst_fn, stats="sum count", geojson_out=True)

    # Turn into geo data frame and rename column
    zgdf = gpd.GeoDataFrame.from_features(zs_feats, crs=crs)
    zgdf = zgdf.rename(columns={'sum': 'effluent'})
    # Polygons with no valid raster cells get no sum; treat that as zero effluent.
    zgdf['effluent'] = zgdf['effluent'].fillna(0)

    # Save out shape and CSV (the original CSV path was missing a '+': SyntaxError)
    out_base = fn_out + geog
    zgdf.to_file(out_base + '.shp')
    zgdf.to_csv(out_base + '.csv')

In [None]:
# Open countries
print('Opening countries')
countries_fn = DATA_IN+'interim/world_vector.shp' 
countries = gpd.read_file(countries_fn)
print('countries are: ', len(countries))

# Watershed basin shapefiles (note: a list of .shp paths, not a directory)
print('Opening watersheds')
basins_dir = glob(DATA_IN+'interim/basins_crs/*59004.shp')

# Stack all the watershed polys with one pd.concat. DataFrame.append was removed
# in pandas 2.0 and re-copied the growing frame on every loop pass. The empty
# frame goes first so the column order is unchanged and the columns exist even
# when the glob matches no files.
columns = ['ID','GRIDCODE','inspect','area','PNTPOLYCNT','basin_id','MWa_in_km2','geometry']
basin_frames = [pd.DataFrame(gpd.read_file(shp_fn)) for shp_fn in basins_dir]
watersheds = pd.concat([pd.DataFrame(columns = columns)] + basin_frames, sort = False)

watersheds = gpd.GeoDataFrame(watersheds) # to geo data frame
watersheds.crs = countries.crs # Set crs label (assignment only, no reprojection)
print('watersheds are:', len(watersheds), type(watersheds), watersheds.crs)

# Drop inland watersheds w/ pourpoints file: keep only basins whose pour point
# has 'land-ocean' == 0 (inner merge on basin_id).
inland_pp_fn = DATA_IN+'interim/pourpoints_inland.shp'
inland_pp = gpd.read_file(inland_pp_fn)
inland_pp = inland_pp[inland_pp['land-ocean'] == 0] # drop inland watersheds 
inland_pp = inland_pp['basin_id']
watersheds = watersheds.merge(inland_pp, on = 'basin_id', how = 'inner') # merge in
print('watersheds are:', len(watersheds), type(watersheds), watersheds.crs)

# Mask64 rasters
effluent_rsts = glob(DATA_IN+'interim/*mask64.tif')

In [None]:
# Every '*mask64.tif' raster directly under <DATA_IN>/interim/
effluent_rsts = glob('{}interim/*mask64.tif'.format(DATA_IN))

In [None]:
def zonal_par(rst):
    """Run watershed zonal stats for one effluent raster (one Pool job).

    Args:
        rst = path to a raster inside a '.../interim/' directory, e.g. a
            '*mask64.tif' file; everything before 'interim/' is the data root

    Output goes through run_zonal with
    fn_out = <data root>processed/N_effluent_output/<stem before 'mask64'> and
    geog = 'watersheds'. The original call omitted the required `geog` argument,
    so every job raised TypeError.
    """
    # print current process
    print(mp.current_process())

    # Files and Paths (lower-case local so it does not shadow the global DATA_IN)
    data = rst.split('interim/')[1].split('.')[0].split('mask64')[0]
    data_in = rst.split('interim/')[0]
    path_out = 'processed/N_effluent_output/'
    geog = 'watersheds'
    fn_out = data_in + path_out + data
    print('fn_out', fn_out + geog)

    watersheds = _load_watersheds(data_in)

    # watershed zonal stats (country-level stats are not run here)
    print('Starting watershed', rst)
    run_zonal(rst, polys = watersheds, fn_out = fn_out, geog = geog)


def _load_watersheds(data_in):
    """Stack the basin polygons under data_in and keep basins with pour point 'land-ocean' == 0."""
    # Countries are read only to borrow their CRS for the watersheds.
    countries = gpd.read_file(data_in + 'interim/world_vector.shp')

    # Stack all watershed polys with one pd.concat (DataFrame.append was removed
    # in pandas 2.0); the empty frame first keeps the original column order.
    basins_fns = glob(data_in + 'interim/basins_crs/*59004.shp')
    columns = ['ID','GRIDCODE','inspect','area','PNTPOLYCNT','basin_id','MWa_in_km2','geometry']
    basin_frames = [pd.DataFrame(gpd.read_file(shp_fn)) for shp_fn in basins_fns]
    watersheds = pd.concat([pd.DataFrame(columns = columns)] + basin_frames, sort = False)

    watersheds = gpd.GeoDataFrame(watersheds) # to geo data frame
    watersheds.crs = countries.crs # Set crs label (no reprojection)

    # Drop inland watersheds w/ pourpoints file (inner merge on basin_id)
    pourpoints = gpd.read_file(data_in + 'interim/pourpoints_inland.shp')
    keep_ids = pourpoints.loc[pourpoints['land-ocean'] == 0, 'basin_id']
    return watersheds.merge(keep_ids, on = 'basin_id', how = 'inner')

In [None]:
# start pools
def parallel_loop(function, job_list, cpu_num):
    """Apply `function` to every item of `job_list` in a process pool, printing elapsed seconds.

    Args: 
        function = function to apply in parallel (must be picklable by
            multiprocessing, i.e. defined at module level)
        job_list = list of dir or fn to loop through 
        cpu_num = number of worker processes to start

    The results of `function` are discarded; returns None.
    """ 
    start = time.perf_counter()
    # The context manager terminates the pool even if a job raises. close() and
    # join() first so the workers exit cleanly and are reaped (the original never
    # joined them, and leaked the pool on error).
    with Pool(processes = cpu_num) as pool:
        pool.map(function, job_list)
        pool.close()
        pool.join()
    print(time.perf_counter() - start)

In [None]:
# Run it: one zonal_par job per raster in effluent_rsts, on 4 worker processes
parallel_loop(function=zonal_par, job_list=effluent_rsts, cpu_num=4)

In [None]:
# Need to stack and merge it all together. 
# FIX geog in zonal stats