# Project 2 - Lab 5 - Filter and aggregate the water quality data

Recall that one of the files (starts with `mces`) contains water quality measurements for lakes in the Twin Cities.  In this lab, we will narrow down the list of lakes for which we have at least one of each measurement type (phosphorus and secchi depth) for each year between 2004 and 2015.

**Important note.** Recall that we fixed an issue with the water quality data in our terminal exploration of the data. Be sure to use the corrected version of the water quality data located in the `data/MinneMUDAC_raw_files_fixed/mces_lakes_1999_2014_v2.txt` file.

## Problem 1 - Inspect the data

We will be focusing on two of water quality measurements: phosphorus and secchi depth.  Before we start trimming the data set, we should explore these metrics.

1. Each of the measures has a `QUALIFIER` column.  Group and aggregate by each of these columns and note any problematic values.  **Hint.** This search should indicate that some of the phosphorus measurements should be dropped.  Make sure you include this action as part of your primary query.
2. Each measure also includes a `Units` column.  Check that all measurement are in the same units, and convert as needed.

In [1]:
# Your code here
from glob import glob
import polars as pl

glob('./data/raw_data/*')

['./data/raw_data\\2002_metro_tax_parcels.txt',
 './data/raw_data\\2003_metro_tax_parcels.txt',
 './data/raw_data\\2004_metro_tax_parcels.txt',
 './data/raw_data\\2005_metro_tax_parcels.txt',
 './data/raw_data\\2006_metro_tax_parcels.txt',
 './data/raw_data\\2007_metro_tax_parcels.txt',
 './data/raw_data\\2008_metro_tax_parcels.txt',
 './data/raw_data\\2009_metro_tax_parcels.txt',
 './data/raw_data\\2010_metro_tax_parcels.txt',
 './data/raw_data\\2011_metro_tax_parcels.txt',
 './data/raw_data\\2012_metro_tax_parcels.txt',
 './data/raw_data\\2013_metro_tax_parcels.txt',
 './data/raw_data\\2014_metro_tax_parcels.txt',
 './data/raw_data\\2015_metro_tax_parcels.txt',
 './data/raw_data\\mces_lakes_1999_2014.txt',
 './data/raw_data\\Parcel_Lake_Monitoring_Site_Xref.txt']

In [20]:
(water_quality := pl.read_csv(
    './data/raw_data/mces_lakes_1999_2014.txt',
    separator='\t',
    infer_schema_length=10000,
    schema_overrides={'latitude': pl.String, 'longitude': pl.String},
    columns= wq_cols_keep
))

DNR_ID_Site_Number,END_DATE,Secchi_Depth_RESULT_SIGN,Secchi_Depth_RESULT,Secchi_Depth_QUALIFIER,Secchi_Depth_Units,Total_Phosphorus_RESULT_SIGN,Total_Phosphorus_RESULT,Total_Phosphorus_QUALIFIER,Total_Phosphorus_Units,longitude,latitude
str,str,str,f64,str,str,str,f64,str,str,str,str
"""82010200-01""","""2006-04-16""",,1.0,"""Approved""","""m""",,0.156,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642"""
"""82010200-01""","""2006-09-30""",,,,"""m""",,,,"""mg/L""","""-92.97171054""","""45.01655642"""
"""82010200-01""","""2006-05-02""",,0.66,"""Approved""","""m""",,0.107,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642"""
"""82010200-01""","""2006-05-16""",,0.66,"""Approved""","""m""",,0.141,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642"""
"""82010200-01""","""2006-05-30""",,0.5,"""Approved""","""m""",,0.029,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642"""
…,…,…,…,…,…,…,…,…,…,…,…
"""10004100-01""","""2002-09-16""",,,,"""m""",,0.224,"""Approved""","""mg/L""","""-93.66768906""","""44.88381717"""
"""10004100-01""","""2002-10-01""",,2.7,"""Approved""","""m""","""~""",0.026,"""Approved""","""mg/L""","""-93.66768906""","""44.88381717"""
"""10004100-01""","""2002-10-01""",,,,"""m""","""~""",0.015,"""Approved""","""mg/L""","""-93.66768906""","""44.88381717"""
"""10004100-01""","""2002-10-14""",,3.0,"""Approved""","""m""","""~""",0.011,"""Approved""","""mg/L""","""-93.66768906""","""44.88381717"""


