In [1]:
# install pyspark in google colab
!pip install pyspark



In [2]:
# import sparkSession and build a new app
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("californiaHousing").getOrCreate()

In [3]:
# load data from google colab sample data
df = spark.read.csv('/content/sample_data/california_housing_train.csv', header = True, inferSchema=True)

In [4]:
# show header and 5 top rows of data frame
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

In [5]:
# show description of df
df.describe()

DataFrame[summary: string, longitude: string, latitude: string, housing_median_age: string, total_rooms: string, total_bedrooms: string, population: string, households: string, median_income: string, median_house_value: string]

In [7]:
# cast data from rows to floating poin numbers
from pyspark.sql.types import FloatType
cols = df.columns
for col in cols:
  df = df.withColumn(col, df[col].cast('float'))

In [9]:
# use groupby and orderby for funny result
from pyspark.sql.functions import desc
df.groupBy('housing_median_age').mean('median_house_value').orderBy(desc('housing_median_age')).show(10)

+------------------+-----------------------+
|housing_median_age|avg(median_house_value)|
+------------------+-----------------------+
|              52.0|      277368.2005703422|
|              51.0|             244850.125|
|              50.0|     222513.48214285713|
|              49.0|      220667.6126126126|
|              48.0|     224665.25185185185|
|              47.0|     190064.61142857143|
|              46.0|     198036.26020408163|
|              45.0|     223464.74468085106|
|              44.0|     214909.50675675675|
|              43.0|      194365.7797202797|
+------------------+-----------------------+
only showing top 10 rows



In [10]:
# feature columns
XCols = df.columns
del(XCols[-1])
print(XCols)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']


In [12]:
# target column
targetCol = df.columns[-1]
print(targetCol)

median_house_value


In [13]:
# df to vector (in pyspark must be used vectorized features)
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(inputCols=XCols, outputCol='feature_vector')
df = va.transform(df)

In [14]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|      feature_vector|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|[-114.30999755859...|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|[-114.47000122070...|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|[-114.55999755859...|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|[-114.56999969482...|

In [15]:
 # split train and test sets
 trainSet, testSet = df.randomSplit([0.8, 0.2], seed=42)

In [16]:
# import LR from pyspark and cnstruct model
from pyspark.ml.regression import LinearRegression
model = LinearRegression(featuresCol = 'feature_vector', labelCol = targetCol)

In [17]:
# fit(train) model
model = model.fit(trainSet)

In [18]:
# show coefficients of features
model.coefficients

DenseVector([-42897.9984, -42705.3007, 1144.0811, -8.2234, 119.8248, -37.871, 40.8358, 40399.1481])

In [19]:
# using evalute for predict and calculate model evaluators
pred = model.evaluate(testSet)

In [20]:
# show r2 score
pred.r2

0.6420168570584976

In [21]:
# show MAE
pred.meanAbsoluteError

50573.11348422675

In [29]:
# import random forest regressor from pyspark and cnstruct model
from pyspark.ml.regression import RandomForestRegressor
model = RandomForestRegressor(featuresCol = 'feature_vector', labelCol = targetCol)

In [30]:
# fit(train) model
model = model.fit(trainSet)

In [31]:
# predict model with test set
pred = model.transform(testSet)

In [32]:
pred.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|      feature_vector|        prediction|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------------------+------------------+
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|[-124.30000305175...|120656.61656236497|
|  -124.23|   40.54|              52.0|     2694.0|         453.0|    1152.0|     435.0|       3.0806|          106700.0|[-124.23000335693...|136779.76158734277|
|  -124.23|   41.75|              11.0|     3159.0|         616.0|    1343.0|     479.0|       2.4805|           73200.0|[-124.23000335693...|106764.91552746261|
|  -124.19|   40.73|        

In [33]:
# evalute with pyspark RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol=targetCol, predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(pred)
print('R2-Score : {}'.format(r2))

R2-Score : 0.5980584401661394
