In [None]:
# uncomment if you need these
# will add to requirements.txt in upcoming PR
# !pip install dask_geopandas
# !pip install pyogrio
import geopandas as gpd
import os
import matplotlib.pyplot as plt
import boto3
import sys
import xarray as xr
import pandas as pd
from functools import wraps
import dask_geopandas
import re
sys.path.append(os.path.expanduser('../../'))
from scripts.utils.write_metadata import (
    append_metadata
)
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Module-level S3 client.
# NOTE(review): the functions defined in this cell do not read this module-level name —
# confirm whether another cell still needs it.
s3_client = boto3.client('s3')
def list_geospatial_files(path, bucket_name='ca-climate-index'):
    """Build a list of geospatial file URIs found under an S3 key prefix.

    Parameters
    ----------
    path : str
        Key prefix to search within the bucket (e.g. "1_pull_data").
    bucket_name : str, optional
        Name of the S3 bucket to search; also used to build the returned URIs.
        Defaults to 'ca-climate-index'.

    Returns
    -------
    list of str
        One URI per matching object, in listing order. Keys ending in '.zip'
        become "zip+s3://<bucket>/<key>", keys ending in '.shp' become
        "s3://<bucket>/<key>"; every other key is skipped.
    """
    # open a session and get a handle on the bucket
    bucket = boto3.Session().resource('s3').Bucket(bucket_name)

    all_shapefiles = []
    for obj in bucket.objects.filter(Prefix=path):
        if obj.key.endswith('.zip'):
            # preceding the URI with 'zip+' lets the archive be read in place,
            # without downloading and unzipping it first
            all_shapefiles.append(f"zip+s3://{bucket_name}/{obj.key}")
        elif obj.key.endswith('.shp'):
            all_shapefiles.append(f"s3://{bucket_name}/{obj.key}")
    return all_shapefiles

# @append_metadata
def reproject_large_shapefile(shp_fname, ca_boundaries, varname='', additional_comments='N/A'):
    """Reproject a large shapefile to the census-tract CRS, keep the features that
    intersect California census tracts, and write the result to S3 as gzip parquet.

    This differs from reproject_shapefile() by using dask-geopandas so that large
    datasets are processed in partitions.

    Steps: (1) read the shapefile in 10 partitions, (2) reproject it to
    `ca_boundaries.crs`, (3) spatially shuffle both layers, (4) run an inner
    spatial join with the 'intersects' predicate, (5) drop the tract columns
    listed in `to_drop`, (6) write parquet under the '2b_reproject' counterpart
    of the source folder.

    Note that step (4) is a spatial join, not a geometric clip: a feature that
    intersects several tracts is returned once per tract, and the tract columns
    that are not in `to_drop` are attached to it.

    Parameters
    ----------
    shp_fname : str
        URI of the data shapefile ("s3://..." or "zip+s3://...").
    ca_boundaries : geopandas.GeoDataFrame
        California census tracts. Its CRS is the reprojection target, and it is
        expected to carry the 'USCB_'-prefixed columns listed in `to_drop`.
    varname : str
        Variable name, used in log messages and to name the output
        ("<varname>.parquet.gzip", or "<varname>_<i>.parquet.gzip" per partition).
    additional_comments : str
        Free text echoed to the log (e.g. code rerun, bug fix).

    Returns
    -------
    None

    Raises
    ------
    KeyError
        If any column in `to_drop` is missing from the joined frame.
    """

    # read in shapefile of interest from S3
    print(f"Reading in file: {shp_fname}.")
    gdf = dask_geopandas.read_file(shp_fname, npartitions=10)
    print(f"Original CRS of data for {varname}: {gdf.crs}")
    # check the current coordinate system of the census tracts data
    print(f"CRS of Census Tracts Shapefile: {ca_boundaries.crs}")

    # reproject the data to the census tract CRS
    gdf_reprojected = gdf.to_crs(ca_boundaries.crs)
    print(f"{varname} reprojected from {gdf.crs} to {gdf_reprojected.crs} with geopandas to_crs() function.")

    # shuffle both layers into spatially coherent partitions before joining
    ddf = gdf_reprojected.spatial_shuffle()
    print(f"{varname} geodataframe has been partitioned into spatially coherent chunks via dask-geopandas spatial_shuffle().")

    ca_ddf = dask_geopandas.from_geopandas(ca_boundaries, npartitions=10)
    ca_ddf = ca_ddf.spatial_shuffle()
    print("California Census Tracts geodataframe has been partitioned into spatially coherent chunks via dask-geopandas spatial_shuffle().")

    # keep only features that intersect a tract; compute() materialises the result in memory
    clipped_gdf = ddf.sjoin(ca_ddf, how='inner', predicate='intersects').compute()
    print(f"{varname} clipped to California Census Tract boundaries via dask-geopandas sjoin() using the 'intersects' predicate.")

    # try to minimize the data by dropping unnecessary columns
    # (reset_index turns the index into a column so it can be dropped by name below)
    clipped_gdf = clipped_gdf.reset_index()
    to_drop = ['USCB_STATEFP', 'USCB_COUNTYFP', 'USCB_TRACTCE',
               'USCB_NAMELSAD', 'USCB_MTFCC', 'USCB_FUNCSTAT',
               'USCB_ALAND', 'USCB_AWATER', 'USCB_INTPTLAT', 'USCB_INTPTLON',
               'hilbert_distance']
    clipped_gdf = clipped_gdf.drop(columns=to_drop)

    print(f"Additional comments: {additional_comments}.") # eg, code rerun, bug fix, etc

    # the 'zip+' scheme is only used for reading; strip it before building the output URI
    if shp_fname.endswith('.zip'):
        shp_fname = shp_fname.replace('zip+', '')
    ddf_part = dask_geopandas.from_geopandas(clipped_gdf, npartitions=5)

    if varname == "climate_iowa_mesonet_flash_flood_warnings":
        print("The resulting database is too large to save as a single file and will be partitioned into 5 chunks.")
        for i in range(ddf_part.npartitions):
            part_gdf = ddf_part.partitions[i].compute()
            # partitions go to the same '2b_reproject' folder as single-file outputs
            dest_f = _reprojected_uri(shp_fname, f"{varname}_{i}.parquet.gzip")
            part_gdf.to_parquet(dest_f, compression='gzip')
            print(f"Dataframe partition {i} saved to: {dest_f}")
    else:
        dest_f = _reprojected_uri(shp_fname, f"{varname}.parquet.gzip")
        # NOTE(review): this write goes through the dask frame (5 partitions), so the
        # output may be a set of part files under dest_f rather than one file — confirm.
        ddf_part.to_parquet(dest_f, compression='gzip')
        print(f"Database saved to: {dest_f}")