In [6]:
water_quality.columns

['PROJECT_ID',
 'DATA_SET_TITLE',
 'LAKE_NAME',
 'CITY',
 'COUNTY',
 'DNR_ID_Site_Number',
 'MAJOR_WATERSHED',
 'WATER_PLANNING_AUTHORITY',
 'LAKE_SITE_NUMBER',
 'START_DATE',
 'START_HOURMIN24',
 'END_DATE',
 'END_HOURMIN24',
 'SAMPLE_DEPTH_IN_METERS',
 'Seasonal_Lake_Grade_RESULT',
 'Seasonal_Lake_Grade_QUALIFIER',
 'Seasonal_Lake_Grade_Units',
 'Physical_Condition_RESULT',
 'Physical_Condition_QUALIFIER',
 'Physical_Condition_Units',
 'Recreational_Suitability_RESULT',
 'Recreational_Suitability_QUALIFIER',
 'Recreational_Suitability_Units',
 'Secchi_Depth_RESULT_SIGN',
 'Secchi_Depth_RESULT',
 'Secchi_Depth_QUALIFIER',
 'Secchi_Depth_Units',
 'Total_Phosphorus_RESULT_SIGN',
 'Total_Phosphorus_RESULT',
 'Total_Phosphorus_QUALIFIER',
 'Total_Phosphorus_Units',
 'longitude',
 'latitude']

In [32]:
(wq_cols_keep := [
    'DNR_ID_Site_Number',
    'END_DATE',
    'LAKE_NAME',
    'Secchi_Depth_RESULT_SIGN',
    'Secchi_Depth_RESULT',
    'Secchi_Depth_QUALIFIER',
    'Secchi_Depth_Units',
    'Total_Phosphorus_RESULT_SIGN',
    'Total_Phosphorus_RESULT',
    'Total_Phosphorus_QUALIFIER',
    'Total_Phosphorus_Units',
    'longitude',
    'latitude',
])

['DNR_ID_Site_Number',
 'END_DATE',
 'LAKE_NAME',
 'Secchi_Depth_RESULT_SIGN',
 'Secchi_Depth_RESULT',
 'Secchi_Depth_QUALIFIER',
 'Secchi_Depth_Units',
 'Total_Phosphorus_RESULT_SIGN',
 'Total_Phosphorus_RESULT',
 'Total_Phosphorus_QUALIFIER',
 'Total_Phosphorus_Units',
 'longitude',
 'latitude']

In [12]:
water_quality.group_by('Secchi_Depth_QUALIFIER').len()

Secchi_Depth_QUALIFIER,len
str,u32
,13153
"""Approved""",35104


In [13]:
water_quality.group_by('Total_Phosphorus_QUALIFIER').len()

Total_Phosphorus_QUALIFIER,len
str,u32
"""Approved""",43639
"""Suspect""",35
,4583


## Problem 2 - Find filter and aggregate.

#### Tasks

Remember that our goal is to narrow the data to one row per lake per year.  Build a query that groups and aggregates to find the yearly average values for both phosphorus and secchi depth.  To do this your will want to

1. Filter based on what you learned in **Problem 1.**
2. Make sure that the `END_DATE` has the correct type and extract the year.  
3. Filter to the correct range of years.
4. Now you should group and aggregate to compute the yearly means.  We want to keep both the `LAKE_NAME` and lake ID to allow us to join these data to the parcel features we will construct in the next lab.

In [33]:
(wq_filtered := pl.read_csv(
    './data/raw_data/mces_lakes_1999_2014.txt',
    separator='\t',
    infer_schema_length=10000,
    schema_overrides={'latitude': pl.String, 'longitude': pl.String},
    columns=wq_cols_keep
)
.filter(
    (pl.col('Secchi_Depth_QUALIFIER') == 'Approved') &
    (pl.col('Total_Phosphorus_QUALIFIER') == 'Approved')
)
.with_columns(
    pl.col('END_DATE').str.split('-').list.first().cast(pl.Int64).alias('Year')
)
.filter(
    pl.col('Year').is_between(2004, 2015)
)
)

