### Get the data from the website and prepare it

File: PropertyData.txt  
Source: https://www.tad.org (Tarrant Appraisal District, TX)

In [None]:
import requests, zipfile, io, os
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, trim, col, udf, when
from pyspark.ml.feature import StringIndexer

# Local Spark session sized for the full property file. The large heap and off-heap settings work
# around "java.lang.OutOfMemoryError: Java heap space", see
# https://stackoverflow.com/questions/21138751/spark-java-lang-outofmemoryerror-java-heap-space
_spark_settings = {
    "spark.executor.memory": "26g",
    "spark.driver.memory": "26g",
    "spark.memory.offHeap.enabled": True,
    "spark.memory.offHeap.size": "16g",
}
_builder = SparkSession.builder.master("local[*]")
for _setting, _setting_value in _spark_settings.items():
    _builder = _builder.config(_setting, _setting_value)
spark = _builder.appName("PropertyData").getOrCreate()

# Eager evaluation renders a bare DataFrame expression as a formatted table in the notebook output.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark  # show the session summary as the cell output
# Tarrant Appraisal District export: a ZIP archive containing the pipe-delimited property file.
zip_url = "https://www.tad.org/content/data-download/PropertyData(Delimited).ZIP"
source_file = "/tmp/spark/PropertyData.txt"

if os.path.isfile(source_file):
    print("Using existing property file")
else:
    # Local directory the archive is extracted into (a filesystem path, despite the variable name).
    download_url = "/tmp/spark/"
    # Fail fast on stalled connections and HTTP errors. Without raise_for_status() an error page
    # would only surface later as a confusing zipfile.BadZipFile.
    response = requests.get(zip_url, timeout=60)
    response.raise_for_status()
    # Context manager so the archive handle is closed after extraction.
    with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        archive.extractall(download_url)
    # Everything below reads source_file, so confirm the archive actually provided it.
    if not os.path.isfile(source_file):
        raise FileNotFoundError(f"{source_file} was not found in the archive downloaded from {zip_url}")

# Load the pipe-delimited property file with a header row. inferSchema costs an extra pass over the
# data but gives the numeric columns numeric types.
df = spark.read.csv(source_file, sep="|", header=True, inferSchema=True)

# In one projection: put a unique (increasing but not consecutive) row id first, and trim
# surrounding whitespace from every string column. The PySpark docs warn that calling withColumn()
# in a loop adds one projection per call and can bloat the query plan, so a single select() is
# used instead. Any pre-existing 'id' column is replaced rather than duplicated.
df = df.select(
    monotonically_increasing_id().alias('id'),
    *[
        trim(col(name)).alias(name) if dtype == "string" else col(name)
        for name, dtype in df.dtypes
        if name != 'id'
    ],
)


# Normalise the pool flag to a two-valued 'Y'/'N' string: 'X' marks a pool, and any other value,
# including a missing one, becomes 'N'. Without this the later encoding fails with
# "Cannot have an empty string for name", even if the converted columns are dropped afterwards.
# see: https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder
has_pool = col('Swimming_Pool_Ind') == 'X'
df = df.withColumn('Swimming_Pool_Ind', when(has_pool, 'Y').otherwise('N'))

# Give each indicator string column a numeric companion '<column>_idx' for the regression models.
# Each indexer is fitted on the DataFrame as it stands after the previous column was added.
# NOTE(review): only Swimming_Pool_Ind is normalised above. If Central_Heat_Ind / Central_Air_Ind
# contain NULLs, StringIndexer's default handleInvalid='error' will reject them. Confirm against the data.
string_to_idx_columns = ['Swimming_Pool_Ind', 'Central_Heat_Ind', 'Central_Air_Ind']
for indicator_col in string_to_idx_columns:
    indexer_model = StringIndexer(inputCol=indicator_col, outputCol=indicator_col + "_idx").fit(df)
    df = indexer_model.transform(df)


# Integer copy of the appraisal; the regression cells below use it as their label column.
# NOTE(review): 'int' is 32-bit (max 2,147,483,647). Confirm no appraisal exceeds that range.
df = df.withColumn('Appraised_Value_Int', col('Appraised_Value').cast('int'))

