# Pre-processing California Allocation data for WaDE upload.
Purpose: To pre-process the California data into one master file for simple DataFrame creation and extraction

In [None]:
# Needed Libraries

# working with data
import os
import numpy as np
import pandas as pd
import geopandas as gpd

# visualization
import matplotlib.pyplot as plt
import seaborn as sns

# API retrieval
import requests
import json

# Cleanup
import re
from datetime import datetime
pd.set_option('display.max_columns', 999)  # How to display all columns of a Pandas DataFrame in Jupyter Notebook
pd.set_option('display.float_format', lambda x: '%.5f' % x) # suppress scientific notation in Pandas

In [None]:
# Working Directory
# Every relative path used below (input zip, exported zip) resolves against this folder.
workingDir = "G:/Shared drives/WaDE Data/California/WaterAllocation/RawInputData"
os.chdir(workingDir)

## Import Input Data

In [None]:
# POD Data
# Read the zipped POD flat file, then blank out NaN cells and the literal 'nan,nan' value.
fileInput = "current_CAwr Points of Diversion LIST (Detail Summary List)/ewrims_flat_file_pod.zip"
dfinPOD = pd.read_csv(fileInput, compression='zip').replace(np.nan, "").replace('nan,nan', "")

# WaDE UUID tracker for data assessment
# When the input has no WaDEUUID column yet, build one from the row index and
# write the frame back to the same path it was read from (fileInput), so the
# read and write locations cannot drift apart.
if 'WaDEUUID' not in dfinPOD:
    dfinPOD['WaDEUUID'] = "caD" + dfinPOD.index.astype(str)
    dfinPOD.to_csv(fileInput, compression=dict(method='zip', archive_name='ewrims_flat_file_pod.csv'), index=False)

print(len(dfinPOD))
dfinPOD.head(1)

In [None]:
# Clean data a little
dfinPOD = dfinPOD.fillna("") # remove nan values
# Values that cannot be parsed as numbers are coerced to NaN and then replaced with 0.
dfinPOD['USE_DIRECT_DIVERSION_RATE'] = pd.to_numeric(dfinPOD['USE_DIRECT_DIVERSION_RATE'], errors='coerce').fillna(0) # make sure this is numeric.
print(len(dfinPOD))
dfinPOD.head(1)

In [None]:
# Create VariableSpecificCv value
def createVariableSpecificCv(unit):
    """Return the WaDE variable-specific UUID for a diversion-rate unit.

    The volume units 'Acre-feet' and 'Gallons' map to "CAwr_V2". Every other
    value (the flow-rate units, blanks, anything unrecognised) maps to
    "CAwr_V1".
    """
    # A single membership test replaces the chain of independent `if`s.
    # Previously the trailing `else` was bound only to the 'Gallons' check,
    # so 'Acre-feet' was set to "CAwr_V2" and then overwritten with "CAwr_V1".
    if unit in ('Acre-feet', 'Gallons'):
        outString = "CAwr_V2"
    else:
        outString = "CAwr_V1"

    return(outString)

# Run createVariableSpecificCv on each row's USE_DIRECT_DIV_RATE_UNITS value, then show the distinct results.
dfinPOD['in_VariableSpecificUUID'] = dfinPOD.apply(lambda row: createVariableSpecificCv(row['USE_DIRECT_DIV_RATE_UNITS']), axis=1)
dfinPOD['in_VariableSpecificUUID'].unique()

In [None]:
# convert all flow values to CFS
def convertFlowFunc(val, unit):
    """Return val expressed in cubic feet per second, or None for non-flow units.

    "Cubic Feet per Second" values are returned unchanged. The other
    recognised flow units are divided by the fixed factor listed below.
    Any other unit yields None.
    """
    if unit == "Cubic Feet per Second":
        return val

    # (unit label, divisor applied to val) pairs for the remaining flow units
    divisorsByUnit = (
        ("Gallons per Day", 646316.883),
        ("Acre-feet per Year", 723.968),
        ("Gallons per Minute", 448.83117),
    )
    for label, divisor in divisorsByUnit:
        if unit == label:
            return val / divisor
    return None

# Rows whose unit is not one of the flow units handled by convertFlowFunc get None here.
dfinPOD['CFS_Value'] = dfinPOD.apply(lambda row: convertFlowFunc(row['USE_DIRECT_DIVERSION_RATE'], row['USE_DIRECT_DIV_RATE_UNITS']), axis=1)
dfinPOD['CFS_Value'].unique()

