## Identifying Real Estate Opportunity for Low Income Housing Programs

This is a rework of an analysis originally done using the traditional python scientific suite of modules. Here the bulk of the analysis is accomplished using Pyspark in order to take advantage of some of its features including the ML package and its pipelining capabilities.

To reiterate the goal of this analysis, we are taking Austin, TX housing data and creating a scoring model that will allow us pick the top 100 properties that best fit low income housing program goals. We begin by importing the necessary packages, creating a spark context, and uploading our data:

In [1]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.clustering import KMeans
from pyspark.sql.types import FloatType, ArrayType, DoubleType, IntegerType
from pyspark.sql import functions as F
import pandas as pd
from math import cos, sqrt
import geopandas as gpd
from helper_functions.plotly_plotting import plot_austin

In [2]:
import pyspark
spark = pyspark.sql.SparkSession.builder\
        .master('local')\
        .getOrCreate()

In [3]:
trulia_df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .option("mode", "failFast")\
    .load("data/prices.csv")

atx_addresses_df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .option("mode", "failFast")\
    .load("data/addresses_atx.csv")

In [4]:
# Here using pandas to display the result because Pyspark dataframes sometimes take up too much space!
pandas_df = pd.DataFrame(trulia_df.take(5), columns=trulia_df.columns)
pandas_df

Unnamed: 0,_c0,bathrooms,bedrooms,house_type,price,sqft,price_per_sqft,adj_address,latitude,longitude
0,0,2.0,3.0,Condo,280000,1427,196.215837,4159 STECK AVENUE AUSTIN TX 78759,30.376725,-97.757702
1,1,3.0,4.0,Single-Family Home,299000,2224,134.442446,15810 DE PEER COVE AUSTIN TX 78717,30.49254,-97.740365
2,2,1.0,3.0,Single-Family Home,329900,1005,328.258706,8201 LAZY LANE AUSTIN TX 78757,30.355568,-97.717455
3,3,2.0,3.0,Single-Family Home,229874,1499,153.351568,14511 FITZGIBBON DRIVE AUSTIN TX 78725,30.2343,-97.588498
4,4,2.0,3.0,Single-Family Home,237501,1295,183.398456,10304 BANKHEAD DRIVE AUSTIN TX 78757,30.351538,-97.732571


In [5]:
print(("Rows: ", trulia_df.count(),"Columns: ", len(trulia_df.columns)))

('Rows: ', 623, 'Columns: ', 10)


In [6]:
atx_addresses_df.show(5)

+---+------------------+------------------+--------------------+
|_c0|               lat|               lon|         adj_address|
+---+------------------+------------------+--------------------+
|  1|30.186914399999996|       -97.9280313|12200 PRATOLINA D...|
|  3|         30.352751|-97.95679799999999|15302 DOROTHY DRI...|
|  4|30.169753200000002|-97.83318670000001|2515 DREW LANEAUS...|
|  5|30.354369199999997|       -97.9577997|15404 JOSEPH DRIV...|
|  7|30.399155600000004|       -97.8516461|11203 RANCH ROAD ...|
+---+------------------+------------------+--------------------+
only showing top 5 rows



In [7]:
print(("Rows: ", atx_addresses_df.count(), "Columns: ", len(atx_addresses_df.columns)))

('Rows: ', 140386, 'Columns: ', 4)


In [8]:
atx_addresses_df = atx_addresses_df.withColumnRenamed('lat', 'latitude').withColumnRenamed('lon', 'longitude')

## Feature Engineering

Now that we have our data in Pyspark dataframes, we will begin to build our data pipeline. The pipeline functionality allows us to plan out several data transformations and have Spark optimize the workload over all the transformations for us.

In Pyspark, almost all of the ML package's algorithms require the input data to be formatted in a dataframe column consisting of vectors of the input features. Convention is to name this column "features". And so we'll use the VectorAssembler to accomplish this. For our purposes, we need to use training data that we scraped from Trulia.com to train a regressor and then use the model to regress housing data onto a larger dataset of properties:

In [9]:
# use stages list to create a plan for the pipeline
stages = []
assemblerInputs = ['latitude', 'longitude']
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
stages += [assembler]

In [10]:
stages

[VectorAssembler_040bbcbd7a24]

In [11]:
# instantiate pipeline and fit it to our data set
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(trulia_df)
train_df = pipelineModel.transform(trulia_df)
train_df = train_df.select('features', 'price_per_sqft')
train_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- price_per_sqft: double (nullable = true)



In [12]:
# we repeat the above step, but for our dataframe of expanded property data so that we can use or regressor on it
pipelineModel = pipeline.fit(atx_addresses_df)
test_df = pipelineModel.transform(atx_addresses_df)
test_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- adj_address: string (nullable = true)
 |-- features: vector (nullable = true)



In [13]:
# Pyspark will return a dataframe to us with the prediction column of generated outputs based on the features 
# column that we created
gbt = GBTRegressor(featuresCol='features', labelCol='price_per_sqft', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)

pandas_df = pd.DataFrame(gbt_predictions.take(5), columns=gbt_predictions.columns)
pandas_df

Unnamed: 0,_c0,latitude,longitude,adj_address,features,prediction
0,1,30.186914,-97.928031,12200 PRATOLINA DRIVEAUSTIN TX 78739,"[30.186914399999996, -97.9280313]",196.793132
1,3,30.352751,-97.956798,15302 DOROTHY DRIVEAUSTIN TX 78734,"[30.352751, -97.95679799999999]",201.806851
2,4,30.169753,-97.833187,2515 DREW LANEAUSTIN TX 78748,"[30.169753200000002, -97.83318670000001]",181.144136
3,5,30.354369,-97.9578,15404 JOSEPH DRIVEAUSTIN TX 78734,"[30.354369199999997, -97.9577997]",201.806851
4,7,30.399156,-97.851646,11203 RANCH ROAD 2222AUSTIN TX 78730,"[30.399155600000004, -97.8516461]",239.403664


In [14]:
# Since we need to complete the above process to train multiple regressors and regress multiple features on to our
# expanded dataset, we can create a function that allows us to loop over the above, passing the result df back
def regress_new_feature (train_df, test_df, features, label, maxIter=10):
    stages = []
    assemblerInputs = features
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
    stages += [assembler]
    
    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(train_df)
    train = pipelineModel.transform(train_df)
    pipelineModel = pipeline.fit(test_df)
    test = pipelineModel.transform(test_df)
    
    gbt = GBTRegressor(featuresCol='features', labelCol=label, maxIter=maxIter)
    gbt_model = gbt.fit(train)
    prediction_df = gbt_model.transform(test)
    
    prediction_df = prediction_df.withColumnRenamed('prediction', label).drop('features')
    
    return prediction_df

In [15]:
# The columns we would like to regress onto the expanded dataset
train_cols = ['price_per_sqft', 'price', 'bedrooms', 'bathrooms']

# The features we will start with
features = ['latitude', 'longitude']

# Our original dataframes
train_df = trulia_df
test_df = atx_addresses_df

# A loop that uses the above function and passes the result back through the function for each newly regressed column
for col in train_cols:
    test_df = regress_new_feature(train_df, test_df, features, col, 50)
    features.append(col)

In [16]:
pandas_df = pd.DataFrame(test_df.take(5), columns=test_df.columns)
pandas_df