# Columns the models do not use. This includes the original 'Appraised_Value', now that the
# integer copy exists.
drop_cols = [
    'Sequence_No', 'Record_Type', 'PIDN',
    'Owner_Name', 'Owner_Address', 'Owner_CityState', 'Owner_Zip4', 'Owner_CRRT',
    'Situs_Address', 'TAD_Map', 'MAPSCO',
    'Exemption_Code', 'State_Use_Code', 'LegalDescription',
    'Notice_Date', 'Deed_Date', 'Deed_Book', 'Appraisal_Date', 'Deed_Page',
    'ARB_Indicator', 'From_Accts', 'GIS_Link', 'Instrument_No', 'Overlap_Flag',
    'Num_Bedrooms', 'Num_Bathrooms', 'Structure_Count', 'Ag_Code',
    'Appraisal_Year', 'Owner_Zip', 'Appraised_Value',
]
df = df.drop(*drop_cols)

# Register the prepared data as the "data" view so later SQL queries (e.g. the residential
# summary, which joins against it) can reference it.
df.createOrReplaceTempView("data")

from pyspark.sql.types import DoubleType


def _percent_diff(new_value, old_value):
    """Return the change from old_value to new_value as a whole-number percentage.

    Returns None (SQL NULL) when either input is NULL or old_value is 0, because the percentage
    is undefined there. Python's round() is used, so exact halves round to the nearest even number.
    """
    if new_value is None or old_value is None or old_value == 0:
        return None
    return float(round(((new_value - old_value) / old_value) * 100, 0))


# Spark UDF: percent_diff_udf(new_col, old_col) -> DoubleType column, NULL where undefined.
percent_diff_udf = udf(_percent_diff, DoubleType())


In [None]:
# Optional: count the NULL values in every column, to judge how many rows would be lost by
# dropping incomplete data.
# Spark functions are used through the F namespace. `from pyspark.sql.functions import sum` would
# shadow Python's builtin sum() for every later cell in the notebook.
from pyspark.sql import functions as F

missing_value_counts = df.select(
    [F.sum(F.col(column).isNull().cast("int")).alias(column) for column in df.columns]
)
missing_value_counts.show()

### Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, OneHotEncoder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col, when

# LogisticRegression is a classifier. Spark expects its label column to hold class indices
# 0, 1, ..., k-1, and without label metadata it sizes the model as (largest label + 1) classes.
# Using the raw Appraised_Value_Int as the label therefore asked for one class per value up to the
# highest appraisal. That is the likely cause of the "java.lang.NegativeArraySizeException" seen
# once the sample grew past 10 rows. Instead, predict a binary label: is the property appraised
# above the median appraisal?
subset = [
    'Total_Value', 'Appraised_Value_Int',
    'Central_Heat_Ind_idx', 'Central_Air_Ind_idx', 'Swimming_Pool_Ind_idx',
]

# VectorAssembler rejects NULL feature values by default, so drop incomplete rows up front.
logistic_df = df.select(*subset).dropna()

# Binary target: 1.0 when the appraisal is above the approximate median (1% relative error).
median_value = logistic_df.approxQuantile('Appraised_Value_Int', [0.5], 0.01)[0]
logistic_df = logistic_df.withColumn(
    'High_Value', when(col('Appraised_Value_Int') > median_value, 1.0).otherwise(0.0)
)

# One-hot encode the indexed indicator columns (_idx -> _bin) so the model does not treat the
# category indices as ordered numeric values.
indicator_cols = ['Swimming_Pool_Ind', 'Central_Air_Ind', 'Central_Heat_Ind']
enc = OneHotEncoder(
    inputCols=[f"{name}_idx" for name in indicator_cols],
    outputCols=[f"{name}_bin" for name in indicator_cols],
)
logistic_df = enc.fit(logistic_df).transform(logistic_df)

logistic_features = [
    'Total_Value', 'Central_Heat_Ind_bin', 'Central_Air_Ind_bin', 'Swimming_Pool_Ind_bin',
]

model_logistic = LogisticRegression(featuresCol="features", labelCol="High_Value")

# Train (fixed seed so the split, and therefore the metrics, are reproducible)
train_logistic, test_logistic = logistic_df.randomSplit([0.7, 0.3], seed=42)
assembler = VectorAssembler(inputCols=logistic_features, outputCol='features')
train_logistic = assembler.transform(train_logistic)
test_logistic = assembler.transform(test_logistic)
logistic_trained_model = model_logistic.fit(train_logistic)

