In [None]:
import pandas as pd
import geopandas as gpd
from sqlalchemy import create_engine
import numpy as np
import dedupe
import re
from dotenv import load_dotenv
import os

In [None]:
# Load variables from a local .env file into the process environment, then
# read the PostgreSQL connection string (None when it is not defined).
load_dotenv()
PG_CONNECT = os.environ.get("PG_CONNECT")

In [None]:
# Residential land use codes from the MA Dept. of Revenue classification codebook:
# https://www.mass.gov/files/documents/2016/08/wr/classificationcodebook.pdf
# Target codes are 101*-109*, 031*, and 013*; towns often append suffixes
# (letters, zeroes, or nothing).
# NOTE(review): in a regex `*?` applies only to the single preceding
# character, and `str.contains` already accepts any suffix after a `^...`
# prefix. As written, these alternatives therefore match ANY code that
# begins with '10', '11', '01' or '03' -- broader than the ranges above
# (e.g. 11x codes, 010-019, 030-039, and 4-digit codes with a leading zero
# such as '0101'). Confirm against the data before tightening the pattern.
USE_CODES = '^1[0-1][1-9]*?|^013*?|^031*?'
# Boston filters on its own `lu` codes (see the Boston cell); each
# alternative must match the whole value.
BOS_CODES = '^R[1-4]$|^RC$|^RL$|^CD$|^A$'


def medparse(x):
    """Parse a YYYYMMDD date string (e.g. '20190115') into a datetime.

    The previous lambda used ``pd.datetime.strptime``; ``pd.datetime`` was
    deprecated in pandas 1.0 and removed in 2.0, so it raised AttributeError.
    """
    from datetime import datetime
    return datetime.strptime(x, '%Y%m%d')

def read_res(file_dict):
    """Read one attribute table per town and stack them into one DataFrame.

    Parameters
    ----------
    file_dict : dict
        Maps a town code (e.g. ``'med'``) to a path readable by
        ``gpd.read_file`` (callers in this notebook pass ``.dbf`` files).

    Returns
    -------
    pandas.DataFrame
        Rows from every file with the ``geometry`` column dropped and a
        ``town`` column holding the dict key, re-indexed 0..n-1.
        An empty ``file_dict`` yields an empty DataFrame.
    """
    # Collect the frames and concatenate once: DataFrame.append was removed
    # in pandas 2.0, and appending inside a loop is quadratic anyway.
    frames = []
    for town, path in file_dict.items():
        town_df = gpd.read_file(path).drop('geometry', axis='columns')
        town_df['town'] = town
        frames.append(town_df)
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

In [None]:
# Data from MassGIS Standardized Assessor's Parcels
# https://docs.digital.mass.gov/dataset/massgis-data-standardized-assessors-parcels
# Medford, Cambridge, and Somerville all last updated FY 2019

In [None]:
# Medford processing.
files = {'med': './data/assess/med_assess.dbf'}
med = read_res(files)
# Rename columns to lower-case.
med.columns = med.columns.str.lower()
# Filter for residential parcels (rows with no use code are dropped rather
# than raising); .copy() so the column writes below don't operate on a view.
med = med[med['use_code'].str.contains(USE_CODES, regex=True, na=False)].copy()
# Identify rows whose own_addr holds a co-owner name instead of a street
# address: it starts with "C/O" or a letter, and is not a PO-box / "ONE" /
# "BOX" / "ZERO" style address. Dots are escaped so "P.O." and "P. O." match
# literally (unescaped, '^P.O.' also matched words such as "PROSPECT").
co_pattern = '|'.join(['^C/O', '^[A-Za-z]'])
not_co_pattern = '|'.join(['^PO', r'^P\.O\.', r'^P\. O\.', '^P O ', '^ONE', '^BOX', '^ZERO'])
mask = (med['own_addr'].str.contains(pat=co_pattern, na=False)
        & ~med['own_addr'].str.contains(pat=not_co_pattern, na=False))
