<a href="https://colab.research.google.com/github/christian-thomas-schmidt/Python_Thursday/blob/main/Real_Estate_Project_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Christian Schmidt: Real Estate Dataset

This is my MET CS 777 project using PySpark and machine learning in Python.<br>
The dataset is provided by Ahmed Shahriar Sakib on Kaggle.com.<br><br>
The <b>goal</b> of this project is to see whether I can predict house prices from the provided features using machine learning in PySpark.<br><br>
I expect to use a Linear Regression algorithm to build the model, and I hypothesize that the larger the feature values (more beds/baths, a bigger acre_lot, a larger house_size), the more expensive a house will be.<br><br>
The dataset can be found [here](https://www.kaggle.com/datasets/ahmedshahriarsakib/usa-real-estate-dataset?resource=download).<br>

### Context:
This dataset contains real estate listings in the US, broken down by state and zip code.<br>
The data was collected via web scraping using Python libraries.

### Content:

--- The dataset has 1 CSV file, realtor-data.csv (200k+ entries), with 12 columns ---

*   status
*   price
*   bed
*   bath
*   acre_lot
*   full_address
*   street
*   city
*   state
*   zip_code
*   house_size
*   sold_date




The first step is to download the data and import it into the Google Colab environment.

In [1]:
! pip install kaggle

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
! mkdir ~/.kaggle

In [3]:
! cp kaggle.json ~/.kaggle/

In [4]:
! chmod 600 ~/.kaggle/kaggle.json

In [5]:
!kaggle datasets download -d ahmedshahriarsakib/usa-real-estate-dataset

Downloading usa-real-estate-dataset.zip to /content
  0% 0.00/5.07M [00:00<?, ?B/s]
100% 5.07M/5.07M [00:00<00:00, 77.6MB/s]


In [6]:
!unzip usa-real-estate-dataset

Archive:  usa-real-estate-dataset.zip
  inflating: realtor-data.csv        


### Importing Libraries

In [7]:
!pip install --ignore-installed -q pyspark==3.2.1

  Building wheel for pyspark (setup.py) ... done


In [86]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

In [9]:
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [10]:
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

### Exploring the Dataset
I want to gather information about this dataset before I begin my clean-up process.

In [11]:
df = spark.read.option("header",True).csv('realtor-data.csv')
df.printSchema()
df.show(5)

root
 |-- status: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- bath: string (nullable = true)
 |-- acre_lot: string (nullable = true)
 |-- full_address: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- house_size: string (nullable = true)
 |-- sold_date: string (nullable = true)

+--------+--------+---+----+--------+--------------------+--------------------+----------+-----------+--------+----------+---------+
|  status|   price|bed|bath|acre_lot|        full_address|              street|      city|      state|zip_code|house_size|sold_date|
+--------+--------+---+----+--------+--------------------+--------------------+----------+-----------+--------+----------+---------+
|for_sale|105000.0|3.0| 2.0|    0.12|Sector Yahuecas T...|Sector Yahuecas T...|  Adjuntas|Puerto Rico|   601.0|     920.0|     null

I want to work with FloatType() for all numeric features in this analysis, so let's define a schema that does this.

In [12]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,FloatType

schema = StructType([StructField("status",StringType(),True),
                     StructField("price",FloatType(),True),
                     StructField("bed",FloatType(),True),
                     StructField("bath",FloatType(),True),
                     StructField("acre_lot",FloatType(),True),
                     StructField("full_address",StringType(),True),
                     StructField("street",StringType(),True),
                     StructField("city",StringType(),True),
                     StructField("state",StringType(),True),
                     StructField("zip_code",StringType(),True),
                     StructField("house_size",FloatType(),True),
                     StructField("sold_date",StringType(),True)])


In [13]:
df = spark.read.csv('realtor-data.csv', header=True, schema=schema)
df.printSchema()
df.show(5)

root
 |-- status: string (nullable = true)
 |-- price: float (nullable = true)
 |-- bed: float (nullable = true)
 |-- bath: float (nullable = true)
 |-- acre_lot: float (nullable = true)
 |-- full_address: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- house_size: float (nullable = true)
 |-- sold_date: string (nullable = true)

+--------+--------+---+----+--------+--------------------+--------------------+----------+-----------+--------+----------+---------+
|  status|   price|bed|bath|acre_lot|        full_address|              street|      city|      state|zip_code|house_size|sold_date|
+--------+--------+---+----+--------+--------------------+--------------------+----------+-----------+--------+----------+---------+
|for_sale|105000.0|3.0| 2.0|    0.12|Sector Yahuecas T...|Sector Yahuecas T...|  Adjuntas|Puerto Rico|   601.0|     920.0|     null|
|fo

In [14]:
print((df.count(), len(df.columns)))

(203126, 12)


With only 203,126 rows of data, I wonder which states this dataset includes.

In [15]:
state_array = np.array(df.select("state").collect())
state_unique, state_count = np.unique(state_array, return_counts=True)
result = np.column_stack((state_unique, state_count))
print(result)

[['Connecticut' '12207']
 ['Massachusetts' '150792']
 ['New Hampshire' '4721']
 ['New Jersey' '2']
 ['New York' '1874']
 ['Puerto Rico' '24679']
 ['Rhode Island' '4907']
 ['South Carolina' '24']
 ['Tennessee' '18']
 ['Vermont' '1324']
 ['Virgin Islands' '2573']
 ['Virginia' '5']]


With roughly 150,000 listings, the dataset is mostly focused on the housing market in Massachusetts.<br> With this information, I may want to narrow the scope of my project to just the Massachusetts housing market.<br><br> However, what if there are duplicates?

In [16]:
df_duplicates = df.groupBy("full_address").count().filter("count > 1")
df_duplicates.show()

+--------------------+-----+
|        full_address|count|
+--------------------+-----+
|17 Estancias Del ...|   14|
|499 Calle Reparto...|    7|
|Villa Taina Guani...|    6|
|1 Calle Independe...|    6|
|G9 Serenidad, Coa...|   15|
|Arecibo Bo Hato A...|    8|
|1 Calle Cervantes...|   17|
|44 Grange Stock E...|    4|
|2-10 Recon Concor...|    2|
|29 Pleasant St Ap...|   27|
|Old Stage Rd Lot ...|   33|
|24 Worcester Rd, ...|    7|
|37 Marian Ave, Pi...|   16|
|4 Cooper Rd, Grea...|   12|
|37 Orchard St, Ad...|   19|
|181 Holmes Rd, Pi...|   15|
|71 Tracy Cir, Win...|    9|
|28-30 W Main St, ...|   22|
|19 Stone Xing Lot...|   14|
|221 Route 87, Col...|    3|
+--------------------+-----+
only showing top 20 rows



In [17]:
df_duplicates.select("count").groupBy().sum().collect()[0][0]

198133

The vast majority of rows in this dataset (198,133 of 203,126) belong to addresses that appear more than once. I think it would be best to remove the duplicates and then see what the distribution is at the state level.

In [18]:
df2 = df.drop_duplicates()
#df2 = df.distinct
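One detail worth keeping in mind: the 198,133 figure counts every row whose `full_address` occurs more than once, whereas `drop_duplicates()` called without arguments only removes rows that match on all columns. The following is a minimal pure-Python sketch of that distinction. The rows are hypothetical and are not taken from the dataset.

```python
from collections import Counter

# Hypothetical listings as (full_address, price) pairs. Not real data.
rows = [
    ("1 Main St", 100000.0),
    ("1 Main St", 100000.0),  # exact duplicate of the first row
    ("1 Main St", 95000.0),   # same address, different price
    ("2 Oak Ave", 250000.0),
]

# Analogue of groupBy("full_address").count().filter("count > 1"),
# followed by summing the counts: every row in a repeated group is counted.
address_counts = Counter(address for address, _ in rows)
rows_in_repeated_groups = sum(c for c in address_counts.values() if c > 1)

# Analogue of drop_duplicates() with no subset: rows must match on every field.
# dict.fromkeys keeps the first occurrence and preserves order.
fully_distinct = list(dict.fromkeys(rows))

# Analogue of drop_duplicates(["full_address"]): keep one row per address.
one_per_address = {}
for address, price in rows:
    one_per_address.setdefault(address, (address, price))  # keep first seen

print(rows_in_repeated_groups, len(fully_distinct), len(one_per_address))
```

In this toy example the relisted home at a different price survives the full-row de-duplication but not the per-address one. Which behavior is preferable depends on whether a relisting should count as a separate observation.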

In [36]:
# repeat prior step, but include percent distribution
def state_table(df):
  state_array = np.array(df.select("state").collect())
  state_unique, state_count = np.unique(state_array, return_counts=True)
  state_percent = ["%.3f%%" % elem for elem in list(state_count*100/len(state_array))]
  state_df = pd.DataFrame({'State':state_unique,'Count':state_count,"% Dist.":state_percent}).sort_values(by=['Count'],ascending=False)
  return(state_df)

In [37]:
print(state_table(df2))

             State  Count  % Dist.
1    Massachusetts   9514  45.480%
0      Connecticut   3870  18.500%
5      Puerto Rico   2664  12.735%
6     Rhode Island   2117  10.120%
2    New Hampshire    965   4.613%
4         New York    800   3.824%
10  Virgin Islands    750   3.585%
9          Vermont    235   1.123%
3       New Jersey      1   0.005%
7   South Carolina      1   0.005%
8        Tennessee      1   0.005%
11        Virginia      1   0.005%


Given this insight, I will shift my focus to building a model that predicts Massachusetts housing prices.<br>
I will then use Connecticut as a control group to analyze the model's effectiveness across the border.<br><br>
Finally, I will look at the housing market in Puerto Rico to see if I can find any unique differences and whether the model is also applicable to the territory.

### Pre-processing
I will limit the scope of this project to the state level.<br>
With that, I will pre-process the data to prepare the dataset for modeling.

In [38]:
# since we want to predict home prices, I will remove unwanted columns.
df3 = df2.drop('status','full_address','street','zip_code','sold_date')
df3.printSchema()

root
 |-- price: float (nullable = true)
 |-- bed: float (nullable = true)
 |-- bath: float (nullable = true)
 |-- acre_lot: float (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- house_size: float (nullable = true)



Next, I will remove any rows with NA values, as I want to avoid bias in my data and only deal with known features.<br>

In [39]:
# Now we want to remove any rows that have null values
df3 = df3.na.drop("any")
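As a rough illustration of what the `"any"` argument means, here is a minimal pure-Python sketch that contrasts it with `"all"`. The helper `drop_nulls` and the sample rows are hypothetical stand-ins, with `None` playing the role of a null value.

```python
# Hypothetical rows; None stands in for a null value.
rows = [
    {"price": 105000.0, "bed": 3.0, "house_size": 920.0},
    {"price": 80000.0, "bed": None, "house_size": 1527.0},
    {"price": None, "bed": None, "house_size": None},
]

def drop_nulls(rows, how="any"):
    """Mimic na.drop: 'any' drops a row with at least one null,
    'all' drops a row only if every value is null."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    check = any if how == "any" else all
    return [r for r in rows if not check(v is None for v in r.values())]

kept_any = drop_nulls(rows, "any")  # only the fully populated row survives
kept_all = drop_nulls(rows, "all")  # only the entirely empty row is removed

print(len(kept_any), len(kept_all))
```

With `"any"`, a listing that is missing a single feature is discarded, which is why the row count falls so sharply in the next cell.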

In [41]:
# Let's observe the data after the removal of NA values
print((df3.count(), len(df3.columns)))
print(state_table(df3))

(13069, 7)
            State  Count  % Dist.
1   Massachusetts   5772  44.166%
0     Connecticut   2805  21.463%
6    Rhode Island   1788  13.681%
5     Puerto Rico   1548  11.845%
2   New Hampshire    501   3.833%
4        New York    423   3.237%
7         Vermont    126   0.964%
8  Virgin Islands    105   0.803%
3      New Jersey      1   0.008%


I'm happy with the distribution and glad to see that Massachusetts still has the largest count of homes. From here I will subset the dataset to the 3 states I am interested in.

1.   Massachusetts
2.   Connecticut
3.   Puerto Rico

In [62]:
df_3_states = df3.where((df3.state == 'Massachusetts')\
                        | (df3.state == 'Connecticut')\
                        | (df3.state == 'Puerto Rico'))

print(state_table(df_3_states))

           State  Count  % Dist.
1  Massachusetts   5772  57.007%
0    Connecticut   2805  27.704%
2    Puerto Rico   1548  15.289%


### Post-Exploratory Analysis

I performed exploratory analysis in Python before repeating the same process in PySpark. The filters below are the result of that process.

In [63]:
df_3_states = df_3_states.where((df_3_states.house_size < 100000)\
                         & (df_3_states.acre_lot < 9000)\
                         & (df_3_states.bath < 100)\
                         & (df_3_states.price < 40000000))

In [64]:
print((df_3_states.count(), df3.count()))

(10120, 13069)


In [70]:
# Split the dataset into the 3 states and observe the summary() results for each

# Filter on df_3_states' own column rather than on the parent DataFrame df3
df_massachusetts = df_3_states.where(df_3_states.state == 'Massachusetts').drop('state','city')
df_connecticut = df_3_states.where(df_3_states.state == 'Connecticut').drop('state','city')
df_puerto_rico = df_3_states.where(df_3_states.state == 'Puerto Rico').drop('state','city')

### Standardize the Dataset

The next step is to rescale the features of each dataset to a common range with MinMaxScaler for regression modeling.

In [74]:
df_massachusetts.show(5)
df_massachusetts.printSchema()

+--------+---+----+--------+----------+
|   price|bed|bath|acre_lot|house_size|
+--------+---+----+--------+----------+
|299900.0|3.0| 2.0|    0.22|    2037.0|
|215000.0|2.0| 1.0|    0.22|     816.0|
|159900.0|2.0| 2.0|    0.28|    1206.0|
|445000.0|8.0| 6.0|    0.13|    8369.0|
|784900.0|6.0| 2.0|    0.16|    2220.0|
+--------+---+----+--------+----------+
only showing top 5 rows

root
 |-- price: float (nullable = true)
 |-- bed: float (nullable = true)
 |-- bath: float (nullable = true)
 |-- acre_lot: float (nullable = true)
 |-- house_size: float (nullable = true)



In [116]:
# create function to scale features into one column for regression models
def scaled_features(df):
  assembler = VectorAssembler(inputCols=["bed", "bath", "acre_lot", "house_size"], 
  outputCol="features")
  output = assembler.transform(df)

  scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
  scaledData = scaler.fit(output).transform(output)
  scaledData = scaledData.select('price','scaledFeatures')
  return scaledData

In [177]:
df_massachusetts_scaled = scaled_features(df_massachusetts)
df_connecticut_scaled = scaled_features(df_connecticut)
df_puerto_rico_scaled = scaled_features(df_puerto_rico)
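Because `scaled_features` fits a new MinMaxScaler on whatever DataFrame it receives, each state is scaled against its own minimum and maximum. The same raw value can therefore map to different scaled values in different states, which may matter when a model trained on one state is applied to another. The sketch below illustrates the effect in plain Python using the min-max formula (x - min) / (max - min). The helper names and the house sizes are hypothetical.

```python
def fit_min_max(values):
    """Return the (min, max) pair learned from one column of data."""
    return min(values), max(values)

def apply_min_max(values, bounds):
    """Scale values with previously learned bounds.
    Results may fall outside [0, 1] if the bounds came from other data."""
    lo, hi = bounds
    return [(v - lo) / (hi - lo) for v in values]

# Hypothetical house sizes (sq ft) for two states.
train_state = [800.0, 2000.0, 4800.0]
other_state = [800.0, 2000.0, 2800.0]

# Separate fit per state: 2000 sq ft lands at different positions.
own_scale_train = apply_min_max(train_state, fit_min_max(train_state))
own_scale_other = apply_min_max(other_state, fit_min_max(other_state))

# Shared fit: reuse the bounds learned on the training state.
shared_scale_other = apply_min_max(other_state, fit_min_max(train_state))

print(own_scale_train[1], own_scale_other[1], shared_scale_other[1])
```

One possible alternative, not what this notebook does, would be to fit the scaler once on the Massachusetts data and reuse that fitted scaler to transform the other two datasets.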

In [178]:
df_massachusetts_scaled.show(5)

+--------+--------------------+
|   price|      scaledFeatures|
+--------+--------------------+
|299900.0|[0.02352941176470...|
|215000.0|[0.01176470588235...|
|159900.0|[0.01176470588235...|
|445000.0|[0.08235294117647...|
|784900.0|[0.05882352941176...|
+--------+--------------------+
only showing top 5 rows
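To make the scaled values easier to read, here is a small worked example of the min-max formula, (x - min) / (max - min), applied to the `bed` column of the five rows shown earlier. The minimum of 1 and maximum of 86 are an inference: they are the bounds that reproduce the first vector components printed above (for example 2/85 ≈ 0.0235), and they are not stated anywhere in the notebook.

```python
def min_max_scale(value, col_min, col_max):
    """Rescale value to [0, 1] given the column's observed min and max."""
    if col_max == col_min:
        # Spark's MinMaxScaler documents the midpoint of the target range
        # for a constant column; with the default [0, 1] range that is 0.5.
        return 0.5
    return (value - col_min) / (col_max - col_min)

# bed values from the first five Massachusetts rows shown earlier
beds = [3.0, 2.0, 2.0, 8.0, 6.0]

# Assumed bounds, inferred from the printed output (not given in the source)
BED_MIN, BED_MAX = 1.0, 86.0

scaled_beds = [min_max_scale(b, BED_MIN, BED_MAX) for b in beds]
for raw, scaled in zip(beds, scaled_beds):
    print(f"bed={raw:.0f} -> {scaled:.14f}")
```

If those inferred bounds are right, a single listing with 86 bedrooms compresses nearly every other home into the bottom tenth of the scaled range.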



# Model Training and Testing

The data is all set. I will now experiment with different models to find the highest R2 score.

In [179]:
# Split the data set into two sets 
train_data,test_data=df_massachusetts_scaled.randomSplit([0.7,0.3])
train_data.describe().show()
train_data.show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|              4016|
|   mean|1117820.3321713146|
| stddev| 1753014.909359984|
|    min|           10000.0|
|    max|             3.0E7|
+-------+------------------+

+-------+--------------------+
|  price|      scaledFeatures|
+-------+--------------------+
|10000.0|[0.02352941176470...|
|10000.0|[0.03529411764705...|
|24900.0|[0.07058823529411...|
|29974.0|[0.02352941176470...|
|39900.0|[0.0,0.0,3.005645...|
|43900.0|[0.05882352941176...|
|49999.0|[0.03529411764705...|
|50000.0|[0.01176470588235...|
|59900.0|[0.01176470588235...|
|59900.0|[0.01176470588235...|
|59900.0|[0.02352941176470...|
|61500.0|[0.02352941176470...|
|65000.0|[0.0,0.0,1.742403...|
|75000.0|[0.01176470588235...|
|80000.0|[0.01176470588235...|
|84900.0|[0.01176470588235...|
|84900.0|[0.01176470588235...|
|84900.0|[0.01176470588235...|
|89000.0|[0.01176470588235...|
|99000.0|[0.01176470588235...|
+-------+----------------
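Since no seed is passed to `randomSplit`, the rows that end up in `train_data` and `test_data` can differ between runs, and the metrics reported later will shift with them. `randomSplit` accepts an optional `seed` argument for this purpose. The following pure-Python sketch shows the general idea of a seeded per-row split. The helper `random_split` is a hypothetical illustration, not Spark's implementation.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent seeded draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for DataFrame rows

# The same seed reproduces the same split.
train_a, test_a = random_split(rows, 0.7, seed=42)
train_b, test_b = random_split(rows, 0.7, seed=42)

print(len(train_a), len(test_a), train_a == train_b)
```

Because each row is drawn independently, the resulting sizes are only approximately 70% and 30%.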

In [180]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

Let's examine four different regression models to find the best one.

In [147]:
def modelsummary(model):
    import numpy as np
    Summary=model.summary
    print ("Mean squared error: % .6f" \
           % Summary.meanSquaredError, ", RMSE: % .6f" \
           % Summary.rootMeanSquaredError )
    print ("##","Multiple R-squared: %f" % Summary.r2, ", \
            Total iterations: %i"% Summary.totalIterations)

In [152]:
def modelsummary(model):
    Summary=model.summary
    print(f"R2: {Summary.r2:.6f}")
    print(f"Total iterations: {Summary.totalIterations}")
            

In [195]:
# Linear Regression

lr = LinearRegression(featuresCol='scaledFeatures',labelCol='price')
lr_model = lr.fit(train_data)
lr_data = lr_model.transform(test_data)

In [199]:
# Decision Tree Regressor

dt = DecisionTreeRegressor(featuresCol='scaledFeatures',labelCol='price')
dt_model = dt.fit(train_data)
dt_data = dt_model.transform(test_data)

In [201]:
# Random forest regression

rf = RandomForestRegressor(featuresCol='scaledFeatures',labelCol='price')
rf_model = rf.fit(train_data)
rf_data = rf_model.transform(test_data)

In [202]:
# Gradient Boosted Tree Regression

gbtr = GBTRegressor(featuresCol='scaledFeatures', labelCol='price', maxIter=10)
gbtr_model = gbtr.fit(train_data)
gbtr_data = gbtr_model.transform(test_data)

In [203]:
def model_evaluate(mdata):
 
  rmse=RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
  rmse=rmse.evaluate(mdata) 
  
  mae=RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
  mae=mae.evaluate(mdata) 
  
  r2=RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
  r2=r2.evaluate(mdata)

  print("RMSE: ", rmse)
  print("MAE: ", mae)
  print("R-squared: ", r2)
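For reference, the three metrics requested from `RegressionEvaluator` can be written out by hand. The sketch below uses the standard textbook definitions in plain Python on made-up numbers. The function name `regression_metrics` and the sample values are hypothetical.

```python
import math

def regression_metrics(labels, predictions):
    """Return (rmse, mae, r2) using the standard definitions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]

    # Root mean squared error: penalizes large misses more heavily.
    rmse = math.sqrt(sum(e * e for e in errors) / n)

    # Mean absolute error: average size of a miss, in label units.
    mae = sum(abs(e) for e in errors) / n

    # R-squared: 1 minus residual sum of squares over total sum of squares.
    mean_label = sum(labels) / n
    ss_res = sum(e * e for e in errors)
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    r2 = 1.0 - ss_res / ss_tot

    return rmse, mae, r2

# Hypothetical labels and predictions, each prediction off by 10.
labels = [100.0, 200.0, 300.0, 400.0]
predictions = [110.0, 190.0, 310.0, 390.0]

rmse, mae, r2 = regression_metrics(labels, predictions)
print(rmse, mae, r2)
```

RMSE and MAE are in the same units as `price`, so they read as dollars, while R-squared is unitless.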

In [204]:
print("Linear Regression")
model_evaluate(lr_data)
print("\n")
print("Decision Tree Regressor")
model_evaluate(dt_data)
print("\n")
print("Random forest regression")
model_evaluate(rf_data)
print("\n")
print("Gradient Boosted Tree Regression")
model_evaluate(gbtr_data)

Linear Regression
RMSE:  1422559.5557176957
MAE:  633149.5148706498
R-squared:  0.3759482252992099


Decision Tree Regressor
RMSE:  1394508.929893673
MAE:  568100.1715472434
R-squared:  0.4003162130856852


Random forest regression
RMSE:  1334578.9640758822
MAE:  557134.2087125469
R-squared:  0.45075228566076864


Gradient Boosted Tree Regression
RMSE:  1398845.948770821
MAE:  562715.733978738
R-squared:  0.39658029681580864


Even after evaluating multiple models, the best one (Random Forest Regression) only reaches an R-squared of about 0.45, meaning it explains roughly 45% of the variance in price. Let's see how it performs on the Connecticut and Puerto Rico datasets.

In [206]:
rf_connecticut = rf_model.transform(df_connecticut_scaled)
rf_puerto_rico = rf_model.transform(df_puerto_rico_scaled)

In [207]:
print("Massachusetts")
model_evaluate(rf_data)
print("\n")
print("Connecticut")
model_evaluate(rf_connecticut)
print("\n")
print("Puerto Rico")
model_evaluate(rf_puerto_rico)

Massachusetts
RMSE:  1334578.9640758822
MAE:  557134.2087125469
R-squared:  0.45075228566076864


Connecticut
RMSE:  3444421.97542909
MAE:  2513596.4233944006
R-squared:  -29.854296694293037


Puerto Rico
RMSE:  1667782.8378187274
MAE:  1116489.4911979944
R-squared:  0.057408859916077914


The model gets worse when trying to predict housing prices in Connecticut and Puerto Rico.
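The negative R-squared for Connecticut deserves a short explanation. Since R-squared is 1 minus the ratio of the residual sum of squares to the total sum of squares, a value of about -29.85 means the model's squared error is roughly 30.85 times larger than that of simply predicting the mean Connecticut price for every home. The sketch below demonstrates the effect with hypothetical numbers.

```python
def r_squared(labels, predictions):
    """R-squared as 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

# Hypothetical prices in thousands of dollars.
labels = [200.0, 300.0, 400.0]

mean_baseline = [300.0, 300.0, 300.0]     # always predict the mean label
overshooting = [1200.0, 1300.0, 1400.0]   # consistently 1000 too high

r2_baseline = r_squared(labels, mean_baseline)
r2_overshoot = r_squared(labels, overshooting)

print(r2_baseline, r2_overshoot)
```

A model that always predicts the mean scores exactly 0, so any systematic offset larger than the spread of the labels pushes the score below zero.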

I have two conclusions as a result.

I believe that the dataset is too narrow and that there may be other features that affect a house's price more than just the house size, lot acreage, and number of beds and baths. It could be something as simple as median income, or something more complex like a shortage vs. surplus of available houses.

It doesn't seem like one can apply a model trained on one state to another state or U.S. territory. There are far too many factors affecting the price beyond what was available in this dataset.