In [1]:
import pandas as pd
from config import PATH_URLS, N_URLS_TO_PROCESS

# change this number to set the number of URLs to process.
# Running the whole dataset can be somewhat slow (~10 minutes) and memory-intensive (~3 GB)
# since the processing, especially the parsing, is not really optimized for performance
N_URLS_TO_PROCESS = -1

URLS = pd.read_csv(PATH_URLS).head(N_URLS_TO_PROCESS)
URLS.head(10)

Unnamed: 0,pws_id,pws_url_id,url
0,CA0103040,3,https://sdwis.waterboards.ca.gov/PDWW/JSP/Wate...
1,CA0103041,4,https://sdwis.waterboards.ca.gov/PDWW/JSP/Wate...
2,CA0105002,15,https://sdwis.waterboards.ca.gov/PDWW/JSP/Wate...
3,CA0105003,16,https://sdwis.waterboards.ca.gov/PDWW/JSP/Wate...
4,CA0105008,18,https://sdwis.waterboards.ca.gov/PDWW/JSP/Wate...
5,CA0105009,19,https://sdwis.waterboards.ca.gov/PDWW/JSP/Wate...
6,CA0105010,20,https://sdwis.waterboards.ca.gov/PDWW/JSP/Wate...
7,CA0105012,22,https://sdwis.waterboards.ca.gov/PDWW/JSP/Wate...
8,CA0105013,23,https://sdwis.waterboards.ca.gov/PDWW/JSP/Wate...
9,CA0105016,26,https://sdwis.waterboards.ca.gov/PDWW/JSP/Wate...


# Fetching the HTML pages

To get the data from the server, we need to issue a request for each WSD URL. The number is not enormous but at the same time non-negligible. since, in general, we want to throttle (limit the rate) our requests since we don't know if this will trigger some security response from the server, e.g. refusing connections. 

In addition, it's useful (especially when scraping) to be able to cache the results, so that we fetch the data once and can process it without limitations afterwards.

In [2]:
from config import PATH_REQUESTS_CACHE_DB

PATH_REQUESTS_CACHE_DB

PosixPath('/data/datasets/parse-pws')

In [3]:
import requests_cache
import time

def _make_throttle_hook(timeout=1):
    """
    Returns a response hook function which sleeps for `timeout` seconds if
    response is not coming from the cache.

    From https://requests-cache.readthedocs.io/en/latest/user_guide.html#usage
    """
    def hook(response, *args, **kwargs):
        if not getattr(response, 'from_cache', False):
            # TODO use logging instead of print
            print(f'{response} not found in cache. Timeout for {timeout:.2f} s.')
            time.sleep(timeout)
        return response
    return hook


def get_session(rate_max=.5, timeout=None, path_cache_db=''):
    """
    `rate_max` is in requests per second; `timeout` is in seconds.
    """
    timeout = timeout or 1 / rate_max
    
    # we create the cache backend object explicitly,
    # so that it's not overwritten every time we call this function
    # (which happens when re-running the notebook)
    from requests_cache.backends.sqlite import DbCache
    
    cache_db = DbCache(location=str(path_cache_db))

    s = requests_cache.CachedSession(backend=cache_db)
    s.hooks = {'response': _make_throttle_hook(timeout)}
    return s

In [4]:
CACHED_SESSION = get_session(timeout=.2, path_cache_db=PATH_REQUESTS_CACHE_DB)
CACHED_SESSION

<CachedSession(DbCache('cache', ...), expire_after=None, allowable_methods=('GET',))>

In [5]:
from tqdm._tqdm_notebook import tqdm_notebook

In [6]:
def fetch_urls(_df_urls, session):
    df = _df_urls.reset_index()

    with tqdm_notebook(total=len(df)) as progress_bar:
        for row in df.itertuples():
            progress_bar.set_postfix({'fetching PWSID': row.pws_id})
            progress_bar.update(1)
            
            resp = session.get(row.url)