# Move those co-owner names into a new `co` column (None elsewhere) and
# blank own_addr for the same rows.
med['co'] = med['own_addr'][mask]
med.loc[~mask, 'co'] = None
med.loc[mask, 'own_addr'] = None
# Rename columns to the shared schema.
med = med.rename(columns = {
    'prop_id': 'gisid',
    'owner1': 'own',
    'site_addr': 'prop_addr',
    'total_val': 'ass_val',
    'location': 'unit',
    'ls_date': 'sale_d',
    'ls_price': 'sale_p'
})
# Parse YYYYMMDD sale dates. Plain column assignment (not .loc[:, col]) so the
# column gets datetime64 dtype; pandas >= 2.0 sets .loc[:, col] in place and
# would keep the old object dtype.
med['sale_d'] = pd.to_datetime(med['sale_d'], format='%Y%m%d')
med['prop_addr'] = med['prop_addr'].str.strip()
# Replace underscores with hyphens in parcel ids.
med['gisid'] = med['gisid'].str.replace('_', '-', regex=False)
# Owner address as "street, city, state zip". Missing parts render as
# 'None'/'nan' and are stripped below and in the combined clean-up step.
med['own_addr'] = [', '.join((str(a), str(b), str(c))) + ' ' + str(z)
                   for a, b, c, z in zip(med['own_addr'], med['own_city'], med['own_state'], med['own_zip'])]
# Remove concatenated Nones.
med = med.replace({r'None, ': ''}, regex=True)
med['year'] = 'FY2019'
# Treat a zero sale price as missing. Series.replace(0, None) forward-filled
# the previous row's price in older pandas versions instead of inserting None.
med['sale_p'] = med['sale_p'].mask(med['sale_p'] == 0)
# Keep only the shared output columns.
med = med[['gisid', 'town', 'prop_addr', 'unit', 'own', 'co', 'own_addr', 'ass_val', 'year', 'sale_d', 'sale_p']]

In [None]:
# Somerville processing.
# Source is the city assessor's FY14-FY19 export; only FY2019 rows are kept
# further down.
som = pd.read_csv('./data/assess/som_assess_FY14-FY19.csv',
                  dtype={'HOUSE NO': str}) 
som.columns = som.columns.str.lower()

# Filter for residential parcels using the `pcc` (property class) code.
som = som[som['pcc'].str.contains(USE_CODES, regex=True)]

# Property address = house number + street.
som.loc[:,'prop_addr'] = [' '.join((str(a), str(b))) for a, b in zip(som['house no'], som['street'])]

# Strip stray separators and the 'nan'/'None' text that str() produced for
# missing values, across every string column.
# NOTE(review): the bare r'nan' pattern also deletes "nan" inside longer
# words (e.g. a lowercase "Brennan"); consider anchoring it.
som = som.replace({r'^, ': '', r' ,': '', r', nan': '', r'nan': '', r'None, ': '', r', None': ''}, regex=True)
# Turn blank strings and NaN into None.
som = som.replace({' ': None, '': None, np.nan: None})

# Drop the first character of every owner ZIP. The assessor appears to have
# overzealously "corrected" ZIPs (presumably an extra leading character --
# TODO confirm against the raw file). No padding or ZIP+4 removal happens here.
som.loc[:,'owner zip'] = som['owner zip'].str[1:]
# Owner address as "address, city, state, zip" (unlike the other towns, the
# ZIP is comma-separated). Missing parts render as 'None' and are removed in
# the later combined clean-up step.
som.loc[:,'own_addr'] = [', '.join((str(a), str(b), str(c), str(d))) for a, b, c, d in zip(som['owner add'], som['owner city'], som['owner state'], som['owner zip'])]

# Parcel id is "map-block-lot".
som.loc[:,'gisid'] = ['-'.join((str(m), str(b), str(l))) for m, b, l in zip(som['map'], som['block'], som['lot'])]
som['town'] = 'som'
# Drop the export's `year` column; `fiscal_year` is renamed to `year` instead.
som = som.drop(['year'], axis=1)
som = som.rename(columns = {
    'commitment owner': 'own',
    'current co-owner': 'co',
    'parcel val': 'ass_val',
    'fiscal_year': 'year'
})

# The assessor seems to have garbled this column in the 2014-2019 data, but
# the 2019 rows appear to be incrementally numbered (¯\_(ツ)_/¯), so only
# fiscal years >= 2019 are kept.
som = som.loc[som['year'] >= 2019]
som['year'] = 'FY2019'
# Keep only the shared output columns (sale fields are merged in later).
som = som[['gisid', 'town', 'prop_addr', 'unit', 'own', 'co', 'own_addr', 'ass_val', 'year']]