In [None]:
# convert all volume values to AF
def convertVolumeFunc(val, unit):
    """Return val expressed in acre-feet, or None for non-volume units.

    'Acre-feet' values are returned unchanged, 'Gallons' values are divided
    by 325850.943, and any other unit yields None.
    """
    if unit == 'Acre-feet':
        return val
    if unit == 'Gallons':
        return val / 325850.943
    return None

# Rows whose unit is neither 'Acre-feet' nor 'Gallons' get None here.
dfinPOD['AF_Value'] = dfinPOD.apply(lambda row: convertVolumeFunc(row['USE_DIRECT_DIVERSION_RATE'], row['USE_DIRECT_DIV_RATE_UNITS']), axis=1)
dfinPOD['AF_Value'].unique()

In [None]:
# remove special characters from SUB_TYPE
def cleanupSubTypeFunc(val):
    """Return a tidied SUB_TYPE string.

    The value is stringified and stripped of surrounding whitespace, then
    trailing underscores and (after that) trailing commas are removed.
    An empty result or the text "nan" becomes "". Otherwise every remaining
    comma is replaced by a single space.
    """
    # Order matters: underscores are trimmed first, commas second.
    trimmed = str(val).strip().rstrip('_').rstrip(',')
    if trimmed in ("", "nan") or pd.isnull(trimmed):
        return ""
    return trimmed.replace(",", " ")

# Store the tidied SUB_TYPE in a separate column; the original SUB_TYPE column is left untouched.
dfinPOD['wade_SUB_TYPE'] = dfinPOD.apply(lambda row: cleanupSubTypeFunc(row['SUB_TYPE']), axis=1)
dfinPOD['wade_SUB_TYPE'].unique()

## POD Sites Data

In [None]:
# create output POD dataframe
# Maps the raw dfinPOD columns onto the in_* output columns. Columns with no
# source field are filled with a constant ("" or a fixed code).
df = pd.DataFrame()

# Data Assessment UUID
# Assigned first, from a Series, before any constant-valued column is added.
df['WaDEUUID'] = dfinPOD['WaDEUUID']

# Method Info
df['in_MethodUUID'] = "CAwr_M1"

# Variable Info
df['in_VariableSpecificUUID'] =  dfinPOD['in_VariableSpecificUUID']

# Organization Info
df['in_OrganizationUUID'] = "CAwr_O1"

# WaterSource Info
df['in_Geometry'] = ""
df['in_GNISFeatureNameCV'] = ""
df['in_WaterQualityIndicatorCV'] = ""
df['in_WaterSourceName'] = dfinPOD['SOURCE_NAME'].str.title()
df['in_WaterSourceNativeID'] = "" # create customID for temp solution
df['in_WaterSourceTypeCV'] = dfinPOD['SOURCE_TYPE'].str.title()

# Site Info
df['in_RegulatoryOverlayUUIDs'] = ""
df['in_WaterSourceUUID'] = "" # ???
df['in_CoordinateAccuracy'] = ""
df['in_CoordinateMethodCV'] = dfinPOD['LOCATION_METHOD']
df['in_County'] = dfinPOD['COUNTY'].str.title()
df['in_EPSGCodeCV'] = 4326
# NOTE: in_Geometry was already set to "" under WaterSource Info; this re-assigns the same value.
df['in_Geometry'] = ""
df['in_GNISCodeCV'] = ""
df['in_HUC12'] = dfinPOD['HUC_12_NUMBER']
df['in_HUC8'] = dfinPOD['HUC_8_NUMBER']
df['in_Latitude'] = dfinPOD['LATITUDE']
df['in_Longitude'] = dfinPOD['LONGITUDE']
df['in_NHDNetworkStatusCV'] = ""
df['in_NHDProductCV'] = ""
df['in_PODorPOUSite'] = "POD"
df['in_SiteName'] = dfinPOD['POD_NAME'].str.title()
# Blank or missing POD_ID values become 0 so the int cast succeeds; the ID is stored as text.
df['in_SiteNativeID'] = dfinPOD['POD_ID'].replace("", 0).fillna(0).astype(int).astype(str)
df['in_SitePoint'] = ""
df['in_SiteTypeCV'] = dfinPOD['POD_TYPE'].astype(str).str.title()
df['in_StateCV'] = "CA"
df['in_USGSSiteID'] = ""