Unnamed: 0,_c0,latitude,longitude,adj_address,price_per_sqft,price,bedrooms,bathrooms
0,1,30.186914,-97.928031,12200 PRATOLINA DRIVEAUSTIN TX 78739,197.788176,604295.6,4.08611,3.408979
1,3,30.352751,-97.956798,15302 DOROTHY DRIVEAUSTIN TX 78734,188.078154,572990.4,3.779476,3.48985
2,4,30.169753,-97.833187,2515 DREW LANEAUSTIN TX 78748,185.093968,371447.9,3.213077,2.215508
3,5,30.354369,-97.9578,15404 JOSEPH DRIVEAUSTIN TX 78734,188.078154,572990.4,3.779476,3.48985
4,7,30.399156,-97.851646,11203 RANCH ROAD 2222AUSTIN TX 78730,292.769085,1617012.0,4.61792,5.274092


In [17]:
print(("Rows: ", test_df.count(),"Columns: ", len(test_df.columns)))

('Rows: ', 140386, 'Columns: ', 8)


Great! Now we have a full dataset to work with and can begin feature engineering in earnest. As mentioned in the project walkthrough, we'll need to find the minimum distance to public transportation for each address in our dataset.

In [18]:
# Use geopandas to read shapefile of all bus stops in ATX, then put them into a list
atx_bus_map = gpd.read_file('data/Shapefiles_20-_20JANUARY_202018/Stops/Stops.shp')
bus_stops = atx_bus_map[['LATITUDE', 'LONGITUDE']]
bus_stops = [(lat, lon) for lat, lon in zip(atx_bus_map['LATITUDE'], atx_bus_map['LONGITUDE'])]

In [19]:
# A function for our distance calculation between two points
def two_point_haversine(origin, destination):
    # calculates the distance between two points (lat, lngs) on a great circle, or on the 
    # surface of a sphere (in this case the sphere is planet earth)
    # units in km
    lat1, lng1 = origin
    lat2, lng2 = destination
    deglen = 110.25
    x = lat1 - lat2
    y = (lng1 - (lng2))*cos(lng2)
    return deglen*sqrt(x*x + y*y)

In [20]:
# A function to run through all the bus_stops for each property
def min_dist_to_transportation(homes, bus_stops):
    return [min([two_point_haversine(stop, home) for stop in bus_stops]) for home in homes]

In [21]:
# Create the distance calculation in python list
pandf = test_df.toPandas()
homes = list(zip(pandf.latitude, pandf.longitude))
min_dist = min_dist_to_transportation(homes, bus_stops)

In [22]:
# Create index column in our Pyspark dataframe using SQL
test_df.createOrReplaceTempView('testTable')

query = '''
SELECT *, ROW_NUMBER() OVER(ORDER BY adj_address) AS index
FROM testTable
'''

test_df = spark.sql(query)

In [23]:
# Use a simple User Defined Function (UDF) to create a column including our min_dist calculations into the dataframe
def add_min_dist(idx):
    return min_dist[idx - 1]

min_dist_udf = F.udf(add_min_dist, FloatType())


test_df = test_df.withColumn('min_dist', min_dist_udf('index'))

In [24]:
# We create a new pipeline for our KMeans cluster
stages = []
assemblerInputs = ['latitude', 'longitude']
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
stages += [assembler]

pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(test_df)
train_df = pipelineModel.transform(test_df)
train_df = train_df.select('features')

In [25]:
# From the walkthrough, we cluster latitude and longitude with K=15
kmeans = KMeans(maxIter=300).setK(15).setSeed(1)

kmeans_model = kmeans.fit(train_df)