In [7]:
fetch_urls(URLS, CACHED_SESSION)

HBox(children=(IntProgress(value=0, max=8327), HTML(value='')))




# Utility classes for parsing

First, some generic base classes:

In [8]:
from requests_html import HTMLSession, HTML

In [9]:
class DocumentParser:
    """Helper class to process a single page/HTML document"""
        
    def __init__(self, html=None, **kwargs):
        self.html = html
        
    def to_record(self):
        return {}
    
    @staticmethod
    def get_fallback_html():
        return HTML(html='<body></body>')
    
    @classmethod
    def from_url(cls, url, session=None, **kwargs):
        session = session or HTMLSession()
        
        resp = session.get(url)
        if resp:
            if isinstance(session, HTMLSession):
                html = resp.html
            else:
                html = HTML(url=url, html=resp.text)
        else:
            html = cls.get_fallback_html()

        return cls(html=html, url=url, **kwargs)
    
    @classmethod
    def from_path(cls, path, **kwargs):
        html = HTML(html=Path(path).read_text())
        return cls(html=html, **kwargs)
    
    def to_path(self, path):
        path = Path(path)
        path.save_text(self.html.html)
        return path

In [10]:
class ComponentParser:
    """Helper class for parsing a (generalized) single component"""
    fallback = dict

    def __init__(self, element=None, html_raw=None, process=None, **kwargs):

        self.element = element
        self.html_raw = html_raw or self.element.html
        
        self.data = self.fallback()
        self.process = process or (lambda d: d)
        
        try:
            data = self.parse(**kwargs)
            data = self.process(data)
            self.data = data
        except Exception:
            pass

    def parse(self, **kwargs):
        return self.data
        
    def to_record(self):
        return dict(self.data)

Let's extend these with more specific subclasses for parsing WSD pages and the tables that appear there:

In [11]:
def extract_keyval_table(element, key_pattern=':'):
    keys = []
    values = []
    
    for table_row in element.find('td'):
        text = table_row.text
        if key_pattern in text:
            keys.append(text.replace(key_pattern, '').strip())
        else:
            values.append(text.strip())
    
    return dict(zip(keys, values))

class WSDetailsTable(ComponentParser):
    """Parses the table at the top op the page, which require a special treatment since it's not a well-formed HTML table"""
    fallback = dict
    
    def parse(self):
        return extract_keyval_table(self.element)
    
    def to_record(self):
        return dict(self.data)

class WSTable(ComponentParser):
    fallback = pd.DataFrame
    
    def parse(self):
        return pd.read_html(self.html_raw)[0]
    
    def to_record(self):

        # return self.data.to_dict(orient='records')
        # the DataFrame.to_dict(orient='records') mangles column names
        # e.g. "Type  Code" (with two spaces) shows up as "_1"
        # the same happens using df.itertuples()
        # it might be a pandas bug
        def to_dict_records_alt(d):
            # transform nans to None for greater compatibility
            # https://stackoverflow.com/a/39279898/
            return [dict(row) for i, row in d.where(d.notnull(), None).iterrows()]
        
        return to_dict_records_alt(self.data)

In [12]:
class PWSDetailsPage(DocumentParser):
    
    def __init__(self, url='', pws_id=None, **kwargs):
        super().__init__(**kwargs)

        self.url = url