# AllocationAmount Info
df['in_AllocationApplicationDate'] = dfinPOD['APPLICATION_RECD_DATE']
df['in_AllocationAssociatedConsumptiveUseSiteIDs'] = ""
df['in_AllocationAssociatedWithdrawalSiteIDs'] = ""
df['in_AllocationBasisCV'] = ""
df['in_AllocationChangeApplicationIndicator'] = ""
df['in_AllocationCommunityWaterSupplySystem'] = ""
df['in_AllocationCropDutyAmount'] = ""
df['in_AllocationExpirationDate'] = ""
df['in_AllocationFlow_CFS'] = dfinPOD['CFS_Value'].astype(float) # see above for conversion
df['in_AllocationLegalStatusCV'] = dfinPOD['WATER_RIGHT_STATUS'].str.title()
df['in_AllocationNativeID'] =  dfinPOD['APPLICATION_NUMBER'].replace("", 0).fillna(0).astype(str)
df['in_AllocationOwner'] = dfinPOD['PRIMARY_OWNER_NAME']
df['in_AllocationPriorityDate'] = dfinPOD['PRIORITY_DATE']
df['in_AllocationSDWISIdentifierCV'] = ""
df['in_AllocationTimeframeEnd'] = dfinPOD['DIRECT_DIV_SEASON_END']
df['in_AllocationTimeframeStart'] = dfinPOD['DIRECT_DIV_SEASON_START']
# Water right type and cleaned sub type joined with "_"; a blank sub type leaves a trailing "_".
df['in_AllocationTypeCV'] = dfinPOD['WATER_RIGHT_TYPE'].astype(str) +"_" + dfinPOD['wade_SUB_TYPE'].astype(str)
df['in_AllocationVolume_AF'] = dfinPOD['AF_Value'].astype(float) # see above for conversion
df['in_BeneficialUseCategory'] = dfinPOD['USE_CODE'].str.title()
df['in_CommunityWaterSupplySystem'] = ""
df['in_CropTypeCV'] = ""
df['in_CustomerTypeCV'] = ""
df['in_DataPublicationDate'] = ""
df['in_DataPublicationDOI'] = ""
df['in_ExemptOfVolumeFlowPriority'] = 0
df['in_GeneratedPowerCapacityMW'] = ""
df['in_IrrigatedAcreage'] = "" # temp fix, leave blank for now
df['in_IrrigationMethodCV'] = ""
df['in_LegacyAllocationIDs'] = ""
df['in_OwnerClassificationCV'] = ""
df['in_PopulationServed'] = "" # temp fix, leave blank for now
df['in_PowerType'] = ""
df['in_PrimaryBeneficialUseCategory'] = ""
df['in_SDWISIdentifierCV'] = ""
# Per-right URL: the base address with the integer WR_WATER_RIGHT_ID appended (0 when the ID is blank).
df['in_WaterAllocationNativeURL'] = "https://ciwqs.waterboards.ca.gov/ciwqs/ewrims/EWServlet?Redirect_Page=EWPublicAppSummary.jsp&Purpose=getEwrimsPublicSummary&wrWaterRightID=" + dfinPOD['WR_WATER_RIGHT_ID'].replace("", 0).fillna(0).astype(int).astype(str)


print(len(df))
df.head()

In [None]:
# Concatenate dataframes
frames = [df] # add dataframes here
outdf = pd.concat(frames)
# Drop fully identical rows, renumber the index from 0, and blank out any remaining NaN values.
outdf = outdf.drop_duplicates().reset_index(drop=True).replace(np.nan, "")
print(len(outdf))

## Clean output dataframes

In [None]:
# Temp fix, remove comma separated values in in_SiteTypeCV

def cleanDataFunc(Val):
    """Return "WaDE Blank" when Val contains a comma; otherwise return Val unchanged."""
    return "WaDE Blank" if "," in Val else Val

In [None]:
# Replace any comma-containing in_SiteTypeCV value with "WaDE Blank", then show the distinct results.
outdf['in_SiteTypeCV'] = outdf.apply(lambda row: cleanDataFunc(row['in_SiteTypeCV']), axis=1)
outdf['in_SiteTypeCV'].unique()

In [None]:
# Clean owner name up
def removeSpecialCharFunc(Val):
    """Return Val with a fixed set of punctuation removed, title-cased and stripped.

    The characters removed are: $ @ & . ; , / ) ( -
    Val must be a string. Characters are deleted, not replaced, so text on
    either side of a removed character is joined together.
    """
    # Raw string: in a plain literal "\)" is an invalid escape sequence, which
    # newer Python versions warn about. The compiled pattern is unchanged.
    Val = re.sub(r"[$@&.;,/\)(-]", "", Val).title().strip()
    return Val

