# St. Clair County Land Use and Parcel Data Pipeline

In [1]:
!pip3 install Selenium
!pip3 install pandas
!pip3 install lxml
!pip3 install html5lib
!pip3 install findspark
!pip install whylogs whylogs[viz] whylogs[spark]



In [2]:
import csv
import json
import math
import os
import pprint
import re
import requests
import time
import queue

import findspark
findspark.init()
import pandas as pd
import pyspark
import pyspark.sql.functions as F

In [3]:
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from datetime import timedelta
from io import StringIO
from queue import Queue
from threading import Lock, RLock, Thread

from pyspark.sql.session import SparkSession
from pyspark.sql.dataframe import DataFrame
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.select import Select
from whylogs.api.pyspark.experimental import collect_column_profile_views
from whylogs.api.pyspark.experimental import collect_dataset_profile_view
from whylogs.core.metrics.condition_count_metric import Condition
from whylogs.core.relations import Predicate
from whylogs.core.schema import DeclarativeSchema
from whylogs.core.resolvers import STANDARD_RESOLVER
from whylogs.core.specialized_resolvers import ConditionCountMetricSpec
from whylogs.core.constraints.factories import condition_meets
from whylogs.core.constraints import ConstraintsBuilder
from whylogs.core.constraints.factories import no_missing_values
from whylogs.core.constraints.factories import greater_than_number
from whylogs.viz import notebook_profile_viz

## Extraction

In [4]:
search_pg = "https://stclairil.devnetwedge.com/"
cwd = os.getcwd()

### Download St. Clair Co. Property Tax Inquiry Selected Townships Parcel Listing 

In [None]:
options = Options()
options.add_argument("--start-maximized")
options.add_argument("--headless=new")
prefs = {"download.default_directory": f"{cwd}"}
options.add_experimental_option("prefs", prefs)

In [None]:
driver = webdriver.Chrome(options)
driver.implicitly_wait(3)
driver.get(search_pg)

In [None]:
# Click into Advanced Search Tab
advance_search_tab = WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.XPATH, "//a[@href='#advanced-search']"))
)
advance_search_tab.click()

In [None]:
# Select Townships
township_select = Select(WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.ID, "advanced-search-townships"))
))
township_select.select_by_value("02")
township_select.select_by_value("11")
township_select.select_by_value("01")
township_select.select_by_value("06")

In [None]:
# Check All Years Box and Search
all_years_chkbx = WebDriverWait(driver, 10).until(
    EC.element_to_be_clickable((By.ID, "advanced-search-include-all-years"))
)
form = WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.XPATH, "//form"))
)
driver.execute_script(f"document.getElementById('advanced-search-include-all-years').click()")
all_years_chkbx.submit()

In [None]:
# Export Results to CSV and Download
export_btn = WebDriverWait(driver, 10).until(
    EC.element_to_be_clickable((By.XPATH, "//a[@href='/Search/ExportClientsListToCSV']"))
)
export_btn.click()

In [None]:
driver.quit()

### Scrape Parcel Information Tables

In [None]:
parcel_list_df = pd.read_csv("Exported_Search_Results.csv")

In [None]:
parcel_list_df.describe(include="all")

In [None]:
parcel_list_df.head()

In [None]:
# Format PropertyAccountNumber to be Solely Numeric + 'X'
def only_numeric(str):
    return "".join(re.findall(r"[\dX]", str))

parcel_list_df['Property Account Number'] = parcel_list_df['Property Account Number'].apply(only_numeric)
parcel_list_df.head()

In [None]:
with open("Exported_Search_Results.csv", newline="") as csvfile:
    reader = csv.reader(csvfile)
    parcel_list = list(reader)[2:]

url_pcs = [ (only_numeric(row[1]), row[0]) for row in parcel_list ]
display(len(url_pcs))
print(url_pcs[::1000])