# Evaluate: accuracy from the hard 0/1 predictions, and area under ROC from the raw
# (pre-threshold) scores, which is what BinaryClassificationEvaluator expects.
logistic_predictions = logistic_trained_model.transform(test_logistic)
evaluator = MulticlassClassificationEvaluator(
    labelCol='High_Value', predictionCol='prediction', metricName='accuracy'
)
accuracy = evaluator.evaluate(logistic_predictions)
auc_evaluator = BinaryClassificationEvaluator(
    labelCol='High_Value', rawPredictionCol='rawPrediction', metricName='areaUnderROC'
)
auc = auc_evaluator.evaluate(logistic_predictions)

# Print the metrics
print(f"Accuracy: {accuracy}")
print(f"Area under ROC: {auc}")

### Multi-Linear Regression

In [None]:
# Multi-Linear Regression: predict Appraised_Value_Int from the numeric columns below, then publish
# per-account prediction errors as the "linear" SQL view.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

# Define the feature columns
numerical_features = ['Spec1', 'Spec2', 'Spec3', 'Spec4', 'Spec5', 'Total_Value']

# Drop rows with a missing feature (VectorAssembler rejects NULLs by default) or a missing label
# (a NULL target cannot be used to fit or score the model).
cleaned_numeric_df = df.dropna(subset=numerical_features + ['Appraised_Value_Int'])

train_linear, test_linear = cleaned_numeric_df.randomSplit([0.7, 0.3])
model_linear = LinearRegression(featuresCol='features', labelCol='Appraised_Value_Int')
assembler = VectorAssembler(inputCols=numerical_features, outputCol='features')

# Train
train_linear = assembler.transform(train_linear)
test_linear = assembler.transform(test_linear)
linear_trained_model = model_linear.fit(train_linear)

# Evaluate
linear_predictions = linear_trained_model.transform(test_linear)
evaluator = RegressionEvaluator(labelCol='Appraised_Value_Int', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(linear_predictions)

# Cast the predictions to integers and express the error both as an absolute difference and as a
# percentage of the appraisal. Both columns use the same sign convention (prediction minus
# appraisal), so a positive value means the model predicted more than the current appraisal.
linear_predictions = linear_predictions.withColumn('prediction_int', linear_predictions['prediction'].cast('int'))
linear_predictions = linear_predictions.drop('prediction')
linear_predictions = linear_predictions.withColumn('diff', col('prediction_int') - col('Appraised_Value_Int'))
linear_predictions = linear_predictions.withColumn(
    'diff_percent', percent_diff_udf(col('prediction_int'), col('Appraised_Value_Int'))
)

# Create the view with summary data
linear_predictions.select('Account_Num', 'prediction_int', 'diff', 'diff_percent').createOrReplaceTempView("linear")

print("Root Mean Squared Error (RMSE):", round(rmse, 2))

In [None]:
# Summarise how far the linear-regression predictions are from the current appraisal values:
# - diff_percent is bucketed into 20-point groups so there aren't too many bars to graph,
# - only residential properties (RP = 'R') appraised above 20,000 are included,
# - buckets containing 100 or fewer properties are left out.

# The query joins against the "data" view. Register it here from the current df so this cell does
# not depend on an earlier cell having done so.
df.createOrReplaceTempView("data")

query = """
with src as (
  select FLOOR(CAST(linear.diff_percent as INT) / 20) * 20 AS diff_group
  from linear
  join data on data.Account_Num = linear.Account_Num
  where data.RP = 'R'
    and data.Appraised_Value_Int > 20000
),
summary as (
  select count(diff_group) as count, diff_group
  from src
  group by diff_group
)
select count, diff_group
from summary
where count > 100
order by diff_group desc
"""
res = spark.sql(query)

# Graph how the predictions differ from the current appraisal values
import matplotlib.pyplot as plt
import pandas as pd
pandas_df = res.toPandas()
plt.bar(pandas_df['diff_group'], pandas_df['count'], color='blue', width=8)
plt.xlabel('diff_group')
plt.ylabel('count')
# The tallest bar sitting over 0 would indicate mostly accurate predictions.
plt.title('Prediction Diff (longest line should be over "0")')
plt.show()