LAKE_NAME,DNR_ID_Site_Number,END_DATE,Secchi_Depth_RESULT_SIGN,Secchi_Depth_RESULT,Secchi_Depth_QUALIFIER,Secchi_Depth_Units,Total_Phosphorus_RESULT_SIGN,Total_Phosphorus_RESULT,Total_Phosphorus_QUALIFIER,Total_Phosphorus_Units,longitude,latitude,Year
str,str,str,str,f64,str,str,str,f64,str,str,str,str,i64
"""Acorn Lake""","""82010200-01""","""2006-04-16""",,1.0,"""Approved""","""m""",,0.156,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642""",2006
"""Acorn Lake""","""82010200-01""","""2006-05-02""",,0.66,"""Approved""","""m""",,0.107,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642""",2006
"""Acorn Lake""","""82010200-01""","""2006-05-16""",,0.66,"""Approved""","""m""",,0.141,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642""",2006
"""Acorn Lake""","""82010200-01""","""2006-05-30""",,0.5,"""Approved""","""m""",,0.029,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642""",2006
"""Acorn Lake""","""82010200-01""","""2006-06-11""",,0.5,"""Approved""","""m""",,0.058,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642""",2006
…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""Woodpile Lake""","""82013200-01""","""2014-08-13""",,3.05,"""Approved""","""m""",,0.023,"""Approved""","""mg/L""","""-92.90332792""","""45.06868739""",2014
"""Woodpile Lake""","""82013200-01""","""2014-08-25""",,2.74,"""Approved""","""m""",,0.026,"""Approved""","""mg/L""","""-92.90332792""","""45.06868739""",2014
"""Woodpile Lake""","""82013200-01""","""2014-09-09""",,2.13,"""Approved""","""m""",,0.03,"""Approved""","""mg/L""","""-92.90332792""","""45.06868739""",2014
"""Woodpile Lake""","""82013200-01""","""2014-09-22""",,3.66,"""Approved""","""m""",,0.04,"""Approved""","""mg/L""","""-92.90332792""","""45.06868739""",2014


In [36]:
(wq_summaries := wq_filtered
.group_by(['DNR_ID_Site_Number',
           'LAKE_NAME', 
           'Year', 
           'latitude', 
           'longitude'])
.agg([
    pl.col('Secchi_Depth_RESULT').mean().alias('avg_secchi_depth'),
    pl.col('Total_Phosphorus_RESULT').mean().alias('avg_total_phosphorus')
])
)

DNR_ID_Site_Number,LAKE_NAME,Year,latitude,longitude,avg_secchi_depth,avg_total_phosphorus
str,str,i64,str,str,f64,f64
"""82004600-01""","""Square Lake""",2012,"""45.156747""","""-92.80441146""",5.441429,0.015786
"""82009700-01""","""La Lake""",2008,"""44.88725237""","""-92.9713984""",1.754545,0.118636
"""27010700-01""","""Parkers Lake""",2008,"""44.99224427""","""-93.47113278""",2.592308,0.024538
"""70009500-01""","""O'Dowd Lake""",2007,"""44.7448931""","""-93.5154039""",1.284615,0.049385
"""82004200-01""","""Lynch Lake""",2009,"""45.15170647""","""-92.88395835""",0.217571,0.460571
…,…,…,…,…,…,…
"""82011602-01""","""Armstrong Lake""",2009,"""44.96252306""","""-92.93917709""",1.114286,0.058929
"""82013700-01""","""Fish Lake""",2012,"""45.12242547""","""-92.97796745""",1.506333,0.108897
"""19003300-01""","""Earley Lake""",2006,"""44.73975323""","""-93.29337284""",1.533333,0.045333
"""82004400-01""","""West Boot Lake""",2013,"""45.1631603""","""-92.83822036""",4.0735,0.0301


## Problem 3 - Find lakes with complete yearly averages.

We want to make sure that we don't have any missing data in the target vectors, so we need to build a query that leads to a list of lake names and codes that fit the following criteria.

1. Only contains years after 2003.
2. Has a non-missing value for both means.
3. Contains both the lake name and the lake code.

You should save this list of lake IDs in a variable named `lakes_w_complete_info` in a file named `lake.py`.  Restart your kernel and confirm that you can import this data.

In [None]:
import polars.selectors as cs
from operator import mul