In [None]:
# Somerville's 2019 assessor table doesn't include sale date (apparently by
# accident), so sale date and price are taken from the MassGIS table and
# joined on gisid in the next cell.
files = {'som': './data/assess/som_massgis.dbf'}
som_mg = read_res(files)
# Rename columns to lower-case.
som_mg.columns = som_mg.columns.str.lower()
# Rename columns to the shared schema.
som_mg = som_mg.rename(columns = {
    'prop_id': 'gisid',
    'ls_date': 'sale_d',
    'ls_price': 'sale_p'
})
# Parse YYYYMMDD dates; plain assignment so the column becomes datetime64
# (pandas >= 2.0 sets .loc[:, col] in place and keeps the old dtype).
som_mg['sale_d'] = pd.to_datetime(som_mg['sale_d'], format='%Y%m%d')
# Replace underscores with hyphens so ids match the assessor's "map-block-lot".
som_mg['gisid'] = som_mg['gisid'].str.replace('_', '-', regex=False)
# Treat a zero sale price as missing. Series.replace(0, None) forward-filled
# the previous row's price in older pandas versions.
som_mg['sale_p'] = som_mg['sale_p'].mask(som_mg['sale_p'] == 0)
# Keep only the columns needed for the join.
som_mg = som_mg[['gisid', 'sale_d', 'sale_p']]

In [None]:
# Attach MassGIS sale date/price to each Somerville assessor row by parcel id;
# rows without a MassGIS match keep missing sale fields.
som = pd.merge(som, som_mg[['gisid', 'sale_d', 'sale_p']], on='gisid', how='left')

In [None]:
bos = pd.read_csv('./data/assess/bos_assess.csv', dtype={'GIS_ID': str, 'MAIL_ZIPCODE': str, 'U_TOT_RMS': str})
bos.columns = bos.columns.str.lower()
bos = bos.rename(columns = {
    'gis_id': 'gisid',
    'owner': 'own',
    'mail_addressee': 'co',
    'unit_num': 'unit',
    'av_total': 'ass_val'
})
bos['town'] = 'bos'
# Filter by residential Boston land-use codes (rows with no LU code are
# dropped rather than raising); .copy() so later writes don't hit a view.
bos = bos[bos['lu'].str.contains(BOS_CODES, regex=True, na=False)].copy()
# Left-pad GIS ids with zeroes to 10 characters.
bos['gisid'] = bos['gisid'].str.strip().str.pad(width=10, side='left', fillchar='0')
# Left-pad mailing ZIPs with zeroes to 5 characters. Missing ZIPs become ''
# instead of the string 'nan' (astype(str) + pad used to produce '00nan').
bos['mail_zipcode'] = (bos['mail_zipcode'].str.strip()
                       .str.pad(width=5, side='left', fillchar='0')
                       .fillna(''))
# Add a comma between city and state by splitting on the last space
# ("CITY ST" -> "CITY, ST"). `n` is keyword-only in pandas >= 2.0, and
# .str.join leaves missing values as NaN instead of raising.
bos['mail cs'] = bos['mail cs'].str.rsplit(' ', n=1).str.join(', ')
# Property address as "number street suffix"; missing parts render as 'nan'
# and are removed in the combined clean-up step.
bos['prop_addr'] = [' '.join((str(a), str(b), str(c))).strip()
                    for a, b, c in zip(bos['st_num'], bos['st_name'], bos['st_name_suf'])]
# Owner address as "street, city, state zip".
bos['own_addr'] = [(', '.join((str(a), str(b))) + ' ' + str(z)).strip()
                   for a, b, z in zip(bos['mail_address'], bos['mail cs'], bos['mail_zipcode'])]
bos['year'] = 'FY2020'
# Keep only the shared output columns.
bos = bos[['gisid', 'town', 'prop_addr', 'unit', 'own', 'co', 'own_addr', 'ass_val', 'year']]
# Replace blank strings with None (necessary for dedupe).
bos = bos.replace({' ': None, '': None, r' #nan': None})
bos = bos.replace({r' #nan': ''}, regex=True)

In [None]:
cam = pd.read_csv('./data/assess/cam_assess.csv',
                  parse_dates=['SaleDate'],
                  dtype={'Owner_Zip': str,
                         'SalePrice': float,
                         'StateClassCode': str
                        })