#         if self.url:
#             self.parse_url(self.url)
#         else:
        self.pws_id = pws_id
        
    def parse_url(self, url):
        params = get_params_from_url(url)
        
        self.pws_id = params.get('wsnumber', '')
        self.pws_url_id = params.get('tinwsys_is_number', '')
        
    @property
    def table_details(self):
        sel = 'table[summary="Water System  Details"]'
        elem = self.html.find(sel, first=True)
        return WSDetailsTable(elem)
    
    @property
    def table_water_sources(self):
        sel = 'table[summary="Details about Sources of Water"]'
        # there are two elements with identical attributes (elem.attrs):
        # this ("Sources of water") is the first, "Water Purchases" is the second
        elem = self.html.find(sel, first=True)

        def rename_cols(d):
            # get rid of extra space
            return d.rename(columns={'Type  Code': 'Type Code'})

        return WSTable(elem, process=rename_cols)
    
    @property
    def table_water_purchases(self):
        sel = 'table[summary="Details about Sources of Water"]'
        elem = self.html.find(sel)[1]
        
        return WSTable(elem)
    
    @property
    def table_ws_contacts(self):
        pass
    
    @property
    def table_service_areas(self):
        sel = 'table[summary="Summary of Service Area"]'
        return WSTable(self.html.find(sel, first=True))
    
    @property
    def table_service_connections(self):
        sel = 'table[summary="Summary of Service Connection"]'
        return WSTable(self.html.find(sel, first=True))
    
    @property
    def urls_other(self):
        def is_interesting(url):
            # exclude:
            return all([
                # internal framework links
                'jsp' not in url,
                # google maps (might be useful for the address)
                'maps.google' not in url,
                # EAR links
                'drinc.ca.gov/ear/' not in url
            ])

        return [url for url in self.html.links if is_interesting(url)]
    
    def to_record(self):
        d = {'pws_id': self.pws_id}

        d['water_system_details'] = self.table_details.to_record()

        d['water_sources'] = self.table_water_sources.to_record()        
        d['water_purchases'] = self.table_water_purchases.to_record()
        d['service_areas'] = self.table_service_areas.to_record()
        d['service_connections'] = self.table_service_connections.to_record()
        
        d['urls_other'] = self.urls_other
        
        return d

Then, some utility functions to regularize field/column names and solidify the final data tables:

In [13]:
import re

def to_snakecase(s):
    import re
    
    # convert one or more spaces to underscore
    s = re.sub('\s+', '_', s.strip())
    # strip all other non-alphanumeric chars
    return (re.sub('\W', '', s)
            .lower()
           )

def normalize_column_names(df):
    return df.rename(columns=to_snakecase)

def drop_columns_robust(df, cols):
    to_drop = df.columns & set(cols)
    
    return df.drop(columns=to_drop)

def to_category(df, cols):
    return df.astype({col: 'category' for col in cols})

And, finally, a container class to interact with the whole dataset (or, a specified portion of it)

