In [1]:
import csv
import numpy as np
import pandas as pd
import re

from os import path

pd.options.display.max_columns = None

In [2]:
rootdir = '../data/water'

input_file = 'CA-result.csv'
measure_to_measure_group = 'measure-to-measuregroup.csv'

input_base = path.splitext(input_file)[0]

bad_values = 'badvalues.rejects.csv'
unmatched_measures = 'unmatched-measures.rejects.csv'
unmapped_units = 'unmapped-units.rejects.csv'
result_clean = 'clean.csv'

def makepath(suf):
    file = input_base + '-' + suf
    print(file)
    return path.join(rootdir, file)

In [3]:
data_columns = [
    'ActivityMediaName', 
    'ActivityMediaSubdivisionName',
    'ActivityStartDate', 
    'ActivityStartTime/Time',
    'ActivityStartTime/TimeZoneCode', 
    'MonitoringLocationIdentifier', 
    'CharacteristicName',
    'ResultMeasureValue', 
    'ResultMeasure/MeasureUnitCode',
    'ResultCommentText',
    'OrganizationIdentifier', 
    'OrganizationFormalName',
    'ActivityTypeCode',
    'ResultSampleFractionText',
    'MeasureQualifierCode',
    'ResultStatusIdentifier',
    'ResultAnalyticalMethod/MethodIdentifier',
    'ResultAnalyticalMethod/MethodName',
    'ResultLaboratoryCommentText'
]

activity_media = ['Water']

In [4]:
%%time
data = pd.read_csv(path.join(rootdir, input_file),
                  error_bad_lines=False,
                  usecols=data_columns)

len(data)



CPU times: user 1min 17s, sys: 17.8 s, total: 1min 35s
Wall time: 1min 50s


In [5]:
data["Value"] = pd.to_numeric(data.ResultMeasureValue, errors='coerce')

In [6]:
data = data[data.ActivityMediaName.isin(activity_media)]
len(data)

12392050

In [7]:
badValues = data.loc[pd.isnull(data.Value)]
badValues.to_csv(makepath(bad_values), index=False, quoting=csv.QUOTE_ALL)
print(len(badValues))
del badValues

CA-result-badvalues.rejects.csv
1581450


In [8]:
data = data[pd.notnull(data.Value)]
len(data)

10810600

In [9]:
measures = pd.read_csv(path.join(rootdir, measure_to_measure_group))
# measures

In [10]:
%%time
measureMap = pd.DataFrame(data.CharacteristicName.unique(), columns=['CharacteristicName'])

for _,row in measures.iterrows():
    pattern = re.compile(row.Pattern, re.IGNORECASE)
    matches = measureMap.CharacteristicName.str.contains(row.Pattern, case=False)
    measureMap.loc[matches, 'MeasureGroup'] = row.MeasureGroup
    measureMap.loc[matches, 'MCLG'] = row.MCLG
    measureMap.loc[matches, 'Unit'] = row.Unit
    
measureMap = measureMap[pd.notnull(measureMap.MeasureGroup)]
data = pd.merge(data, measureMap, on='CharacteristicName')

CPU times: user 5.14 s, sys: 4.24 s, total: 9.38 s
Wall time: 11.8 s


In [11]:
len(data[data.MeasureGroup == 'Nitrate'])

159619

In [12]:
nonMatchingMeasures = data.loc[pd.isnull(data.MeasureGroup), 'CharacteristicName'].unique()
pd.DataFrame(nonMatchingMeasures).to_csv(makepath(unmatched_measures), 
                                         index=False, quoting=csv.QUOTE_ALL)
print(len(nonMatchingMeasures))
del nonMatchingMeasures

CA-result-unmatched-measures.rejects.csv
0


In [13]:
data = data[pd.notnull(data.MeasureGroup)]
print(len(data))

742224


In [14]:
data['OriginalUnit'] = data['ResultMeasure/MeasureUnitCode'].str.strip()

In [15]:
multipliers = pd.Series({
    'mg/l': 1,
    'mg/l as N': 1,
    'mg/kg': 1,
    'mg/kg as N': 1,
    'ug/l': 1/1000,
    'ug/kg': 1/1000,
    'ng/l': 1/1000000,
    'pg/l': 1/1000000000,
    'ppm': 1,
    'ppb': 1/1000,
    'ueq/l': 62
}, name='Multiplier')
multipliers.index.name = 'OriginalUnit'

multipliers = multipliers.reset_index()

In [16]:
merged = pd.merge(data, multipliers, on='OriginalUnit', how='left')

In [17]:
unmappedUnits = merged.loc[pd.isnull(merged.Multiplier)]
unmappedUnits.to_csv(makepath(unmapped_units), index=False, quoting=csv.QUOTE_ALL)
print(len(unmappedUnits))
del unmappedUnits

CA-result-unmapped-units.rejects.csv
4045


In [18]:
merged = merged.loc[pd.notnull(merged.Multiplier)]
merged['OriginalValue'] = merged.Value
merged.Value = merged.Value * merged.Multiplier
len(merged)

738179

In [19]:
merged['ExceedsMclg'] = merged.Value > merged.MCLG

In [20]:
keepers = merged[[
        'ActivityMediaName',
        'ActivityMediaSubdivisionName',
        'ActivityStartDate',
        'ActivityStartTime/Time',
        'ActivityStartTime/TimeZoneCode',
        'MonitoringLocationIdentifier',
        'CharacteristicName',
        'MeasureGroup',
        'Unit',
        'Value',
        'MCLG',
        'ExceedsMclg',
        'ResultCommentText',
        'OrganizationIdentifier',
        'OrganizationFormalName',
        'ActivityTypeCode',
        'ResultSampleFractionText',
        'MeasureQualifierCode',
        'ResultStatusIdentifier',
        'ResultAnalyticalMethod/MethodIdentifier',
        'ResultAnalyticalMethod/MethodName',
        'ResultLaboratoryCommentText'

]]

keepers.columns = ['Medium', 'MediumSubdivision', 'StartDate', 'StartTime', 'TimeZone',
                  'LocationIdentifier', 'Pollutant', 'PollutantGroup', 'Unit', 'Value', 
                  'Mclg', 'ExceedsMclg', 'Comment', 'OrganizationId', 'OrganizationName',
                  'ActivityTypeCode', 'ResultSampleFraction', 'QualifierCode', 
                  'ResultStatus', 'AnalyticalMethodIdentifier', 'AnalyticalMethodName',
                  'LaboratoryComment']

In [21]:
keepers.to_csv(makepath(result_clean), index=False, quoting = csv.QUOTE_ALL)

CA-result-clean.csv