In [None]:
def scrape_parcel_pg(listing_number, listing_year):
    parcel_url = f"{search_pg}parcel/view/{listing_number}/{listing_year}"
    parcel_pg = requests.get(parcel_url)
    parcel_pg = BeautifulSoup(parcel_pg.text, "html.parser")
    panel_divs = parcel_pg.find_all(class_="panel panel-info")
        
    tables_dict = {}
    
    for div in panel_divs:
        try:
            tbl_key = div.div.h3.text
            tbl = div.div.h3.parent.find_next_sibling().find("table").prettify()
            tables_dict[tbl_key] = pd.read_html(StringIO(tbl))[0]
        except:
            continue
    
    parcel_number = listing_number
    year = int(listing_year)

    try:
        # Property Information Table
        parcel_address = tables_dict['Property Information'][1][0].split("Site Address")[1].strip()
        sale_status = tables_dict["Property Information"][0][2].split("Sale Status")[1].strip()
        property_class = tables_dict["Property Information"][0][3].split("-")[0].split("Property Class")[1].strip()
        tax_status = tables_dict["Property Information"][2][3].split("Tax Status")[1].strip()
        net_taxable = tables_dict["Property Information"][0][4].split("Net Taxable Value")[1].strip()
        tax_rate = tables_dict["Property Information"][1][4].split("Tax Rate")[1].strip()
        total_tax = tables_dict["Property Information"][2][4].split("$")[1].strip()
        township = tables_dict["Property Information"][0][5].split("Township")[1].strip()
        acreage = tables_dict["Property Information"][1][5].split("Acres")[1].strip()
        
        # Assessments Table
        homesite_val = tables_dict["Assessments"].get("Homesite")[0]
        dwelling_val = tables_dict["Assessments"].get("Dwelling")[0]
        dept_rev_val = tables_dict["Assessments"].get("Total")[0]
    
        # Billing Table
        total_billed = tables_dict["Billing"].get("Totals")[4].strip("$")
        total_unpaid = tables_dict["Billing"].get("Totals")[6].strip("$")
    
        # Owner Information Table
        owner_name = tables_dict["Parcel Owner Information"].get("Name")[0]
        owner_address = tables_dict["Parcel Owner Information"].get("Address")[0]
    except Exception as err:
        time.sleep(1)
        return {
            "parcel": listing_number,
            "year": listing_year,
            "error": err
        }
    else:
        time.sleep(2)
        return {
            "parcel_number": parcel_number,
            "year": year,
            "parcel_address": parcel_address,
            "owner": owner_name,
            "owner_address": owner_address,
            "sale_status": sale_status,
            "property_class": property_class,
            "tax_status": tax_status,
            "net_taxable": net_taxable, 
            "tax_rate": tax_rate,
            "total_tax": total_tax,
            "township": township,
            "acreage": acreage,
            "homesite_val": homesite_val, 
            "dwelling_val": dwelling_val,
            "dept_rev_val": dept_rev_val,
            "total_billed": total_billed,
            "total_unpaid": total_unpaid
        }


