In [2]:
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ModelTraining").getOrCreate()

sc = spark.sparkContext

In [4]:
df = spark.read.csv('file:///home/talentum/shared/project_dataset/pyspark_dataset.csv',
                    header = True,inferSchema = True)
df.show(5)

+-------------------+----+-----+--------------------+-------+-----------+-----+-------+----------+
|livingArea in sq.ft|beds|baths|             address|zipcode|       city|state|country|price in $|
+-------------------+----+-----+--------------------+-------+-----------+-----+-------+----------+
|        2227.741403|   5|    4|828 Wilson Avenue...|  33604|      Tampa|   FL|    USA|    700703|
|        2757.386906|   6|    6|5684 Carl Ave, Pu...|  33647|      Tampa|   FL|    USA|    703469|
|        2113.680302|   5|    5|6633 E 111th Ave,...|  91709|Chino Hills|   CA|    USA|    932715|
|        4966.420728|  10|    9|8587 Hadaway Trl,...|  33615|      Tampa|   FL|    USA|   3131332|
|        2116.807821|   5|    4|9821 Cialella Pas...|  48180|     Taylor|   MI|    USA|    305293|
+-------------------+----+-----+--------------------+-------+-----------+-----+-------+----------+
only showing top 5 rows



In [5]:
df.printSchema()

root
 |-- livingArea in sq.ft: double (nullable = true)
 |-- beds: integer (nullable = true)
 |-- baths: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- price in $: integer (nullable = true)



In [6]:
df.select("state").distinct().count()

13

In [7]:
df.select("city").distinct().count()

337

In [8]:
df = df.withColumnRenamed("livingArea in sq.ft", "livingArea_sqft") \
       .withColumnRenamed("price in $", "price_usd")

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression,RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator


Scaling - Standard Scaler

In [10]:
sqft_assembler = VectorAssembler(
inputCols=["livingArea_sqft"],
outputCol="livingArea_sqft_vec"
)

In [11]:
scaler = StandardScaler(
    inputCol="livingArea_sqft_vec", 
    outputCol="scaled_area" 
)

In [12]:
categorical_cols = ["city", "state"]
numeric_cols = ["scaled_area", "beds","baths",'zipcode']

In [13]:
indexers = [
    StringIndexer(
    inputCol=col, 
    outputCol=f"{col}_idx", 
    handleInvalid="keep"
    ) for col in categorical_cols
]

Correlation Matrix

In [14]:
from pyspark.ml.stat import Correlation
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql.functions import col

In [15]:
df.filter(col('zipcode').isNull()).count()

0

In [16]:
indexer_pipeline = Pipeline(stages = indexers)
df_indexed = indexer_pipeline.fit(df).transform(df)

corr_cols = ['price_usd','livingArea_sqft','beds','baths','city_idx','state_idx','zipcode']

In [17]:
corr_assembler = VectorAssembler(inputCols = corr_cols,outputCol='corr_features')
df_vector = corr_assembler.transform(df_indexed).select('corr_features')

In [18]:
matrix = Correlation.corr(df_vector,'corr_features').head()
corr_matrix = matrix[0].toArray()
#corr_matrix

In [19]:
corr_df = pd.DataFrame(corr_matrix,index = corr_cols,columns = corr_cols)
corr_df.style.background_gradient(cmap='coolwarm').set_precision(2)


Unnamed: 0,price_usd,livingArea_sqft,beds,baths,city_idx,state_idx,zipcode
price_usd,1.0,0.66,0.55,0.55,-0.01,0.01,-0.04
livingArea_sqft,0.66,1.0,0.98,0.97,-0.0,0.0,-0.0
beds,0.55,0.98,1.0,0.99,-0.0,0.0,-0.0
baths,0.55,0.97,0.99,1.0,-0.0,0.0,-0.0
city_idx,-0.01,-0.0,-0.0,-0.0,1.0,0.35,0.17
state_idx,0.01,0.0,0.0,0.0,0.35,1.0,-0.09
zipcode,-0.04,-0.0,-0.0,-0.0,0.17,-0.09,1.0


Encoding - One hot Encoder