# Rename all columns to lowercase.
cam.columns = cam.columns.str.lower()
# Filter for residential properties (rows with no class code are dropped
# rather than raising); .copy() so later writes don't hit a view.
cam = cam[cam['stateclasscode'].str.contains(USE_CODES, regex=True, na=False)].copy()
# Remove the ZIP+4 suffix ("02139-1234" -> "02139"); no padding is done.
# `n` is keyword-only in pandas >= 2.0.
cam['owner_zip'] = cam['owner_zip'].str.rsplit('-', n=1).str[0]
# Identify rows whose owner_address actually holds a "C/O ..." or
# "ATTN: ..." line rather than a street address.
mask = cam['owner_address'].str.contains('|'.join(['^C/O', '^ATTN:']), na=False)
# Copy that line into the co-owner column BEFORE blanking owner_address
# (blanking first lost the name and appended the text "None"). Missing
# co-owner names are skipped rather than rendered as 'nan'.
if mask.any():
    cam['owner_coownername'] = cam['owner_coownername'].astype(object)
    cam.loc[mask, 'owner_coownername'] = [
        ', '.join(str(part) for part in (co, care_of) if pd.notna(part))
        for co, care_of in zip(cam.loc[mask, 'owner_coownername'], cam.loc[mask, 'owner_address'])
    ]
cam.loc[mask, 'owner_address'] = None
# Owner address as "address, address2, city, state zip".
cam['own_addr'] = [(', '.join((str(a), str(b), str(c), str(d))) + ' ' + str(z)).strip()
                   for a, b, c, d, z in zip(cam['owner_address'], cam['owner_address2'],
                                            cam['owner_city'], cam['owner_state'], cam['owner_zip'])]
# Property address: keep the text before the last "\nCambridge, MA" and
# flatten any remaining line breaks. `n` is keyword-only in pandas >= 2.0.
cam['prop_addr'] = (cam['address'].str.rsplit('\nCambridge, MA', n=1).str[0]
                    .str.replace('\n', ' ', regex=False)
                    .str.strip())
cam['town'] = 'cam'
# Rename columns to the shared schema.
cam = cam.rename(columns = {
    'owner_name': 'own',
    'owner_coownername': 'co',
    'assessedvalue': 'ass_val',
    'saleprice': 'sale_p',
    'saledate': 'sale_d'
})
cam['year'] = 'FY2020'
# Treat sale prices below 1 as missing (assign a new column instead of
# writing through .values, which may be read-only under copy-on-write).
cam['sale_p'] = cam['sale_p'].mask(cam['sale_p'] < 1)
# Strip stray separators and 'nan'/'None' text from concatenation, then turn
# blanks and NaN into None.
cam = cam.replace({r'^, ': '', r' ,': '', r', nan': '', r'None, ': '', r', None': ''}, regex=True)
cam = cam.replace({' ': None, '': None, np.nan: None})
# Keep only the shared output columns.
cam = cam[['gisid', 'town', 'prop_addr', 'unit', 'own', 'co', 'own_addr', 'ass_val', 'year', 'sale_d', 'sale_p']]

In [None]:
brook = pd.read_csv('./data/assess/brook_assess.csv', 
                    dtype={'SALEPRICE': float,
                          'USECD': str},
                    parse_dates=['SALEDATE'])
brook.columns = brook.columns.str.lower()
# Filter for residential use codes (rows with no code are dropped rather
# than raising); .copy() so later writes don't hit a view.
brook = brook[brook['usecd'].str.contains(USE_CODES, regex=True, na=False)].copy()
# Remove the ZIP+4 suffix; `n` is keyword-only in pandas >= 2.0.
brook['zip'] = brook['zip'].str.rsplit('-', n=1).str[0]
# Name town.
brook['town'] = 'brk'
# Owner address as "address, city, state zip".
brook['own_addr'] = [(', '.join((str(a), str(b), str(c))) + ' ' + str(z)).strip()
                     for a, b, c, z in zip(brook['address'], brook['city'], brook['state'], brook['zip'])]
# Property address: addno1 and addno2 joined with no space, then the street (addst1).
brook['prop_addr'] = [(str(n1) + str(n2) + ' ' + str(street)).strip()
                      for n1, n2, street in zip(brook['addno1'], brook['addno2'], brook['addst1'])]