In [None]:
def write_records():
    
    function_start = time.perf_counter()
    max_threads = 1000
    processed_ct = 0
    records_missed = []
    parcel_records = []
    success_headers = [
        "parcel_number", "year", "parcel_address", "owner", "owner_address", 
        "sale_status", "property_class", "tax_status", "net_taxable", 
        "tax_rate", "total_tax", "township", "acreage", "homesite_val", 
        "dwelling_val", "dept_rev_val", "total_billed", "total_unpaid"
    ]
    fail_headers = ["parcel", "year", "error"]
    record_q = Queue() 
    csv_rlock = RLock()
    flush_lock = Lock()

    
    def write_CSV(filename, headers, data, r_lock):
        with r_lock:
            if os.path.exists(filename):
                with open(filename, mode="a", newline="") as records_file:
                    records_writer = csv.writer(records_file, dialect="excel")
                    records_writer.writerows(data)
                    data.clear()
            else:
                with open(filename, mode="a", newline="") as records_file:
                    records_writer = csv.writer(records_file, dialect="excel")
                    records_writer.writerow(headers)
                    records_writer.writerows(data)
                    data.clear()
    
    def process_records_q(rec_queue, batch_start_t, r_lock):
        parcel_info = rec_queue.get()
        if len(parcel_info) == 18:
            parcel_records.append(list(parcel_info.values()))
            if len(parcel_records) == 500:
                write_CSV("parcel_records.csv", success_headers, parcel_records, r_lock)
                parcel_records.clear()
        if len(parcel_info) == 3:
            records_missed.append(list(parcel_info.values()))
            if len(records_missed) == 50:
                write_CSV("missed_records.csv", fail_headers, records_missed, r_lock)
                records_missed.clear()

    def flushing_writes(fxn_start_t, lock):
        write_CSV("parcel_records.csv", success_headers, parcel_records, lock)
        write_CSV("missed_records.csv", fail_headers, records_missed, lock)
        print(f"Writing parcel datums took: {timedelta(seconds=time.perf_counter()-fxn_start_t)} seconds.")
        records_df = pd.read_csv("parcel_records.csv")
        fail_df = pd.read_csv("missed_records.csv")
        display(records_df.describe(include="all"))
        display(fail_df.describe(include="all"))

    def q_flow():
        print("Running queue")
        while True:
            while record_q.empty() == False:
                process_records_q(record_q, batch_write_start, csv_rlock)
            else:
                time.sleep(3)
                continue

    
    q_thread = Thread(target=q_flow)

    url_list_length = len(url_pcs)
    decimals = [ i/10 for i in range(1, 11, 1) ]
    split_length = [ url_list_length*decimal for decimal in decimals ]

    with ThreadPoolExecutor(max_workers=max_threads) as p1:
        begin_at = 0
        for split in split_length:
            print("Proceeding...")
            futures = [ p1.submit(scrape_parcel_pg, row[0], row[1]) for row in url_pcs[begin_at:math.floor(split)] ]
            if q_thread.is_alive() == False:
                q_thread.start()
            elif q_thread.is_alive() == True:
                pass
            batch_write_start = time.perf_counter()
            for future in as_completed(futures):
                record_q.put(future.result())
                processed_ct += 1
                if processed_ct % 500 == 0:
                    print(f"Have processed {processed_ct} records in {timedelta(seconds=time.perf_counter()-batch_write_start)} seconds.")
                    batch_write_start = time.perf_counter()
            flushing_writes(function_start, flush_lock)
            begin_at = math.floor(split)
            time.sleep(30)
            print(f"Proceeding to next batch...setting at index {begin_at}..")
    print("Finished.")
    exit()
    
                


In [None]:
write_records()

In [5]:
records_df = pd.read_csv("parcel_records.csv")
display(records_df.dtypes)
display(records_df.sample(7))
display(records_df.describe(include="all"))


  records_df = pd.read_csv("parcel_records.csv")


parcel_number       int64
year                int64
parcel_address     object
owner              object
owner_address      object
sale_status        object
property_class      int64
tax_status         object
net_taxable        object
tax_rate          float64
total_tax          object
township           object
acreage           float64
homesite_val        int64
dwelling_val        int64
dept_rev_val        int64
total_billed       object
total_unpaid       object
dtype: object