In [None]:
# Run removeSpecialCharFunc over the owner, water source name and site name columns in turn.
outdf['in_AllocationOwner'] = outdf.apply(lambda row: removeSpecialCharFunc(row['in_AllocationOwner']), axis=1)
outdf['in_AllocationOwner'].unique()

In [None]:
outdf['in_WaterSourceName'] = outdf.apply(lambda row: removeSpecialCharFunc(row['in_WaterSourceName']), axis=1)
outdf['in_WaterSourceName'].unique()

In [None]:
outdf['in_SiteName'] = outdf.apply(lambda row: removeSpecialCharFunc(row['in_SiteName']), axis=1)
outdf['in_SiteName'].unique()

In [None]:
# Update datatype of Priority Date to fit WaDE 2.0 structure
def formatDateString(inString1):
    """Return inString1 formatted as MM/DD/YYYY, or "" when it cannot be used.

    The input is stringified and stripped first. A blank value, a value that
    pandas parses to NaT (for example "nan"), or a value that pandas cannot
    parse at all yields "".
    """
    inString = str(inString1).strip()
    if inString == "":
        return ""
    try:
        valD = pd.to_datetime(inString)
        if pd.isnull(valD):
            # NaT has no usable date; treat it the same as a blank value.
            return ""
        return valD.date().strftime('%m/%d/%Y')
    except Exception:
        # Best effort: an unparseable date becomes blank. Catching Exception
        # rather than using a bare except lets KeyboardInterrupt and
        # SystemExit propagate.
        return ""

# Reformat every priority date via formatDateString; unusable values become "".
outdf['in_AllocationPriorityDate'] = outdf.apply(lambda row: formatDateString(row['in_AllocationPriorityDate']), axis=1)
outdf['in_AllocationPriorityDate'].unique()

In [None]:
# ExemptOfVolumeFlowPriority for in_AllocationTypeCV records
# check for keywords 'RIPERIAN', 'PRE1914', or 'Statement of Div and Use' in AllocationTypeCV
def assignExemptOfVolumeFlowPriority(valString):
    """Return "1" when the stringified input contains an exempt keyword, else "0".

    The keywords are matched as case-sensitive substrings:
    'RIPERIAN', 'PRE1914' and 'Statement of Div and Use'.
    """
    text = str(valString).strip()
    for keyword in ('RIPERIAN', 'PRE1914', 'Statement of Div and Use'):
        if keyword in text:
            return "1"
    return "0"

# Overwrites the placeholder in_ExemptOfVolumeFlowPriority value with the string flag "1" or "0".
outdf['in_ExemptOfVolumeFlowPriority'] = outdf.apply(lambda row: assignExemptOfVolumeFlowPriority(row['in_AllocationTypeCV']), axis=1)
print(outdf['in_ExemptOfVolumeFlowPriority'].value_counts())
outdf['in_ExemptOfVolumeFlowPriority'].unique()

In [None]:
# ExemptOfVolumeFlowPriority for in_AllocationPriorityDate records
# build off previous in_AllocationTypeCV check

# check if in_AllocationPriorityDate is blank, make exempt if true.  Temp fix.
def assignExemptOfVolumeFlowPriorityPD(valEVFP, valPD):
    """Return "1" when the priority date is blank; otherwise return valEVFP unchanged.

    valPD is stringified and stripped before the blank check, so
    whitespace-only dates count as blank.
    """
    dateText = str(valPD).strip()
    if dateText != '' and not pd.isnull(dateText):
        # A priority date is present: keep the existing flag.
        return valEVFP
    return "1"

# Second pass: rows with a blank priority date are flagged "1"; all other rows keep their existing flag.
outdf['in_ExemptOfVolumeFlowPriority'] = outdf.apply(lambda row: assignExemptOfVolumeFlowPriorityPD(row['in_ExemptOfVolumeFlowPriority'], row['in_AllocationPriorityDate']), axis=1)
print(outdf['in_ExemptOfVolumeFlowPriority'].value_counts())
outdf['in_ExemptOfVolumeFlowPriority'].unique()

In [None]:
# Ensure Empty String

def ensureEmptyString(val):
    """Return the stripped string form of val, or "" when it is blank or "nan"."""
    text = str(val).strip()
    isBlank = text in ("", " ", "nan") or pd.isnull(text)
    return "" if isBlank else text