In [14]:
class WSDDataset:
    
    @classmethod
    def from_df_urls(cls, df_urls, **kwargs):
        return cls([PWSDetailsPage.from_url(**row, **kwargs) for _, row in df_urls.iterrows()])
    
    def __init__(self, items=None, records=None):
        self._items = items or []
        self._records = records or self.to_record()
        
    def __repr__(self):
        return f'{type(self).__name__}(len={len(self)})'
        
    def __iter__(self):
        return iter(self._items)
    
    def __len__(self):
        return len(self._items or self._records)
    
    def to_record(self):
        return [item.to_record() for item in self]
    
    @property
    def records(self):
        return self._records
    
    def _get_table_from_record_dict(self, field_name):
        return pd.DataFrame([dict(pws_id=r['pws_id'], **r[field_name]) for r in self.records])

    def _get_table_from_record_list(self, field_name, as_index=None):
        df = (pd.DataFrame([dict(pws_id=r['pws_id'], entry_id=entry_id, **field_data)
                         for r in self.records
                         for entry_id, field_data in enumerate(r[field_name])])
          )

        if as_index:
            df = df.set_index(as_index)
        return df
    
    def get_table_details(self):
        nuisance_cols = ['PG&E PHILBROOK DAM']

        return (self._get_table_from_record_dict('water_system_details')
                .set_index('pws_id')
                .pipe(drop_columns_robust, nuisance_cols)
                .pipe(normalize_column_names)
                .pipe(to_category, ['federal_type', 'primary_source', 'state_type', 'status'])
                .assign(activity_date=lambda d: pd.to_datetime(d['activity_date']))
               )
    
    def get_table_service_connections(self):
        return (self._get_table_from_record_list('service_connections', as_index=['pws_id', 'entry_id'])
                .pipe(normalize_column_names)
                .pipe(to_category, ['meter_type', 'type'])
               )
    
    def get_table_service_areas(self):
        return (self._get_table_from_record_list('service_areas', as_index=['pws_id', 'entry_id'])
                .pipe(normalize_column_names)
                .pipe(to_category, ['code', 'name'])
               )
    
    def get_table_water_sources(self):
        return (self._get_table_from_record_list('water_sources', as_index=['pws_id', 'entry_id'])
                .pipe(normalize_column_names)
                .pipe(to_category, ['status', 'type_code'])
               )
    
    def get_table_water_purchases(self):
        return (self._get_table_from_record_list('water_purchases', as_index=['pws_id', 'entry_id'])
                .pipe(normalize_column_names)
                .pipe(to_category, ['buyer_facility_type', 'seller_facility_type'])
               )
    
    def to_csv(self, path):

        names_functions = [
            ('details', self.get_table_details),
            ('service-connections', self.get_table_service_connections),
            ('service-areas', self.get_table_service_areas),
            ('water-sources', self.get_table_water_sources),
            ('water-purchases', self.get_table_water_purchases),
        ]
        
        for name, f in names_functions:
            path_file = (path / name).with_suffix('.csv')
            df = f()
            df.to_csv(path_file)
            
    def to_json(self, path):
        import json

        with (path / 'records.json').open('w') as f:
            json.dump(d.records, f, indent=4)

In [15]:
import numpy as np

s = pd.Series([1, 2, 3, np.nan])
dict(s.where(s.notnull(), None))

{0: 1.0, 1: 2.0, 2: 3.0, 3: None}

In [16]:
d = WSDDataset.from_df_urls(URLS, session=CACHED_SESSION)
d

WSDDataset(len=8327)

In [17]:
(d.get_table_details()
)

Unnamed: 0_level_0,1111,activity_date,coachella_vwd_cove_community,coachella_vwd_id_no_11,coachella_vwd_id_no_8,don_pedro_rec_agblue_oaks,don_pedro_rec_agflemmeadows,federal_type,napa_county_schools_pope_valley,nvusd_carneros_school,nvusd_mt_george_school,primary_source,principal_county_served,shoreline_school_dist_bus_garage,state_type,status,water_system_name,water_system_no
pws_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
CA0103040,,1983-01-27,,,,,,C,,,,GW,ALAMEDA,,C,A,NORRIS CANYON PROPERTY OWNERS ASSN,CA0103040
CA0103041,,1983-01-27,,,,,,C,,,,SWP,ALAMEDA,,C,A,TRAILER HAVEN MOBILE HOME PARK,CA0103041
CA0105002,,1981-11-30,,,,,,NC,,,,GW,ALAMEDA,,NC,A,RIVERS END MARINA,CA0105002
CA0105003,,1981-11-30,,,,,,NTNC,,,,GW,ALAMEDA,,NTNC,A,CEMEX/ELIOT PLANT,CA0105003
CA0105008,,1981-11-30,,,,,,C,,,,GWP,ALAMEDA,,C,A,CASTLEWOOD DOMESTIC WATER SYSTEM,CA0105008
CA0105009,,2017-10-24,,,,,,NTNC,,,,GWP,ALAMEDA,,NTNC,A,MOUNTAIN HOUSE SCHOOL,CA0105009
CA0105010,,1984-01-21,,,,,,NC,,,,SW,ALAMEDA,,NC,A,EBRPD - DEL VALLE REGIONAL PARK,CA0105010
CA0105012,,1993-07-01,,,,,,NC,,,,GW,ALAMEDA,,NC,A,EBRPD - SUNOL REGIONAL WILDERNESS,CA0105012
CA0105013,,1993-07-01,,,,,,NC,,,,GW,ALAMEDA,,NC,A,EBRPD - REDWOOD SPRING REGIONAL PARK,CA0105013
CA0105016,,2013-08-16,,,,,,NC,,,,GW,ALAMEDA,,NC,A,MOUNTAIN HOUSE BAR,CA0105016