predictions = kmeans_model.transform(train_df)
predictions.show(5)

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[30.1869143999999...|        14|
|[30.352751,-97.95...|         1|
|[30.1697532000000...|         2|
|[30.3543691999999...|         1|
|[30.3991556000000...|        10|
+--------------------+----------+
only showing top 5 rows



In [26]:
# We create a dataframe with each centroid and its number (this is important because each centroid is a 
# latitude/longitude pair, and we'll need them for plotting later)
numbered_clusters = []
for num, cluster in enumerate(kmeans_model.clusterCenters()):
    numbered_clusters.append([num, [float(cluster[0]), float(cluster[1])]])
centroids = spark.createDataFrame(numbered_clusters, ['cluster', 'centroid'])
centroids.show(5)

+-------+--------------------+
|cluster|            centroid|
+-------+--------------------+
|      0|[30.1672362337052...|
|      1|[30.3345713518887...|
|      2|[30.1638933586402...|
|      3|[30.3442655765284...|
|      4|[30.3992073787710...|
+-------+--------------------+
only showing top 5 rows



In [27]:
# Now we can join the centroids back onto the training dataframe using the prediction from the KMeans algorithm 
# (predictions from KMeans are cluster IDs)
predictions.createOrReplaceTempView('preds')
centroids.createOrReplaceTempView('centroids')

# Creating an index to allow us to join to the original dataframe below
query = '''
SELECT *, ROW_NUMBER() OVER(ORDER BY features) AS index
FROM preds AS p
JOIN centroids AS c ON p.prediction=c.cluster
'''

preds = spark.sql(query).drop('cluster')

In [28]:
preds.show(5)

+--------------------+----------+--------------------+-----+
|            features|prediction|            centroid|index|
+--------------------+----------+--------------------+-----+
|[28.3318666,-98.1...|         2|[30.1638933586402...|    1|
|[29.5116391,-94.4...|        13|[30.2487794564482...|    2|
|[29.5555698999999...|        13|[30.2487794564482...|    3|
|[29.5583186999999...|        13|[30.2487794564482...|    4|
|[29.6142135,-95.5...|        13|[30.2487794564482...|    5|
+--------------------+----------+--------------------+-----+
only showing top 5 rows



In [29]:
# Join the predictions, complete with centroids, back to the original dataframe
preds.createOrReplaceTempView('preds')
test_df.createOrReplaceTempView('original')

query = '''
SELECT *, p.prediction AS cluster FROM original AS o
INNER JOIN preds AS p ON o.index=p.index
'''

df = spark.sql(query).drop('index').select('latitude', 'longitude', 'price_per_sqft', 'min_dist', 'centroid')

In [30]:
df.show(5)

+------------------+------------------+------------------+---------+--------------------+
|          latitude|         longitude|    price_per_sqft| min_dist|            centroid|
+------------------+------------------+------------------+---------+--------------------+
|30.312099800000002|-97.72457390000001| 399.9647760710277|3.1569898|[30.1638933586402...|
|30.376518199999996|        -97.937327|260.05716049083173|11.432891|[30.2487794564482...|
|         30.257994|-98.04043399999999| 212.4208338172844|0.7196791|[30.2487794564482...|
|        30.3413012|       -97.6807191|  131.892550554785|11.240022|[30.2487794564482...|
|30.351137899999994|       -97.8331579|314.00411531624593|0.7368289|[30.2487794564482...|
+------------------+------------------+------------------+---------+--------------------+
only showing top 5 rows



In [31]:
# From the walkthrough, calculate distance to centroid for each property, reusing our distance function from above
def distance_to_centroid(arr, centroid):
    return two_point_haversine(arr, centroid)

centroid_udf = F.udf(distance_to_centroid, FloatType())

df = df.withColumn('dist_to_cent', centroid_udf(F.array("latitude", "longitude"), "centroid"))

In [32]:
pandas_df = pd.DataFrame(df.take(5), columns=df.columns)
pandas_df

Unnamed: 0,latitude,longitude,price_per_sqft,min_dist,centroid,dist_to_cent
0,30.3121,-97.724574,399.964776,3.15699,"[30.163893358640284, -97.83465163455125]",19.671022
1,30.376518,-97.937327,260.05716,11.432891,"[30.248779456448283, -97.58516499404504]",40.604568
2,30.257994,-98.040434,212.420834,0.719679,"[30.248779456448283, -97.58516499404504]",49.244881
3,30.341301,-97.680719,131.892551,11.240022,"[30.248779456448283, -97.58516499404504]",14.520093
4,30.351138,-97.833158,314.004115,0.736829,"[30.248779456448283, -97.58516499404504]",29.096413


## Score Calculation

Now that we've engineered features for the minimum distance to transportation and the distance to centroid, we can begin developing our score calculation. We will normalize price_per_sqft, min_dist, and dist_to_cent, and then take the sum of all of these as our ranking score. This will set us up to begin plotting our selections

In [33]:
# Create a UDF to sum our normalized columns. The input here is a vector, because will be using another pipeline
# which, as we've seen above outputs a series of vectors based on the input columns.
def sum_(vector):
    try:
        result = sum(list([float(val) for val in vector]))
        if result is None:
            return 0
        else:
            return result
    except ValueError:
        return 0

scorer = F.udf(sum_, DoubleType())
# Must register with spark to make available for sql queries - see below
spark.udf.register('scorer', scorer)

# Here we create a new pipeline, then use a sql query to apply our scorer UDF
def score_calculation(train_df, features):
    stages = []
    assemblerInputs = features
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
    # Pyspark has builtin normalization functionality for feature columns. The MinMaxScaler is nifty!
    # Notice our inputCol will be the outputCol from the assembler
    scaler = MinMaxScaler(inputCol=assembler.getOutputCol(), outputCol='scaled_features')
    stages += [assembler, scaler]
    
    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(train_df)
    df = pipelineModel.transform(train_df)
    
    df.createOrReplaceTempView('table')
    # We use a sql query to apply the scorer function as well as sort and take the top 100 best properties!
    query = '''
    SELECT latitude, longitude, scaled_features, scorer(scaled_features) AS score
    FROM table
    ORDER BY score DESC LIMIT 100
    '''
    result = spark.sql(query)
    
    return result


In [34]:
# We call our function with the input features and take a look at the output
newdf = score_calculation(df, ['price_per_sqft', 'min_dist', 'dist_to_cent'])

In [35]:
newdf.show(5)

+------------------+------------------+--------------------+------------------+
|          latitude|         longitude|     scaled_features|             score|
+------------------+------------------+--------------------+------------------+
|         30.321707|       -97.7371546|[0.14325306946584...| 1.190121379927792|
|30.336747999999996|        -97.782667|[1.0,0.0286431011...| 1.083053282206181|
|30.337832199999998|-97.77931550000001|[1.0,0.0345162546...|1.0683912100235349|
|         30.337458|       -97.7766735|[1.0,0.0104056139...|1.0664375514756954|
|        30.3392972|       -97.7800517|[1.0,0.0101582440...|1.0652915402189165|
+------------------+------------------+--------------------+------------------+
only showing top 5 rows



## Plotting

We'll take the coordinates from our top 100 properties and plot them using a plotly wrapper function (found in helper_functions in this repo)

In [36]:
# Collect Pyspark columns and create typed lists
latitudes = newdf.select('latitude')
latitudes = [float(row['latitude']) for row in latitudes.collect()]
longitudes = newdf.select('longitude')
longitudes = [float(row['longitude']) for row in longitudes.collect()]

In [None]:
# Use helper function
plot_austin(latitudes, longitudes)

<strong>Unfortunately, this analysis was done in a Pyspark enabled notebook in a docker container. Thus Plotly was acting finnicky and not allowing the Plotly map data to come through to my container. So, unsatisfying as it may be to slog through the above analysis to end up with no visuals, fear not! There are visuals in the project walkthrough that this notebook is based on. Oh and don't forget to checkout the plotly helper function!</strong>

## Conclusion and Final Thoughts

As we've seen above, Pyspark is incredibly powerful for machine learning analyses. We drastically improved our ability to feature engineer a dataset using the pipeline capabilities from the ML package. A lot was also gained from Pyspark's integrated functionality around sql queries and User Defined Functions. 

Sadly, we weren't able to plot within this notebook, but please do refer to the project walkthrough as the Plotly module creates very nice maps, and the plots are included in that markdown.