# Owner and co-owner names as "FIRST LAST".
brook['own'] = [str(first) + ' ' + str(last) for first, last in zip(brook['firstname1'], brook['lastname1'])]
brook['co'] = [str(first) + ' ' + str(last) for first, last in zip(brook['firstname2'], brook['lastname2'])]
brook = brook.replace({' ': None, '': None})
# Rename columns to the shared schema.
brook = brook.rename(columns = {
    'parcel-id': 'gisid',
    'addst2': 'unit',
    'restotlval': 'ass_val',
    'saleprice': 'sale_p',
    'saledate': 'sale_d'
})
# Strip stray separators and 'nan'/'None' text left by str() of missing
# values. 'nan' is only removed when it is not part of a longer word, so
# names such as "Brennan" are not truncated.
brook = brook.replace({r'^, ': '', r' ,': '', r', nan': '', r'(?<![A-Za-z])nan(?![A-Za-z])': '', r'None, ': '', r', None': ''}, regex=True)
brook = brook.replace({' ': None, '': None})
# Treat sale prices below 1 as missing (assign instead of writing through .values).
brook['sale_p'] = brook['sale_p'].mask(brook['sale_p'] < 1)
brook['year'] = 'FY2020'
# Keep only the shared output columns.
brook = brook[['gisid', 'town', 'prop_addr', 'unit', 'own', 'co', 'own_addr', 'ass_val', 'year', 'sale_d', 'sale_p']]

In [None]:
# Stack the five towns' standardized tables into one frame with a fresh
# 0..n-1 index (this index later becomes the dedupe record id).
all_assess = pd.concat(objs=[som, med, cam, bos, brook], ignore_index=True)

In [None]:
# Final clean-up across all towns.
# Strip leading zeros (e.g. "0 MAIN ST") and surrounding whitespace from addresses.
all_assess['prop_addr'] = all_assess['prop_addr'].str.lstrip('0').str.strip()
all_assess['own_addr'] = all_assess['own_addr'].str.lstrip('0').str.strip()
# Drop care-of / attention prefixes from co-owner names.
all_assess['co'] = all_assess['co'].replace({r'C/O ': '', r'S/O ': '', r'ATTN: ': '', r'ATTN ': ''}, regex=True)
# Remove 'None'/'nan' text left by str() of missing values during
# concatenation, but only when it is not part of a longer word, so names such
# as "Brennan" are not truncated. Adjacent digits/punctuation still match
# (e.g. "12nan" -> "12").
all_assess = all_assess.replace(
    {r'(?<![A-Za-z])None(?![A-Za-z])': '', r'(?<![A-Za-z])nan(?![A-Za-z])': ''},
    regex=True,
)
# Blank strings -> None, then drop rows without a parcel id.
all_assess = all_assess.replace({' ': None, '': None})
all_assess = all_assess[~pd.isnull(all_assess['gisid'])]
# NaN -> None for dedupe. pd.np was removed in pandas 2.0; use numpy directly.
all_assess = all_assess.replace({np.nan: None})

In [None]:
def tupleize(row):
    """Collect a record's owner names into a tuple for dedupe's Set field.

    Returns ``(own, co)`` when both names are present, ``(own,)`` when only
    the primary owner is present, and ``None`` whenever ``own`` is missing
    (a lone co-owner is not kept).
    """
    co_owner = row['co']
    owner = row['own']
    if owner is None:
        return None
    if co_owner is None:
        return (owner,)
    return (owner, co_owner)
    

# One tuple (or None) of owner names per row, consumed by dedupe's 'owners' field.
all_assess['owners'] = all_assess.apply(func=tupleize, axis='columns')

In [None]:
# Dedupe expects {record_id: {field: value}}; the frame's index serves as the
# record id, so cluster results can be joined back by index.
all_assess_dict = all_assess.loc[:, ['owners', 'own_addr']].to_dict(orient='index')

In [None]:
# Paths for the persisted dedupe model (written in binary mode) and the
# labeled training pairs (a .json file).
settings_file = './training/learned_settings'
training_file = './training/training.json'

# If learned settings exist, load them into a StaticDedupe and skip training
# entirely. Delete settings_file to force retraining.
if os.path.exists(settings_file):
    print('Reading learned settings from', settings_file)
    with open(settings_file, 'rb') as f:
        deduper = dedupe.StaticDedupe(f)
