In [None]:
from google.cloud import storage
import gcsfs
import os
import shutil
import rasterio
import numpy as np
import geopandas as gpd
import pandas as pd
from rasterio.mask import mask
import time
import sys
sys.path.insert(0, "../src")
from PRISM_Preprocess import *

# Extract PRISM Data by County

In [None]:
# Initialize GCS client
client = storage.Client()
gcs = gcsfs.GCSFileSystem()

# Define GCS bucket and folder paths
bucket_name = "leap-persistent"
source_folder = "adamnayak/flood-insurance/PRISM/Monthly_Annual"
destination_folder = "adamnayak/flood-insurance/PRISM/County_Monthly_Precip_By_Year_Mon"

# Define local folders
local_prism_folder = "PRISM_Data"
output_folder = "PRISM_Monthly_Precip"
county_shapefile = "../Local_Data/Geospatial/tl_2019_us_county.shp"

# Ensure local folders exist
os.makedirs(local_prism_folder, exist_ok=True)
os.makedirs(output_folder, exist_ok=True)

In [None]:
# Step 1: Download PRISM files from GCS to local `prism_folder`
print("Downloading PRISM files...")
bucket = client.bucket(bucket_name)
blobs = client.list_blobs(bucket_name, prefix=source_folder)

for blob in blobs:
    if not blob.name.endswith("/"):  # Skip directories
        destination_path = os.path.join(local_prism_folder, os.path.basename(blob.name))
        blob.download_to_filename(destination_path)
        print(f"Downloaded: {blob.name} to {destination_path}")

# Load the county shapefile
counties = gpd.read_file(county_shapefile)

In [None]:
# Step 2: Process each PRISM file
for year in range(1923, 2024):
    for month in range(1, 13):
        yearmo = f"{year}{str(month).zfill(2)}"
        
        # Determine file naming pattern based on year
        file_suffix = "M2" if year < 1980 else "M3"
        
        prism_file = os.path.join(local_prism_folder, f"PRISM_ppt_stable_4km{file_suffix}_{yearmo}_bil.bil")
        
        # Skip if the file does not exist
        if not os.path.exists(prism_file):
            print(f"File not found: {prism_file}")
            continue
        
        with rasterio.open(prism_file) as src:
            # Ensure that the precipitation data has a CRS
            if not ds.rio.crs:
                # If the file lacks metadata, assume WGS84
                ds = ds.rio.write_crs("EPSG:4326", inplace=True)
                print("Assigned EPSG:4326 to precipitation data:", ds.rio.crs)

            county_precip = []
            
            for _, county in counties.iterrows():
                geom = [county["geometry"].__geo_interface__]
                
                # Buffer the geometry slightly to deal with precision issues
                buffered_geom = [county["geometry"].buffer(0.001).__geo_interface__]
                
                avg_precip = calculate_weighted_average(src, buffered_geom)
                
                county_precip.append({
                    "county": county["GEOID"],
                    "precipitation": avg_precip
                })
        
        # Create a DataFrame and save to CSV
        df = pd.DataFrame(county_precip)
        output_file = os.path.join(output_folder, f"{yearmo}_PRISM_monthly_precip.csv")
        df.to_csv(output_file, index=False)
        print(f"Saved: {output_file}")

In [None]:
# Step 3: Concatenate and save all files
# Define the folder containing the CSV files
folder_path = 'PRISM_Monthly_Precip'

# Initialize an empty list to store the DataFrames
all_data = []

# Iterate over each file in the folder
for file_name in os.listdir(folder_path):
    if file_name.endswith('.csv'):  # Ensure we only read CSV files
        # Extract the year and month from the filename
        yearmo = file_name.split('_')[0]  # Format: {yearmo}_PRISM_monthly_precip.csv
        year = yearmo[:4]  # First 4 characters are the year
        month = yearmo[4:6]  # Next 2 characters are the month
        
        # Load the CSV into a DataFrame
        file_path = os.path.join(folder_path, file_name)
        df = pd.read_csv(file_path)
        
        # Add 'year' and 'month' columns
        df['year'] = year
        df['month'] = month
        
        # Append the DataFrame to the list
        all_data.append(df)

# Concatenate all the DataFrames in the list
final_df = pd.concat(all_data, ignore_index=True)
final_df.rename(columns={'precipitation': 'PRISM_precipitation'}, inplace=True)