def _reprojected_uri(src_uri, out_name):
    """Return the URI of `out_name` placed beside `src_uri`, with the
    '1_pull_data' / '2a_subset' stage folder swapped for '2b_reproject'."""
    # replace only the final path component, never an earlier match of the basename
    parent, sep, _ = src_uri.rpartition('/')
    return re.sub(r'1_pull_data|2a_subset', '2b_reproject', parent + sep + out_name)


In [None]:
# read in the CSV with the data details
# (sys.path[-1] is assumed to still be the repository root appended in the imports cell)
ref_file = sys.path[-1]+'/metadata/Full Data Pipeline Notes - 1_ Pull.csv'
df = pd.read_csv(ref_file)

# fill missing cells with the string 'N/A'; the shapefile-only subset below is disabled
ref_df = df.fillna('N/A')
# comment out for now as 'Pulled Format' column not updated
# ref_df = ref_df[ref_df["Pulled Format"].str.contains("shp")]

### Define the path
path1 = "1_pull_data"
path2 = "2a_subset"
#  build a list of shapefiles in the above s3 paths
my_list = list_geospatial_files(path1) 
my_list += list_geospatial_files(path2)

# read in CA census tiger file
census_shp_dir = "s3://ca-climate-index/0_map_data/2021_tiger_census_tract/2021_ca_tract/"
ca_boundaries = gpd.read_file(census_shp_dir)
# need to rename columns so we don't have any duplicates in the final geodatabase
column_names = ca_boundaries.columns
# map each non-geometry column to its prefixed name explicitly, so the pairing
# does not depend on 'geometry' being the last column
rename_map = {column: "USCB_"+column for column in column_names if column != "geometry"}
new_column_names = list(rename_map.values())
ca_boundaries = ca_boundaries.rename(columns=rename_map)

In [None]:
# variables that are processed with reproject_large_shapefile()
large_files = [
    'climate_iowa_mesonet_flash_flood_warnings',
    'climate_koordinates_floodplain',
    'climate_iowa_mesonet_wildfire_warnings',
    'governance_usda_watershed_risk',
    'governance_usda_fuel_reduction',
]

# rows of the reference table that describe those variables
large_df = ref_df.loc[ref_df["Variable"].isin(large_files)]
# S3 URIs whose bare file name is listed for one of the large variables
large_list = [
    uri
    for uri in my_list
    if uri.rsplit('/', 1)[-1] in large_df["File Name"].values
]
# look-up table: file name -> variable name
fname_dict = {
    file_name: variable
    for file_name, variable in zip(large_df["File Name"].values, large_df["Variable"].values)
}

In [None]:
for fpath in large_list:
    # last URI component is the bare file name, which keys the variable look-up
    fname = fpath.rsplit('/', 1)[-1]
    varname = fname_dict[fname]
    # governance_usda_fuel_reduction is skipped for now - error with datetime formatting
    if varname != 'governance_usda_fuel_reduction':
        print(varname)
        reproject_large_shapefile(fpath, ca_boundaries, varname=varname)