else:
    # Tell Dedupe which fields are used to identify duplicates:
    # `owners` (tuple from tupleize) is compared as a Set of names and
    # `own_addr` (concatenated mailing address) as an Address.
    fields = [
        {'field': 'owners', 'variable name': 'owners', 'type': 'Set'},
        {'field': 'own_addr', 'variable name': 'own_addr', 'type': 'Address'}
        ]
    deduper = dedupe.Dedupe(fields)
    # If labeled examples were saved earlier, seed training with them...
    if os.path.exists(training_file):
        print('reading labeled examples from ', training_file)
        with open(training_file, 'rb') as f:
            deduper.prepare_training(all_assess_dict, f)
    # ...otherwise prepare training from the data alone.
    else:
        deduper.prepare_training(all_assess_dict)
    # Start interactive console labeling (this runs even when earlier labels
    # were loaded), then fit the model.
    dedupe.console_label(deduper)
    deduper.train()
    # Persist the labeled pairs and the learned settings so the next run can
    # take the StaticDedupe branch above.
    with open(training_file, 'w') as tf:
        deduper.write_training(tf)
    with open(settings_file, 'wb') as sf:
        deduper.write_settings(sf)

In [None]:
# Group records into clusters of likely-identical owners with the trained
# model. A higher threshold tolerates fewer differences between
# names/addresses.
clustered_dupes = deduper.partition(all_assess_dict, threshold=0.5)

# Report the number of clusters (sets) found.
print('Number of sets', len(clustered_dupes))

In [None]:
# Flatten dedupe's output into one row per record. The record id is the
# all_assess index (all_assess_dict was built with to_dict('index')); each
# record gets the position of its cluster and dedupe's confidence score.
rid = []
clst = []
conf = []
for cluster_id, (records, scores) in enumerate(clustered_dupes):
    for record_id, score in zip(records, scores):
        rid.append(record_id)
        clst.append(cluster_id)
        conf.append(score)

# Lookup frame indexed by record id, ready to join back onto all_assess.
clust = pd.DataFrame(list(zip(clst, conf)),
                     columns=['clst', 'conf'],
                     index=rid)

In [None]:
# Attach each record's cluster id and confidence by index; rows whose index
# is absent from `clust` get NaN in both columns.
all_assess = all_assess.join(clust, how='left')

In [None]:
# Load parcel polygons, keep one per (gisid, town), and compute centroids.
parcels_gdf = gpd.read_file('./data/parcels/mamas_parcels.shp')
parcels_gdf = parcels_gdf.rename(columns={'pid': 'gisid'})
parcels_gdf = parcels_gdf.drop_duplicates(subset=['gisid', 'town'])
# Parcels without an id can't be joined; parcels without a shape can't be placed.
parcels_gdf = parcels_gdf[~pd.isnull(parcels_gdf['gisid'])]
parcels_gdf = parcels_gdf[~pd.isnull(parcels_gdf['geometry'])]
# NOTE(review): `lat`/`lon` are degrees only if the shapefile uses a
# geographic CRS; in a projected CRS these are planar y/x coordinates --
# check parcels_gdf.crs.
centroid = parcels_gdf.geometry.centroid
parcels_gdf['lat'] = centroid.y
parcels_gdf['lon'] = centroid.x

In [None]:
# Right join keeps every assessor record and attaches parcel geometry and
# centroid columns where (town, gisid) match...
all_assess = parcels_gdf.merge(right=all_assess, how='right', on=['town', 'gisid'])
# ...then keep only records that found a parcel centroid.
all_assess = all_assess[all_assess['lat'].notna()]

In [None]:
# Store how many records share each dedupe cluster as a `count` column, so
# PostgreSQL queries don't have to aggregate it every time. Rows without a
# cluster get NaN. (Previously `on=['clst', 'clst']` repeated the key, and
# count() ran over every column before selecting one.)
cluster_counts = (
    all_assess.groupby('clst')['gisid']
    .count()
    .rename('count')
    .reset_index()
)
all_assess = all_assess.merge(cluster_counts, on='clst', how='left')

In [None]:
# Write the final table to PostGIS as public.props, replacing any existing
# table; the DataFrame index is stored in an `id` column.
# NOTE(review): `owners` holds Python tuples (from tupleize); confirm the DB
# driver stores them as intended, or convert/drop that column first.
if not PG_CONNECT:
    raise RuntimeError('PG_CONNECT is not set; define it in the environment or a .env file.')
pg_engine = create_engine(PG_CONNECT)
all_assess.to_postgis("props", con=pg_engine, schema='public', if_exists='replace', index=True, index_label='id')