(wq_missing := wq_summaries
.with_columns(
    (pl.col('avg_secchi_depth').is_not_null() & 
     pl.col('avg_total_phosphorus').is_not_null()
    )
    .cast(pl.Int8)
    .alias('complete_data')
)
.pivot(
    values='complete_data',
    index=['DNR_ID_Site_Number', 'LAKE_NAME'],
    on='Year',
    aggregate_function='sum'
)
.with_columns(all_complete = pl.reduce(mul, cs.integer()))
.filter(pl.col('all_complete') == 1)
)
# I have 1 less lake than Iverson

DNR_ID_Site_Number,LAKE_NAME,2012,2008,2007,2009,2006,2014,2011,2013,2010,2004,2005,all_complete
str,str,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64
"""82009700-01""","""La Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""82015300-01""","""Sunset Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""82012200-01""","""Pine Tree Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""82013700-01""","""Fish Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""10000200-01""","""Riley Lake""",1,1,1,1,1,1,1,1,1,1,1,1
…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""82010300-01""","""Olson Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""27062700-01""","""Northwood Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""82007700-01""","""Goggins Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""82009002-01""","""Wilmes Lake""",1,1,1,1,1,1,1,1,1,1,1,1


In [47]:
(dnr_id_site_numbers := wq_missing
.get_column('DNR_ID_Site_Number')
.to_list()
)

['82009700-01',
 '82015300-01',
 '82012200-01',
 '82013700-01',
 '10000200-01',
 '10001900-01',
 '27007000-01',
 '19002400-01',
 '27003501-01',
 '82036800-01',
 '19002100-01',
 '82002000-01',
 '82003400-01',
 '82010100-01',
 '27071100-01',
 '27004201-01',
 '82009400-01',
 '82011602-01',
 '10009500-01',
 '19034800-01',
 '10001100-01',
 '19003300-01',
 '82010400-01',
 '13005300-01',
 '19003100-01',
 '10012100-01',
 '19002601-01',
 '82005400-01',
 '19002500-01',
 '19002900-01',
 '82033400-01',
 '10005200-01',
 '27005300-01',
 '82008700-01',
 '19002700-01',
 '82012300-01',
 '82008900-01',
 '19002200-01',
 '02000500-01',
 '19044600-01',
 '70002600-01',
 '82015900-01',
 '82009200-01',
 '82010300-01',
 '27062700-01',
 '82007700-01',
 '82009002-01',
 '19002300-01']

## Problem 4 - Create and write the final water quality table.

Finally, you should filter the table from **Problem 2.** to the lakes with complete information, then write this table to a parquet file named `water_quality_by_year.parquet`.

In [49]:
(wq_summaries_complete := wq_summaries
 .filter(pl.col('DNR_ID_Site_Number').is_in(dnr_id_site_numbers))
)

DNR_ID_Site_Number,LAKE_NAME,Year,latitude,longitude,avg_secchi_depth,avg_total_phosphorus
str,str,i64,str,str,f64,f64
"""82009700-01""","""La Lake""",2008,"""44.88725237""","""-92.9713984""",1.754545,0.118636
"""82015300-01""","""Sunset Lake""",2006,"""45.13527125""","""-92.94157024""",3.344444,0.022111
"""82012200-01""","""Pine Tree Lake""",2007,"""45.10231359""","""-92.95386928""",2.8,0.021556
"""82013700-01""","""Fish Lake""",2013,"""45.12242547""","""-92.97796745""",1.131923,0.061423
"""10000200-01""","""Riley Lake""",2006,"""44.83469027""","""-93.51695191""",2.29,0.0604
…,…,…,…,…,…,…
"""82033400-01""","""Kismet Lake""",2013,"""45.09536417""","""-92.8917326""",1.67,0.025
"""82012200-01""","""Pine Tree Lake""",2012,"""45.10231359""","""-92.95386928""",2.981818,0.020182
"""82011602-01""","""Armstrong Lake""",2009,"""44.96252306""","""-92.93917709""",1.114286,0.058929
"""82013700-01""","""Fish Lake""",2012,"""45.12242547""","""-92.97796745""",1.506333,0.108897


In [50]:
wq_summaries_complete.write_parquet('./data/water_quality_by_year.parquet')