# Testing the Water Data for submission into 2023 Spring Water submission
This is the second notebook in creating a dependable model for validating the related water data, and do the tests on Nebraska Water system. 

In [2]:
import pandas as pd
from libraries import general

In [3]:
counties = general.get_Counties_FIPS('NE')

## Required Libraries
These are the required libraries for validation. Will later be added into a separate library

In [1]:
# First attempt to creat Data class models
import pandas as pd
from datetime import date
from typing import Optional, List, Literal
from pydantic import BaseModel, ValidationError, Field, conint, confloat, constr, validator
import datetime

In [3]:
# ['RowIdentifier', 'PWSIDNumber', 'YearAssociatedTo', 'YearPulled',
#       'PWSName', 'PrincipalCountyServedFIPS', 'PrincipalCityFeatureID',
#       'TotalConnections', 'SystemPopulation', 'PrimarySourceCode', 'Latitude',
#       'Longitude', 'LocationDerivationCode']

class PWS_Inventory(BaseModel):
    RowIdentifier: int
    StateFIPSCode: int
    PWSIDNumber: constr(regex=r'^NE\d{7}') #Change NE to represent your state code
    
    YearAssociatedTo: conint(ge=1999, le=2023) 
    YearPulled: conint(ge=1999, le=2023)
    
    PWSName: str #Should it have distinction between Unknows and Not Submitted? or just be blank?
    
    PrincipalCountyServedFIPS: str

    @validator('PrincipalCountyServedFIPS')
    def check_PrincipalCountyServedFIPS(cls, v):
        allowed_values = counties['fips'].tolist()
        if v not in allowed_values:
            raise ValueError('PrincipalCountyServedFIPS must be a valid FIPS code')
        return v      

    PrincipalCityFeatureID: int # ????How to get it from the introduced source?

    TotalConnections: conint(ge=1, le=9999999)
    SystemPopulation: conint(ge=10, le=99999999)
    PrimarySourceCode: Literal['GU', 'GUP', 'GW', 'GWP', 'SW', 'SWP', 'U', 'NS']

    # For Nebraska in NAD83
    Latitude: confloat(ge= 39.999998, le=43.001702) 
    Longitude: confloat(ge= -104.053514, le=-95.308290)
    LocationDerivationCode: Literal['SA', 'MFL', 'PCS', 'GSH','O', '-999', '-888']

        
# ['RowIdentifier', 'PWSIDNumber', 'Year', 'AnalyteCode', 'DateSampled',
#        'AggregationType', 'NumSamplingLocations', 'SummaryTimePeriod',
#        'NumSamples', 'NumNonDetects', 'ConcentrationUnits', 'Concentration']
class Sampling_Summary(BaseModel):
    RowIdentifier: int
    PWSIDNumber: constr(regex=r'^NE\d{7}') #for Nebraska

    Year: conint(ge=1999, le=2023)
    
    
    AnalyteCode: Literal['1005', '2050', '2456', '2950', '2039','1040', '2987', 
    '2984', '4010', '4006']
    ConcentrationUnits: Literal['ug/l', 'mg/l','pci/l'] # TODO: Apply the rules of what Analyte each applies to
    Concentration: float

    DateSampled: datetime.date #validate to be from 1/1/1999 to the latest complete year

    AggregationType: Literal['X', 'MX']
    NumSamplingLocations: conint(ge=1, le=9999) #TODO: '-888' for Not Submitted
    SummaryTimePeriod: str #TODO: look into its Data Dictionary
    NumSamples: int
    NumNonDetects: int



class Sampling(BaseModel):
    RowIdentifier: int
    PWSIDNumber: constr(regex=r'^NE\d{7}') #for Nebraska

    Year: conint(ge=1999, le=2023)
    
    
    AnalyteCode: Literal['1005', '2050', '2456', '2950', '2039','1040', '2987', 
    '2984', '4010', '4006']
    # check if ConcentrationUnits is one of the strings in this list, make the list case insensitive
    ConcentrationUnits: Literal['ug/l', 'mg/l','pci/l'] # TODO: Apply the rules of what Analyte each applies to

    # check if Concentration is a float and is greater than or equal to 0
    Concentration: confloat(ge=0.0)

    DateSampled: datetime.date #validate to be from 1/1/1999 to the latest complete year


## Checking the PWS_Inventory


In [4]:
inventory = pd.read_excel('/Users/babak.jfard/projects/ETHTracking/Data/Water_Data/PWSInventory_latest.xlsx')

In [5]:
inventory.PWSIDNumber.nunique()

596

