# Explore CSV files and format fields to streamline their integration into a SQL DB

Most date fields in our CSV files are not formatted the way MySQL expects (_expected format is `YYYY-MM-DD`_), so some pre-processing is required.

**Note**: all the tables from the original archive contain a comma at the end of the header row. This trips up `pandas` and other data-wrangling tools. I copied the original data into files prefixed with `DG_`, which I use below to process the dates. _e.g._ `WATER-SYSTEM.csv` becomes `DG_WATER_SYSTEM.csv`, whose header row DOES NOT end with a comma.
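The notebook does not record how the `DG_` copies were produced. A minimal sketch of one way to do it is shown here, assuming only the header row carries the stray comma; the helper name `strip_trailing_header_comma` and the toy file names are hypothetical.

```python
import os
import tempfile

def strip_trailing_header_comma(src_path, dst_path, encoding="utf-8"):
    """Copy src_path to dst_path, dropping one trailing comma from the header row."""
    with open(src_path, encoding=encoding, newline="") as src, \
         open(dst_path, "w", encoding=encoding, newline="") as dst:
        header = src.readline()
        # Separate the line ending so it can be restored unchanged.
        body = header.rstrip("\r\n")
        line_ending = header[len(body):]
        if body.endswith(","):
            body = body[:-1]
        dst.write(body + line_ending)
        # Data rows are copied verbatim.
        for line in src:
            dst.write(line)

# Toy demonstration with made-up rows (not taken from the real archive).
workdir = tempfile.mkdtemp()
raw_path = os.path.join(workdir, "RAW_EXAMPLE.csv")
clean_path = os.path.join(workdir, "DG_EXAMPLE.csv")
with open(raw_path, "w", encoding="utf-8", newline="") as f:
    f.write("A.ID,A.NAME,\n1,foo\n2,bar\n")

strip_trailing_header_comma(raw_path, clean_path)
with open(clean_path, encoding="utf-8", newline="") as f:
    cleaned = f.read()
print(cleaned)
```

Streaming line by line keeps memory use flat, which matters for the larger tables in the archive.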

In [288]:
from os.path import join
import uuid

import pandas as pd
from datetime import datetime as dt

**Important**: Update the global variable below with the absolute path to the folder containing all the csv files.

In [97]:
PATH_TO_DATA_FOLDER = "/Users/fpaupier/projects/safe-water/data/SDWIS/"

## `WATER_SYSTEM` table

There are 5 date fields in this table, with two types of formatting:

1. Dates formatted as `DD-MON-YY`, _e.g. `01-JUN-83` for June 1, 1983_. Fields encoded with this date format are:
    - `OUTSTANDING_PERFORM_BEGIN_DATE`
    - `PWS_DEACTIVATION_DATE`
    - `SOURCE_PROTECTION_BEGIN_DATE`
    
    Note that this format is ambiguous about the century, since the year has only two digits.
    --> Dates formatted that way will be converted to the MySQL-friendly date format `YYYY-MM-DD`.
    
    
2. Dates formatted as `MM-DD`. Fields encoded that way are:
     - `SEASON_BEGIN_DATE`
     - `SEASON_END_DATE`
     
     Note that these dates describe events that recur every year. We keep them as raw text fields; fine-grained processing of those dates is left to consumer applications.
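To make the first conversion concrete, here is a small standard-library sketch that turns a value such as `01-JUN-83` into `YYYY-MM-DD`. It is an illustration rather than the code used below (the notebook relies on `parse_dates` in `pd.read_csv`); the helper name `to_mysql_date` is hypothetical, and the sketch assumes an English or C locale for month abbreviations.

```python
from datetime import datetime

def to_mysql_date(raw):
    """Convert a 'DD-MON-YY' string such as '01-JUN-83' to 'YYYY-MM-DD'.

    Returns None for empty or missing values.
    """
    if raw is None or not raw.strip():
        return None
    # %b matches abbreviated month names case-insensitively in the current
    # locale; this sketch assumes an English/C locale.
    # %y follows the POSIX pivot: 69-99 -> 1969-1999, 00-68 -> 2000-2068.
    parsed = datetime.strptime(raw.strip(), "%d-%b-%y")
    return parsed.strftime("%Y-%m-%d")

print(to_mysql_date("01-JUN-83"))  # 1983-06-01
print(to_mysql_date("23-MAY-12"))  # 2012-05-23
print(to_mysql_date(""))           # None
```

Because of the two-digit-year pivot, a date from before 1969 would land in the 2000s, so it may be worth checking the converted columns for dates in the future.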
     


## Load and format data

In [236]:
OUTSTANDING_PERFORM_BEGIN_DATE_idx = 44
PWS_DEACTIVATION_DATE_idx = 8
SOURCE_PROTECTION_BEGIN_DATE_idx = 42

In [237]:
df = pd.read_csv(join(PATH_TO_DATA_FOLDER, "DG_WATER_SYSTEM.csv"),
                 sep=",",
                 header=0,
                 index_col=0,
                 encoding="utf-8",
                 low_memory=False,  # Read the file in one pass to avoid mixed-type inference across chunks
                 parse_dates=[PWS_DEACTIVATION_DATE_idx, SOURCE_PROTECTION_BEGIN_DATE_idx, OUTSTANDING_PERFORM_BEGIN_DATE_idx])




In [238]:
df.head()

| WATER_SYSTEM.PWSID | WATER_SYSTEM.PWS_NAME | WATER_SYSTEM.NPM_CANDIDATE | WATER_SYSTEM.PRIMACY_AGENCY_CODE | WATER_SYSTEM.EPA_REGION | WATER_SYSTEM.SEASON_BEGIN_DATE | WATER_SYSTEM.SEASON_END_DATE | WATER_SYSTEM.PWS_ACTIVITY_CODE | WATER_SYSTEM.PWS_DEACTIVATION_DATE | WATER_SYSTEM.PWS_TYPE_CODE | WATER_SYSTEM.DBPR_SCHEDULE_CAT_CODE | ... | WATER_SYSTEM.CITY_NAME | WATER_SYSTEM.ZIP_CODE | WATER_SYSTEM.COUNTRY_CODE | WATER_SYSTEM.STATE_CODE | WATER_SYSTEM.SOURCE_WATER_PROTECTION_CODE | WATER_SYSTEM.SOURCE_PROTECTION_BEGIN_DATE | WATER_SYSTEM.OUTSTANDING_PERFORMER | WATER_SYSTEM.OUTSTANDING_PERFORM_BEGIN_DATE | WATER_SYSTEM.CITIES_SERVED | WATER_SYSTEM.COUNTIES_SERVED |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| AR1900063 | USCOE BSWP118 PONTIAC | N | AR | 6 | 01-01 | 12-31 | I | 1983-06-01 | TNCWS | | ... | LITTLE ROCK | 72203 | US | AR | | NaT | | NaT | Not Reported | Marion |
| AR1900071 | USCOE BSWP126 HIGHWAY K | N | AR | 6 | 01-01 | 12-31 | I | 1983-06-01 | TNCWS | | ... | LITTLE ROCK | 72203 | US | AR | | NaT | | NaT | Not Reported | Marion |
| AR1900072 | USCOE BSW127 LOWERY | N | AR | 6 | 01-01 | 12-31 | I | 1983-06-01 | TNCWS | | ... | LITTLE ROCK | 72203 | US | AR | | NaT | | NaT | Not Reported | Marion |
| AR1900075 | USCOE GFW02 DAM SITE | N | AR | 6 | 01-01 | 12-31 | I | 1983-06-01 | TNCWS | | ... | LITTLE ROCK | 72203 | US | AR | | NaT | | NaT | Not Reported | Cleburne |
| AR1900076 | USCOE GFW03 DAM SITE | N | AR | 6 | 01-01 | 12-31 | I | 1983-06-01 | TNCWS | | ... | LITTLE ROCK | 72203 | US | AR | | NaT | | NaT | Not Reported | Cleburne |


### Sanitize booleans
The fields `IS_GRANT_ELIGIBLE_IND`, `IS_WHOLESALER_IND`, `NPM_CANDIDATE` and `IS_SCHOOL_OR_DAYCARE_IND` hold the text values `N` or `Y`. They should be cast to booleans in the database so that queries are easier to express.

In [239]:
df['WATER_SYSTEM.IS_GRANT_ELIGIBLE_IND'].dropna().unique()

array(['N', 'Y'], dtype=object)

In [240]:
df['WATER_SYSTEM.IS_WHOLESALER_IND'].dropna().unique()

array(['N', 'Y'], dtype=object)

In [241]:
df['WATER_SYSTEM.IS_SCHOOL_OR_DAYCARE_IND'].dropna().unique()

array(['N', 'Y'], dtype=object)

In [242]:
df['WATER_SYSTEM.NPM_CANDIDATE'].dropna().unique()

array(['N', 'Y'], dtype=object)

In [243]:
def type_checker(arr):
    """Print whether every element of `arr` has the same type as the first one."""
    ref_type = type(arr[0])
    compatible = True
    for item in arr:
        if type(item) != ref_type:
            print('ERROR!')
            compatible = False
    if compatible:
        print('compatible types')

In [244]:
tst = df['WATER_SYSTEM.ZIP_CODE'].dropna().unique()
tst

array(['72203', '92365', '92398', ..., '49661', '48909-7741',
       '55011-9204'], dtype=object)

In [245]:
type_checker(tst)

compatible types


### Perform sanitization

Map `N` to `0` (false) and `Y` to `1` (true), the integer values MySQL uses for its `BOOLEAN` type.

In [246]:
df['WATER_SYSTEM.IS_GRANT_ELIGIBLE_IND'] = df['WATER_SYSTEM.IS_GRANT_ELIGIBLE_IND'].map({'N': 0, 'Y': 1})
df['WATER_SYSTEM.IS_WHOLESALER_IND'] = df['WATER_SYSTEM.IS_WHOLESALER_IND'].map({'N': 0, 'Y': 1})
df['WATER_SYSTEM.IS_SCHOOL_OR_DAYCARE_IND'] = df['WATER_SYSTEM.IS_SCHOOL_OR_DAYCARE_IND'].map({'N': 0, 'Y': 1})
df['WATER_SYSTEM.NPM_CANDIDATE'] = df['WATER_SYSTEM.NPM_CANDIDATE'].map({'N': 0, 'Y': 1})



Check sanitization output:

In [247]:
df['WATER_SYSTEM.IS_GRANT_ELIGIBLE_IND'].dropna().unique()

array([0, 1])

In [248]:
df['WATER_SYSTEM.IS_WHOLESALER_IND'].dropna().unique()

array([0, 1])

In [249]:
df['WATER_SYSTEM.IS_SCHOOL_OR_DAYCARE_IND'].dropna().unique()

array([0, 1])

In [250]:
df['WATER_SYSTEM.NPM_CANDIDATE'].dropna().unique()

array([0, 1])
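The inspect, map, and re-check steps above repeat for each flag column. One possible way to fold them into a single helper is sketched here; the name `yes_no_to_int` and the toy series are hypothetical. The point of the guard is that `Series.map` with a dictionary silently turns any value missing from the dictionary into `NaN`, so an unexpected code would otherwise go unnoticed.

```python
import pandas as pd

YES_NO_TO_INT = {"N": 0, "Y": 1}

def yes_no_to_int(series):
    """Map 'Y'/'N' flags to 1/0, refusing any other non-null value."""
    unexpected = set(series.dropna().unique()) - set(YES_NO_TO_INT)
    if unexpected:
        raise ValueError(f"unexpected flag values: {sorted(unexpected)}")
    # Missing values stay missing (NaN) after the mapping.
    return series.map(YES_NO_TO_INT)

# Toy series standing in for a column such as WATER_SYSTEM.NPM_CANDIDATE.
flags = pd.Series(["Y", "N", None, "Y"])
converted = yes_no_to_int(flags)
print(converted.tolist())
```

Applied to the real dataframe, this would be called once per flag column, for example in a loop over the four column names.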

## Save sanitized data

Save the sanitized dataset in a new csv file:

In [251]:
# Data will be saved in the `sanitized` folder.
sanitized_csv_file = join(PATH_TO_DATA_FOLDER, 'sanitized', 'WATER_SYSTEM.csv')

In [252]:
df.to_csv(sanitized_csv_file, sep=",", encoding='utf-8')
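As far as I know, `to_csv` does not create missing folders, so the `sanitized` folder has to exist before the call above. A minimal sketch of creating it on demand follows; the helper name `ensure_parent_dir` is hypothetical, and a temporary folder stands in for `PATH_TO_DATA_FOLDER`.

```python
import os
import tempfile

def ensure_parent_dir(file_path):
    """Create the directory that will contain file_path if it is missing."""
    parent = os.path.dirname(file_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    return file_path

# Demonstration in a temporary folder standing in for PATH_TO_DATA_FOLDER.
data_folder = tempfile.mkdtemp()
target = ensure_parent_dir(os.path.join(data_folder, "sanitized", "WATER_SYSTEM.csv"))
print(os.path.isdir(os.path.dirname(target)))  # True
```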



## `WATER_SYSTEM_FACILITY` table

In [282]:
# Positions of the date columns to parse
FACILITY_DEACTIVATION_DATE_idx = 7
PWS_DEACTIVATION_DATE_idx = 18

In [283]:
wsf = pd.read_csv(join(PATH_TO_DATA_FOLDER, "DG_WATER_SYSTEM_FACILITY.csv"),
                 sep=",",
                 header=0,
                 index_col=0,
                 encoding="utf-8",
                 low_memory=False,
                 parse_dates=[FACILITY_DEACTIVATION_DATE_idx, PWS_DEACTIVATION_DATE_idx],
                 )



In [284]:
# Process binary fields
wsf['WATER_SYSTEM_FACILITY.IS_SOURCE_IND'] = wsf['WATER_SYSTEM_FACILITY.IS_SOURCE_IND'].map({'N': 0, 'Y': 1})

In [285]:
def longest_item(arr):
    """Return the first item of `arr` that has the greatest length."""
    return max(arr, key=len)

In [286]:
longest_item(wsf['WATER_SYSTEM_FACILITY.FACILITY_ID'].unique())

'239126095064'

In [287]:
len(wsf['WATER_SYSTEM_FACILITY.FACILITY_ID'])

1408854

In [272]:
wsf.shape

(1408854, 20)

**Important**: There are only `203 078` unique `FACILITY_ID` values in the dataset, yet there are `1 408 854` records. `FACILITY_ID` alone therefore cannot serve as a primary key.

Thus we prepend an `ID` column to act as the primary key.

In [289]:
# One random UUID per record
ids = [str(uuid.uuid4()) for _ in range(wsf.shape[0])]

In [291]:
# Prepend the ID series to the dataframe
wsf.insert(0, 'ID', pd.Series(ids, index=wsf.index))
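The reasoning behind the surrogate key can be checked on a toy example. The sketch below uses made-up facility IDs (not taken from the dataset) to show how repeated values rule a column out as a primary key, and confirms that the generated UUID strings are unique and always 36 characters long.

```python
import uuid
from collections import Counter

# Made-up facility IDs; the repeated value stands in for a FACILITY_ID
# shared by several records.
facility_ids = ["54061", "75508", "63273", "54061"]

# A column can only be a primary key if no value occurs more than once.
repeated = {fid: n for fid, n in Counter(facility_ids).items() if n > 1}
print(repeated)  # {'54061': 2}

# One random (version 4) UUID per record, in its canonical 36-character form.
toy_ids = [str(uuid.uuid4()) for _ in facility_ids]
print(len(set(toy_ids)) == len(toy_ids))     # True
print(all(len(i) == 36 for i in toy_ids))    # True
```

Since `uuid.uuid4()` is random, re-running the sanitization assigns different `ID` values to the same records, so the sanitized file should be generated once if other tables are ever going to reference these keys.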

In [292]:
wsf.head()

| WATER_SYSTEM_FACILITY.PWSID | ID | WATER_SYSTEM_FACILITY.PRIMACY_AGENCY_CODE | WATER_SYSTEM_FACILITY.EPA_REGION | WATER_SYSTEM_FACILITY.FACILITY_ID | WATER_SYSTEM_FACILITY.FACILITY_NAME | WATER_SYSTEM_FACILITY.STATE_FACILITY_ID | WATER_SYSTEM_FACILITY.FACILITY_ACTIVITY_CODE | WATER_SYSTEM_FACILITY.FACILITY_DEACTIVATION_DATE | WATER_SYSTEM_FACILITY.FACILITY_TYPE_CODE | WATER_SYSTEM_FACILITY.SUBMISSION_STATUS_CODE | ... | WATER_SYSTEM_FACILITY.WATER_TYPE_CODE | WATER_SYSTEM_FACILITY.AVAILABILITY_CODE | WATER_SYSTEM_FACILITY.SELLER_TREATMENT_CODE | WATER_SYSTEM_FACILITY.SELLER_PWSID | WATER_SYSTEM_FACILITY.SELLER_PWS_NAME | WATER_SYSTEM_FACILITY.FILTRATION_STATUS_CODE | WATER_SYSTEM_FACILITY.PWS_ACTIVITY_CODE | WATER_SYSTEM_FACILITY.PWS_DEACTIVATION_DATE | WATER_SYSTEM_FACILITY.PWS_TYPE_CODE | WATER_SYSTEM_FACILITY.IS_SOURCE_TREATED_IND |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| NY0900222 | 9c2ce89f-7c29-489a-8c23-0485bcf908aa | NY | 2 | 54061 | TRANS. MAIN 6 X 500' | 000000081224 | A | NaT | TM | Y | ... | | | | | | | A | NaT | CWS | |
| NY0900222 | e023b201-683b-4ae5-9b2c-7b92a55b0dae | NY | 2 | 75508 | XXIDSE-DISTRIBUTION STAGE 2 | XXIDSE | I | 2012-05-23 | DS | Y | ... | | | | | | | A | NaT | CWS | |
| NY1000240 | 959cf25d-2854-4c85-b14d-97ef385a71b5 | NY | 2 | 63273 | DISTRIBUTION SYSTEM | DS-01 | A | NaT | DS | Y | ... | | | | | | | A | NaT | CWS | |
| NY1000240 | e2a3b971-9427-4d3f-b70c-fd01d02161ef | NY | 2 | 73477 | 485,000 GALLON STORAGE TANK | ST-01 | A | NaT | ST | Y | ... | | | | | | | A | NaT | CWS | |
| NY0611916 | 54965745-440f-4a8f-90b2-984d5283516f | NY | 2 | 67310 | DISTRIBUTION SYSTEM (HV2) | DS-001 | A | NaT | DS | Y | ... | | | | | | | A | NaT | TNCWS | |


In [293]:
# Data will be saved in the `sanitized` folder.
sanitized_wsf_file = join(PATH_TO_DATA_FOLDER, 'sanitized', 'WATER_SYSTEM_FACILITY.csv')
wsf.to_csv(sanitized_wsf_file, sep=",", encoding='utf-8', quotechar='"')

In [295]:
len('e5862a56-14ea-4dda-8fed-bf1efb0c9bbd')

36