In [20]:
encoder = OneHotEncoderEstimator(
    inputCols=[f"{col}_idx" for col in categorical_cols],
    outputCols=[f"{col}_ohe" for col in categorical_cols]
)

In [21]:
assembler = VectorAssembler(
inputCols=numeric_cols + [f"{c}_ohe" for c in categorical_cols],
outputCol="features"
)

In [22]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)


Linear Regression

In [74]:
lr = LinearRegression(
featuresCol="features",
labelCol="price_usd"
)

In [75]:
pipeline = Pipeline(
stages=indexers + [sqft_assembler,scaler,encoder, assembler, lr]
)

In [76]:
model = pipeline.fit(train_df)

In [77]:
predictions = model.transform(test_df)

In [33]:
evaluator = RegressionEvaluator(
    labelCol="price_usd",      
    predictionCol="prediction", 
    metricName="r2"
)

In [79]:
r2 = evaluator.evaluate(predictions)
print(f"R2 Score: {r2:.4f}")

R2 Score: 0.7014


RandomForest

In [80]:
rf = RandomForestRegressor(
    labelCol="price_usd", 
    featuresCol="features",
    numTrees=50, 
    maxDepth=10,
    seed=42)

In [81]:
pipeline = Pipeline(
stages=indexers + [sqft_assembler,scaler,encoder, assembler, rf]
)

In [82]:
model = pipeline.fit(train_df)

In [83]:
predictions = model.transform(test_df)

In [84]:
r2 = evaluator.evaluate(predictions)
print(f"R2 Score: {r2:.4f}")

R2 Score: 0.8370


Decision Tree

In [85]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(
    labelCol="price_usd", 
    featuresCol="features",
    seed = 42)

In [86]:
pipeline = Pipeline(
stages=indexers + [sqft_assembler,scaler,encoder, assembler, dt]
)

In [87]:
model = pipeline.fit(train_df)

In [88]:
predictions = model.transform(test_df)

In [89]:
r2 = evaluator.evaluate(predictions)
print(f"R2 Score: {r2:.4f}")

R2 Score: 0.8067


GBTRegressor

In [25]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(
    featuresCol="features", 
    labelCol="price_usd", 
    maxIter=20, 
    maxDepth=5
)

In [91]:
pipeline = Pipeline(
stages=indexers + [sqft_assembler,scaler,encoder, assembler, gbt]
)

In [92]:
model = pipeline.fit(train_df)

In [93]:
predictions = model.transform(test_df)

In [94]:
r2 = evaluator.evaluate(predictions)
print(f"R2 Score: {r2:.4f}")

R2 Score: 0.8761


Hyper tunning parameters

In [30]:
maxDepth_list = [5,7,9]      
maxIter_list = [50]       

In [31]:
featured_pipeline = Pipeline(
            stages=indexers + [sqft_assembler,scaler,encoder, assembler])

featured_cols = featured_pipeline.fit(train_df)

train_fe = featured_cols.transform(train_df)
test_fe = featured_cols.transform(test_df)

In [34]:
best_r2 = -1
best_params = {}

for depth in maxDepth_list:
    for iters in maxIter_list:
            gbt = GBTRegressor(
            labelCol = "price_usd",
            featuresCol = "features",
            maxDepth = depth,
            maxIter = iters,
            seed = 42)
            model = gbt.fit(train_fe)
            predictions = model.transform(test_fe)
            r2 = evaluator.evaluate(predictions)
            print(f"Depth={depth}, Iter={iters} -- R2 Score: {r2:.4f}")
                
            if r2> best_r2:
                best_r2 = r2
                best_params = {
                    "maxDepth" : depth,
                    "maxIter" : iters,
                    }

Depth=5, Iter=50 -- R2 Score: 0.8882
Depth=7, Iter=50 -- R2 Score: 0.8899
Depth=9, Iter=50 -- R2 Score: 0.8915


In [35]:
print("Best Model")
print("Best R2:", best_r2)
print("Best Parameters:",best_params)

Best Model
Best R2: 0.8914880194926682
Best Parameters: {'maxDepth': 9, 'maxIter': 50}