In [None]:
# Run ensureEmptyString over each text column below, one cell per column, so
# blank and "nan" values are stored as "" and surrounding whitespace is removed.
outdf['in_WaterSourceName'] = outdf.apply(lambda row: ensureEmptyString(row['in_WaterSourceName']), axis=1)
outdf['in_WaterSourceName'].unique()

In [None]:
outdf['in_WaterSourceTypeCV'] = outdf.apply(lambda row: ensureEmptyString(row['in_WaterSourceTypeCV']), axis=1)
outdf['in_WaterSourceTypeCV'].unique()

In [None]:
outdf['in_CoordinateAccuracy'] = outdf.apply(lambda row: ensureEmptyString(row['in_CoordinateAccuracy']), axis=1)
outdf['in_CoordinateAccuracy'].unique()

In [None]:
outdf['in_CoordinateMethodCV'] = outdf.apply(lambda row: ensureEmptyString(row['in_CoordinateMethodCV']), axis=1)
outdf['in_CoordinateMethodCV'].unique()

In [None]:
outdf['in_County'] = outdf.apply(lambda row: ensureEmptyString(row['in_County']), axis=1)
outdf['in_County'].unique()

In [None]:
outdf['in_SiteTypeCV'] = outdf.apply(lambda row: ensureEmptyString(row['in_SiteTypeCV']), axis=1)
outdf['in_SiteTypeCV'].unique()

In [None]:
outdf['in_SiteName'] = outdf.apply(lambda row: ensureEmptyString(row['in_SiteName']), axis=1)
outdf['in_SiteName'].unique()

In [None]:
outdf['in_AllocationLegalStatusCV'] = outdf.apply(lambda row: ensureEmptyString(row['in_AllocationLegalStatusCV']), axis=1)
outdf['in_AllocationLegalStatusCV'].unique()

In [None]:
outdf['in_AllocationOwner'] = outdf.apply(lambda row: ensureEmptyString(row['in_AllocationOwner']), axis=1)
outdf['in_AllocationOwner'].unique()

In [None]:
outdf['in_AllocationTypeCV'] = outdf.apply(lambda row: ensureEmptyString(row['in_AllocationTypeCV']), axis=1)
outdf['in_AllocationTypeCV'].unique()

In [None]:
outdf['in_BeneficialUseCategory'] = outdf.apply(lambda row: ensureEmptyString(row['in_BeneficialUseCategory']), axis=1)
outdf['in_BeneficialUseCategory'].unique()

In [None]:
# Fixing in_AllocationFlow_CFS datatype
# Unparseable values become NaN; both 0 and NaN are then stored as "".
outdf['in_AllocationFlow_CFS'] = pd.to_numeric(outdf['in_AllocationFlow_CFS'], errors='coerce').replace(0,"").fillna("")
outdf['in_AllocationFlow_CFS'].unique()

In [None]:
# Fixing in_AllocationVolume_AF datatype
# Same treatment as the flow column: 0 and NaN are stored as "".
outdf['in_AllocationVolume_AF'] = pd.to_numeric(outdf['in_AllocationVolume_AF'], errors='coerce').replace(0,"").fillna("")
outdf['in_AllocationVolume_AF'].unique()

In [None]:
# in_Latitude
# Unparseable values are stored as ""; unlike the flow and volume columns, 0 is kept.
outdf['in_Latitude'] = pd.to_numeric(outdf['in_Latitude'], errors='coerce').fillna("")
outdf['in_Latitude'].unique()

In [None]:
# in_Longitude
# Unparseable values are stored as ""; 0 is kept.
outdf['in_Longitude'] = pd.to_numeric(outdf['in_Longitude'], errors='coerce').fillna("")
outdf['in_Longitude'].unique()

In [None]:
# issue of trying to get mixed var types in these fields
# Convert in_IrrigatedAcreage to numeric
# Unparseable values become 0, the column is cast to int, and 0 is then stored as "".
outdf['in_IrrigatedAcreage'] = pd.to_numeric(outdf['in_IrrigatedAcreage'], errors='coerce').fillna(0).astype(float).astype(int).replace(0,"")
outdf['in_IrrigatedAcreage'].unique()

In [None]:
# issue of trying to get mixed var types in these fields
# Convert in_PopulationServed to numeric
# Unparseable values become 0, the column is cast to int, and 0 is then stored as "".
outdf['in_PopulationServed'] = pd.to_numeric(outdf['in_PopulationServed'], errors='coerce').fillna(0).astype(float).astype(int).replace(0,"")
outdf['in_PopulationServed'].unique()