Unnamed: 0,parcel_number,year,parcel_address,owner,owner_address,sale_status,property_class,tax_status,net_taxable,tax_rate,total_tax,township,acreage,homesite_val,dwelling_val,dept_rev_val,total_billed,total_unpaid
7717,2040309002,2022,"2884 N 45TH ST EAST SAINT LOUIS, IL 62201",MIGUEL LUGO-TRUJILLO,"2884 N 45TH ST E ST LOUIS, IL, 62201",,40,Taxable,20119,8.5159,1713.32,CANTEEN,0.14,1926,18193,20119,1713.32,0.0
59152,8220411022,2022,"614 E A ST BELLEVILLE, IL 62220",PAMELA & FIEBIG BRIAN ROUT,"614 E A ST BELLEVILLE, IL, 62220-3915",,40,Taxable,61538,9.5711,5889.86,BELLEVILLE,0.41,7305,65233,72538,5889.86,0.0
27903,2210200008,2022,"995 KINGSHIGHWAY EAST SAINT LOUIS, IL 62203",DORIS M NORTON,"2961 MULLIGAN LN BELLEVILLE, IL, 62220",,50,Taxable,712,17.7214,126.18,CANTEEN,0.18,712,0,712,126.18,0.0
42852,4300206022,2017,"OLD RR TRACK OFALLON, IL 62269",CITY OF OFALLON,"CITY CLERK 255 S LINCOLN AVE O FALLON, IL, 622...",,90,Exempt,0,0.0,0.0,OFALLON,10.31,4396,0,4396,0.0,0.0
60621,8250302012,2022,"4 PREMIER DR BELLEVILLE, IL 62221",4 PREMIER DR LLC,"4 PREMIER DR BELLEVILLE, IL, 62221",,60,Taxable,285213,10.9635,31269.34,BELLEVILLE,2.77,32248,252965,285213,31269.34,0.0
56209,8210401022,2022,"401 N 6TH ST BELLEVILLE, IL 62220",WILLIAM E & JEANETTE M HOERNIS,"405 N 6TH ST BELLEVILLE, IL, 62220-1133",,40,Taxable,19688,9.5711,1884.36,BELLEVILLE,0.17,3007,16681,19688,1884.36,0.0
1132,1130316055,2016,"822 BRADY AVE EAST SAINT LOUIS, IL 62201",MARY MONTGOMERY,"121 ST THOMAS LN E ST LOUIS, IL, 62206",,40,Taxable,0,0.0,0.0,EAST ST LOUIS,0.0,1527,1582,3109,0.0,0.0


Unnamed: 0,parcel_number,year,parcel_address,owner,owner_address,sale_status,property_class,tax_status,net_taxable,tax_rate,total_tax,township,acreage,homesite_val,dwelling_val,dept_rev_val,total_billed,total_unpaid
count,67349.0,67349.0,67348,67349,67349,5255,67349.0,67349,67349.0,67349.0,67349.0,67349,67349.0,67349.0,67349.0,67349.0,67349.0,67349.0
unique,,,52034,36468,36010,8,,4,29230.0,,34750.0,4,,,,,37909.0,30.0
top,,,"N 64TH ST EAST SAINT LOUIS, IL 62204",ST CLAIR COUNTY TRUSTEE,"10 PUBLIC SQ BELLEVILLE, IL, 62220",TRUSTSUB,,Taxable,0.0,,0.0,EAST ST LOUIS,,,,,0.0,0.0
freq,,,94,2435,3385,1984,,63169,19830.0,,19830.0,24722,,,,,19856.0,65508.0
mean,4275571000.0,2021.056096,,,,,85.111865,,,9.650228,,,0.925005,5601.121,22116.63,27824.17,,
std,2748777000.0,3.311405,,,,,488.895593,,,4.911541,,,8.628885,33666.0,72267.47,90567.39,,
min,1110200000.0,2004.0,,,,,0.0,,,0.0,,,0.0,0.0,0.0,0.0,,
25%,2150122000.0,2022.0,,,,,30.0,,,7.7937,,,0.08,467.0,0.0,619.0,,
50%,2270215000.0,2022.0,,,,,40.0,,,9.5711,,,0.16,1710.0,6745.0,8991.0,,
75%,8090303000.0,2022.0,,,,,40.0,,,13.8346,,,0.26,5531.0,28996.0,34604.0,,


## Data Cleaning with PySpark and whylogs

In [8]:
spark = SparkSession.builder \
    .appName("stcc_ppln") \
    .config("spark.sql.shuffle.partitions", "10") \
    .config("spark.sql.execution.arrow.pyspark.enabled","true") \
    .config("spark.sql.caseSensitive", True) \
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org.apache.spark.util.ShutdownHookManager").setLevel(logger.Level.OFF)
logger.LogManager.getLogger("org.apache.spark.SparkEnv").setLevel(logger.Level.ERROR)

### Data Profiling