In [18]:
(d.get_table_water_sources()
)

Unnamed: 0_level_0,Unnamed: 1_level_0,name,status,type_code
pws_id,entry_id,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
CA0103040,0,CISTERN SPRING,A,SP
CA0103040,1,LOWER SPRING,A,SP
CA0103040,2,UPPER SPRING,A,SP
CA0103041,0,EBMUD CONNECTION,A,CC
CA0103041,1,WELL 01,A,WL
CA0105002,0,WELL 01,A,WL
CA0105003,0,WELL 2,A,WL
CA0105003,1,ELIOT WELL,I,WL
CA0105008,0,SFPUC PLEASANTON WELLS,A,CC
CA0105008,1,WELL 03 - INACTIVE,I,WL


In [19]:
(d.get_table_service_connections()
 .sort_values('count', ascending=False)
)

Unnamed: 0_level_0,Unnamed: 1_level_0,count,meter_size_measure,meter_type,type
pws_id,entry_id,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
CA1910067,3,609694,0,ME,RS
CA0110005,0,387846,0,ME,CB
CA3710020,3,225109,0,ME,RS
CA4310011,2,204733,0,ME,RS
CA3810011,0,170509,0,ME,CB
CA3410020,0,157312,0,UN,CB
CA3310009,0,146850,0,ME,CB
CA1010007,0,132981,0,ME,CB
CA3310001,0,104188,0,ME,CB
CA3010092,4,98901,0,ME,RS


In [20]:
d.get_table_service_areas()

Unnamed: 0_level_0,Unnamed: 1_level_0,code,name
pws_id,entry_id,Unnamed: 2_level_1,Unnamed: 3_level_1
CA0103040,0,R,RESIDENTIAL AREA
CA0103041,0,R,MOBILE HOME PARK
CA0105002,0,T,RECREATION AREA
CA0105003,0,NT,INDUSTRIAL/AGRICULTURAL
CA0105008,0,R,RESIDENTIAL AREA
CA0105009,0,NT,SCHOOL
CA0105010,0,T,RECREATION AREA
CA0105012,0,T,RECREATION AREA
CA0105013,0,T,RECREATION AREA
CA0105016,0,T,RESTAURANT


In [21]:
(d.get_table_water_purchases()
 .reset_index()
 .groupby('seller_water_system_no')
 .agg({'water_system_name': 'first', 'pws_id': 'count'})
 .rename(columns={'pws_id': 'buyer_count'})
 .sort_values('buyer_count', ascending=False)
)

Unnamed: 0_level_0,water_system_name,buyer_count
seller_water_system_no,Unnamed: 1_level_1,Unnamed: 2_level_1
CA1910087,METROPOLITAN WATER DIST. OF SO. CAL.,345
CA3710042,SAN DIEGO COUNTY WATER AUTHORITY,77
CA3810001,SAN FRANCISCO REGIONAL WATER SYSTEM,62
CA4310027,SANTA CLARA VALLEY WATER DISTRICT,31
CA1910045,ANTELOPE VALLEY-EAST KERN WATER AGENCY,29
CA3710020,"SAN DIEGO, CITY OF",27
CA1910199,CALIFORNIA DOMESTIC WATER COMPANY,25
CA3310009,EASTERN MUNICIPAL WD,22
CA5610050,CALLEGUAS MUNICIPAL WATER DIST,21
CA4310011,SAN JOSE WATER,20


In [22]:
from config import PATH_DATA

PATH_DATA

PosixPath('data-out')

In [23]:
d.to_csv(PATH_DATA)

In [24]:
d.to_json(PATH_DATA)