In [None]:
# Creating WaDE Custom water source native ID for easy water source identification
# ----------------------------------------------------------------------------------------------------

# Create temp WaterSourceNativeID dataframe of unique water source.
def assignWaterSourceNativeID(colrowValue):
    """Return the custom water source ID: "wadeID" followed by str(colrowValue)."""
    return f"wadeID{colrowValue!s}"

# One row per distinct (water source name, water source type) pair found in outdf.
dfWaterSourceNativeID = pd.DataFrame()
dfWaterSourceNativeID['in_WaterSourceName'] = outdf['in_WaterSourceName']
dfWaterSourceNativeID['in_WaterSourceTypeCV'] = outdf['in_WaterSourceTypeCV']
dfWaterSourceNativeID = dfWaterSourceNativeID.drop_duplicates()

# Number the distinct pairs 1..N on the same index, then turn each number into a "wadeID<N>" string.
dftemp = pd.DataFrame(index=dfWaterSourceNativeID.index)
dftemp["Count"] = range(1, len(dftemp.index) + 1)
dfWaterSourceNativeID['in_WaterSourceNativeID'] = dftemp.apply(lambda row: assignWaterSourceNativeID(row['Count']), axis=1)
# linkKey is the name and type concatenated with no separator; it is the lookup key used below.
dfWaterSourceNativeID['linkKey'] = dfWaterSourceNativeID['in_WaterSourceName'].astype(str) + dfWaterSourceNativeID['in_WaterSourceTypeCV'].astype(str)

# ----------------------------------------------------------------------------------------------------

# Retrieve WaDE Custom water source native ID
# Dictionary of linkKey -> in_WaterSourceNativeID
WaterSourceNativeIDdict = pd.Series(dfWaterSourceNativeID.in_WaterSourceNativeID.values, index=dfWaterSourceNativeID.linkKey.astype(str)).to_dict()
def retrieveWaterSourceNativeID(A, B):
    """Return the custom water source ID for a name (A) and type (B), or ''.

    '' is returned when both inputs are blank, when both are null, or when
    the concatenated, stripped key is not in WaterSourceNativeIDdict.
    """
    if (A == '' and B == '') or (pd.isnull(A) and pd.isnull(B)):
        return ''
    colrowValue = str(A).strip() + str(B).strip()
    # dict.get with a default replaces a bare `except:` around the lookup,
    # which would also have hidden unrelated errors.
    return WaterSourceNativeIDdict.get(colrowValue, '')

# Fill in_WaterSourceNativeID for every row by looking up its name and type pair.
outdf['in_WaterSourceNativeID'] = outdf.apply(lambda row: retrieveWaterSourceNativeID( row['in_WaterSourceName'], row['in_WaterSourceTypeCV']), axis=1)
outdf['in_WaterSourceNativeID'].unique()

## Drop non-Active AllocationLegalStatusCV Water Rights
- For CA, we don't want water rights that are considered: "Cancelled", "Closed", "Inactive", "Pending", "Rejected", "Revoked"

In [None]:
# Remove rows whose AllocationLegalStatusCV marks the right as non-active for this state.

print('length of df before removing non-active rights: ', len(outdf))

# legal status values to exclude
dropLegalStatusList = ["Cancelled", "Closed", "Inactive", "Pending", "Rejected", "Revoked"]

# keep only rows whose status is not in the list above
isNonActive = outdf['in_AllocationLegalStatusCV'].isin(dropLegalStatusList)
outdf = outdf[~isNonActive].reset_index(drop=True)

print('length of df after removing non-active rights: ', len(outdf))
# list the remaining status values, each quoted and followed by a comma
for x in outdf['in_AllocationLegalStatusCV'].sort_values().unique():
    print('"' + x + '",')

## Review and Export

In [None]:
outdf.dtypes

In [None]:
outdf

In [None]:
# Export the output dataframe
# Written as a zip archive holding a single CSV named Pwr_caMain.csv; the index is not written.
outdf.to_csv('Pwr_caMain.zip', compression=dict(method='zip', archive_name='Pwr_caMain.csv'), index=False)  # The output, save as a zip
#dfPoUshape.to_csv('P_Geometry.zip', compression=dict(method='zip', archive_name='P_Geometry.csv'), index=False)  # The output geometry.