In [None]:
# Step 4: Find the annual maximum precipitation for each county
annual_max_df = final_df.groupby(['county', 'year'])['PRISM_precipitation'].max().reset_index()
annual_max_df = annual_max_df.rename(columns={'PRISM_precipitation': 'PRISM_annual_max_precipitation'})

# Step 5: Merge the annual maximum values back into the original PRISM_df
final_df = pd.merge(final_df, annual_max_df, on=['county', 'year'], how='left')

In [None]:
# Step 6: Create a dictionary that stores annual max distributions for each county
county_max_dist = final_df.groupby('county')['PRISM_annual_max_precipitation'].apply(list).to_dict()

In [None]:
# Step 7: Apply the vectorized function to calculate percentiles for the entire DataFrame
start_time = time.time()
row_count = len(final_df)
batch_size = 10000

# List to hold the results
percentile_results = []

for i in range(0, row_count, batch_size):
    batch_df = final_df.iloc[i:i + batch_size]  # Process rows in batches of 10,000
    # Calculate the percentiles for the batch
    batch_percentiles = batch_df.apply(
        lambda row: calculate_percentile_vectorized(row['PRISM_precipitation'], row['county'], county_max_dist),
        axis=1
    )
    # Append results to the list
    percentile_results.extend(batch_percentiles)
    
    # Print progress every 10,000 rows
    elapsed_time = time.time() - start_time
    print(f"Processed {i + batch_size} rows out of {row_count} (Elapsed time: {elapsed_time:.2f} seconds)")

# Step 8: Assign the results back to the DataFrame
final_df['PRISM_percentile'] = percentile_results
final_df['PRISM_percentile'] = final_df['PRISM_percentile']*100

In [None]:
# Export the final DataFrame to a CSV file
output_file_path = 'PRISM_Monthly_Precip_Processed_County.csv'
final_df.to_csv(output_file_path, index=False)

print(f"Data has been successfully concatenated and saved to {output_file_path}.")

# Merge with Claims

In [None]:
# Ensure the 'year' and 'month' columns in PRISM_df are strings as well
final_df['year'] = final_df['year'].astype(str)
final_df['month'] = final_df['month'].astype(str).str.zfill(2)
final_df['county'] = final_df['county'].astype(str).str.zfill(5)

In [None]:
# Load the claims data and drop NAs
claims_df = pd.read_csv('MSWEP_ERA5_Processed_Claims.csv')
claims_df = claims_df.dropna(subset=['countyCode', 'dateOfLoss'])

# Ensure dateOfLoss is in datetime format
claims_df['dateOfLoss'] = pd.to_datetime(claims_df['dateOfLoss'], errors='coerce')

# Extract year and month from 'dateOfLoss'
claims_df['year'] = claims_df['dateOfLoss'].dt.year.astype(str)  # Convert to string
claims_df['month'] = claims_df['dateOfLoss'].dt.month.astype(str).str.zfill(2)  # Convert to string and pad month with leading zeros

# Convert 'countyCode' in the claims data to string and ensure it has leading zeros to match PRISM_df
claims_df['countyCode'] = claims_df['countyCode'].astype(int).astype(str)
claims_df['countyCode'] = claims_df['countyCode'].apply(lambda x: str(x).zfill(5))

In [None]:
# Merge the claims data with PRISM_df based on 'year', 'month', and 'countyCode' from claims, and 'county' from PRISM_df
merged_df = pd.merge(
    claims_df,
    final_df[['year', 'month', 'county', 'PRISM_precipitation', 'PRISM_percentile']],
    how='left',
    left_on=['year', 'month', 'countyCode'],
    right_on=['year', 'month', 'county']
).drop(columns=['year', 'month', 'county'])

In [None]:
merged_df.rename(columns={'PRISM_precipitation': 'PRISM_mon_precipitation',
                         'PRISM_percentile': 'PRISM_mon_percentile'}, inplace=True)

In [None]:
merged_df.to_csv('PRISM_MSWEP_ERA5_Processed_Claims.csv', index=False)

In [None]:
# Delete the local `prism_folder` after processing
print(f"Deleting local folder: {local_prism_folder}")
shutil.rmtree(local_prism_folder)

# Upload processed data to GCS
print("Uploading processed data to GCS...")
output_blobs = os.listdir(output_folder)
for file_name in output_blobs:
    local_path = os.path.join(output_folder, file_name)
    blob_name = os.path.join(destination_folder, file_name)
    blob = bucket.blob(blob_name)
    blob.upload_from_filename(local_path)
    print(f"Uploaded: {file_name} to {blob_name}")