In [6]:
# Check for duplicates
inventory[inventory.duplicated(subset=['PWSIDNumber', 'YearAssociatedTo'], keep=False)]

Unnamed: 0,StateFIPSCode,PWSIDNumber,YearAssociatedTo,YearPulled,PWSName,PrincipalCountyServedName,PrincipalCountyServed FIPS,PrincipalCityName,﻿PrincipalCityFeatureId,TotalConnections,SystemPopulation,PrimarySourceCode,Horiz_Ref_Datum,Latitude,Longitude,LocationDerivationCode


In [7]:
inventory.columns.tolist()

['StateFIPSCode',
 'PWSIDNumber',
 'YearAssociatedTo',
 'YearPulled',
 'PWSName',
 'PrincipalCountyServedName',
 'PrincipalCountyServed FIPS',
 'PrincipalCityName',
 '\ufeffPrincipalCityFeatureId',
 'TotalConnections',
 'SystemPopulation',
 'PrimarySourceCode',
 'Horiz_Ref_Datum',
 'Latitude',
 'Longitude',
 'LocationDerivationCode']

In [10]:
#inventory.columns = inventory.columns.str.replace('\ufeff', '')

#Change the names of several columns to match the names in the validator
inventory.rename(columns={'PrincipalCountyServed FIPS': 'PrincipalCountyServedFIPS', '\ufeffPrincipalCityFeatureId': 'PrincipalCityFeatureID'}, inplace=True)

In [11]:
inventory.to_csv('/Users/babak.jfard/projects/ETHTracking/Data/Water_Data/PWSInventory_latest.csv', index=False)

In [10]:
# Adding a uique Identifier, as first column, for each row
inventory.insert(0, 'RowIdentifier', inventory.index)
#inventory['RowIdentifier'] = inventory.index

In [11]:
rm_column = list(set(inventory.columns) - set((PWS_Inventory.__fields__.keys())))

In [14]:
set(PWS_Inventory.__fields__.keys()) - set(inventory.columns)

set()

In [15]:
inventory.Horiz_Ref_Datum.isna().sum()

28

In [13]:
inventory.drop(columns=rm_column, inplace=True)

In [17]:
inventory.columns

Index(['RowIdentifier', 'StateFIPSCode', 'PWSIDNumber', 'YearAssociatedTo',
       'YearPulled', 'PWSName', 'PrincipalCountyServedName',
       'PrincipalCountyServedFIPS', 'PrincipalCityName',
       'PrincipalCityFeatureID', 'TotalConnections', 'SystemPopulation',
       'PrimarySourceCode', 'Horiz_Ref_Datum', 'Latitude', 'Longitude',
       'LocationDerivationCode'],
      dtype='object')

In [18]:
PWS_Inventory.__fields__.keys()

dict_keys(['RowIdentifier', 'StateFIPSCode', 'PWSIDNumber', 'YearAssociatedTo', 'YearPulled', 'PWSName', 'PrincipalCountyServedFIPS', 'PrincipalCityFeatureID', 'TotalConnections', 'SystemPopulation', 'PrimarySourceCode', 'Latitude', 'Longitude', 'LocationDerivationCode'])

In [19]:
# Doing the validation for each row as a PWS_Inventory object
valid_rows = []
# Creat a dictionary that contains the RowIdentifier of the invalid rows and the error message
invalid_rows = {}
for index, row in inventory.iterrows():
    
    try:
        PWS_Inventory(**row)
        # If passeed, add RowIdentifier into valid_rows list
        valid_rows.append(row['RowIdentifier'])

    except ValidationError as e:
        # If failed, add RowIdentifier and the error message into invalid_rows dictionary
        invalid_rows[row['RowIdentifier']] = e.errors

        print(e)

NameError: name 'counties' is not defined

In [17]:
invalid_rows

{47: <bound method ValidationError.errors of ValidationError(model='PWS_Inventory', errors=[{'loc': ('Latitude',), 'msg': 'ensure this value is greater than or equal to 39.999998', 'type': 'value_error.number.not_ge', 'ctx': {'limit_value': 39.999998}}, {'loc': ('Longitude',), 'msg': 'ensure this value is greater than or equal to -104.053514', 'type': 'value_error.number.not_ge', 'ctx': {'limit_value': -104.053514}}, {'loc': ('LocationDerivationCode',), 'msg': "unexpected value; permitted: 'SA', 'MFL', 'PCS', 'GSH', 'O', '-999', '-888'", 'type': 'value_error.const', 'ctx': {'given': nan, 'permitted': ('SA', 'MFL', 'PCS', 'GSH', 'O', '-999', '-888')}}])>,
 49: <bound method ValidationError.errors of ValidationError(model='PWS_Inventory', errors=[{'loc': ('Latitude',), 'msg': 'ensure this value is greater than or equal to 39.999998', 'type': 'value_error.number.not_ge', 'ctx': {'limit_value': 39.999998}}, {'loc': ('Longitude',), 'msg': 'ensure this value is greater than or equal to -104.

In [18]:
# Take the errorous rows of the inventory dataframe from key values in invalid_rows dictionary
errorous_rows = inventory[inventory['RowIdentifier'].isin(invalid_rows.keys())]

In [19]:
errorous_rows

Unnamed: 0,RowIdentifier,StateFIPSCode,PWSIDNumber,YearAssociatedTo,YearPulled,PWSName,PrincipalCountyServedFIPS,PrincipalCityFeatureID,TotalConnections,SystemPopulation,PrimarySourceCode,Latitude,Longitude,LocationDerivationCode
47,47,31,NE3110910,2022,2023,"BENNET, VILLAGE OF",31109,-888,428,1084,GWP,,,
49,49,31,NE3121227,2022,2023,BIC JOINT WATER AGENCY,31065,827204,3,355,GW,,,
127,127,31,NE3121429,2022,2023,COTTONWOOD TERRACE,31111,831719,243,550,GWP,,,
138,138,31,NE3110704,2022,2023,"CROFTON, CITY OF",31107,828463,368,754,SWP,,,
139,139,31,NE3120824,2022,2023,CROOKED CREEK WATER SYSTEM,31109,837279,33,64,GW,,,
180,180,31,NE3121485,2022,2023,EAGLE MHC,31025,828917,53,97,GWP,,,
279,279,31,NE3121481,2022,2023,K & K MANUFACTURED HOME COMMUNITY,31001,829848,50,180,GWP,,,
311,311,31,NE3121368,2022,2023,LOWER BIG BLUE NRD - WYMORE,31067,834893,228,856,GWP,,,
328,328,31,NE3121486,2022,2023,MARTINVIEW MHP,31153,827304,94,385,SWP,,,
338,338,31,NE3121363,2022,2023,MEADOWBROOK ESTATES WATER SYSTEM,31055,835483,266,675,SWP,,,


## Checking the Sampling
This is the latest file (The unaggregated)

In [2]:
import pandas as pd
sampling = pd.read_excel('/Users/babak.jfard/projects/ETHTracking/Data/Water_Data/PWSSampleResults_unagregated_20230404.xlsx')

In [4]:
sampling.columns

Index(['PWSIDNumber', 'Year', 'AnalyteName', 'AnalyteCode',
       'ConcentrationUnits', 'Concentration', 'DateSampled', 'SamplePointID',
       'DetectionLimit', 'DetectionLimitUom', 'NonDetectFlag'],
      dtype='object')

In [29]:
# check for duplicates, and add them into a separate dataframe

duplicates = sampling[sampling.duplicated(subset=['PWSIDNumber', 'Year', 'AnalyteCode', 'DateSampled', 'SamplePointID'], keep=False)]

In [11]:
duplicates.to_excel('/Users/babak.jfard/projects/ETHTracking/Data/Water_Data/duplicates.xlsx')

In [30]:
sampling.columns

Index(['PWSIDNumber', 'Year', 'AnalyteName', 'AnalyteCode',
       'ConcentrationUnits', 'Concentration', 'DateSampled', 'SamplePointID',
       'DetectionLimit', 'DetectionLimitUom', 'NonDetectFlag'],
      dtype='object')

In [31]:
Sampling.__fields__.keys()

dict_keys(['RowIdentifier', 'PWSIDNumber', 'Year', 'AnalyteCode', 'ConcentrationUnits', 'Concentration', 'DateSampled'])

In [6]:
sampling.AnalyteCode.value_counts()

1038    21928
2050     7684
2039     7684
2987     6518
2984     6518
1005     6268
2950     3457
2456     3416
4010     3093
4006     1503
1041       17
1040       17
Name: AnalyteCode, dtype: int64

In [9]:
# count the number of rows for each year, adding heading to the output
sampling.Year.value_counts().to_frame('Number of Rows')

Unnamed: 0,Number of Rows
2012,9069
2018,7491
2015,7140
2013,6970
2016,6460
2021,6444
2019,6396
2017,6292
2020,5972
2014,5869


In [32]:
# For AnalyteCode Replace all 1038 values with 1040
sampling['AnalyteCode'] = sampling['AnalyteCode'].replace(1038, 1040)

# Delete all rows with 1041 as AnalyteCode, which are only NITRITE tests
sampling = sampling[sampling['AnalyteCode'] != 1041] #Contained only 17 rows

In [33]:
sampling['ConcentrationUnits'] = sampling['ConcentrationUnits'].str.lower()

In [34]:
del_cols = list(set(sampling.columns) - set(Sampling.__fields__.keys()))

# Remove the columns that are not in the Sampling validator
sampling_validation = sampling.drop(columns=del_cols)

sampling_validation.insert(0, 'RowIdentifier', sampling.index)

In [35]:
print(sampling_validation.columns)
print(Sampling.__fields__.keys())

Index(['RowIdentifier', 'PWSIDNumber', 'Year', 'AnalyteCode',
       'ConcentrationUnits', 'Concentration', 'DateSampled'],
      dtype='object')
dict_keys(['RowIdentifier', 'PWSIDNumber', 'Year', 'AnalyteCode', 'ConcentrationUnits', 'Concentration', 'DateSampled'])


In [36]:
# change type of colum AnlyteCode to string
sampling_validation['AnalyteCode'] = sampling_validation['AnalyteCode'].astype(str)

In [37]:
# now validating the sampling dataframe
# Doing the validation for each row as a PWS_Inventory object
valid_rows_sampling = []
# Creat a dictionary that contains the RowIdentifier of the invalid rows and the error message
invalid_rows_sampling = {}
for index, row in sampling_validation.iterrows():
    
    try:
        Sampling(**row)
        # If passeed, add RowIdentifier into valid_rows list
        valid_rows_sampling.append(row['RowIdentifier'])

    except ValidationError as e:
        # If failed, add RowIdentifier and the error message into invalid_rows dictionary
        invalid_rows_sampling[row['RowIdentifier']] = e.errors

        #print(e)

In [38]:
invalid_rows_sampling

{}

In [65]:
sampling.to_csv('/Users/babak.jfard/projects/ETHTracking/Data/Water_Data/Sample_results.csv', index=False)

In [57]:
# Looks like there are more undefined AnalyteCodes in the sampling dataframe
# Let's see what they are
sampling.AnalyteCode.value_counts()

1040    21945
2050     7684
2039     7684
2987     6518
2984     6518
1005     6268
2950     3457
2456     3416
4010     3093
4006     1503
Name: AnalyteCode, dtype: int64

In [64]:
# What are allowable AnalyteCodes as defined in the Sampling class
Sampling.__fields__['AnalyteCode'].type_


typing.Literal['1005', '2050', '2456', '2950', '2039', '1040', '2987', '2984', '4010', '226', '228', '4006']

### How to Aggregate into Sampling Results

Each community water system:
* annual mean and maz concentration of:

--- arsenic, disinfection byproducts (HAA5 and TTHM), 

--- nitrates, 

--- atrazine, 

--- di(2-ethylhexyl) phthalate (DEHP), 

--- radium, 

--- tetrachloroethene (tetrachloroethylene) (PCE), 

--- trichloroethene (trichloroethylene) (TCE), and 

--- uranium


* Mean concentration per quarter 

--- Nitrate

---- Atrazine

In [22]:
set(inventory.PWSIDNumber.unique()) - set(sampling.PWSIDNumber.unique())

{'NE3104308', 'NE3117304', 'NE3117903', 'NE3120358'}

In [30]:
sampling.columns

Index(['RowIdentifier', 'PWSIDNumber', 'Year', 'AnalyteCode',
       'ConcentrationUnits', 'Concentration', 'DateSampled'],
      dtype='object')

In [33]:
sampling.Year.value_counts()

2012    9069
2018    7490
2015    7139
2013    6968
2016    6460
2021    6441
2019    6396
2017    6292
2020    5963
2014    5868
Name: Year, dtype: int64

In [35]:
sampling.head()

Unnamed: 0,RowIdentifier,PWSIDNumber,Year,AnalyteCode,ConcentrationUnits,Concentration,DateSampled
0,0,NE3103902,2012,1040,mg/l,5.52,2012-01-03
1,1,NE3103902,2012,1040,mg/l,4.92,2012-01-03
2,2,NE3107101,2012,1040,mg/l,5.56,2012-01-03
3,3,NE3114902,2012,1040,mg/l,1.62,2012-01-03
4,4,NE3114902,2012,1040,mg/l,6.78,2012-01-03


In [36]:
sampling.dtypes

RowIdentifier                  int64
PWSIDNumber                   object
Year                           int64
AnalyteCode                   object
ConcentrationUnits            object
Concentration                float64
DateSampled           datetime64[ns]
dtype: object