# PySpark Basics

In [75]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("demo").config("spark.ui.port", "4050").getOrCreate()

In [76]:
spark

In [77]:
df = spark.read.format("csv").load("housing.csv", header=True, inferSchema=True)
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [78]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [79]:
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("id", monotonically_increasing_id())
df = df[["id"] + df.columns[:-1]]
df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 

In [80]:
df.count()

20640

## Aggregate Functions

In [81]:
df.select("total_rooms").agg({"total_rooms": "avg"}).show()     # Can pass a function instead of "avg" also.

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



In [82]:
from pyspark.sql.functions import mean

df.select(*[mean(c) for c in df.columns]).show()

+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|avg(id)|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|10319.5|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                NULL|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+----------

In [83]:
df.groupBy("ocean_proximity").agg({col: 'avg' for col in df.columns[3:-1]}).show()

+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|ocean_proximity|   avg(households)|   avg(population)|avg(total_bedrooms)|avg(median_income)|  avg(total_rooms)|avg(median_house_value)|avg(housing_median_age)|
+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|         ISLAND|             276.6|             668.0|              420.4|2.7444200000000003|            1574.6|               380440.0|                   42.4|
|     NEAR OCEAN|501.24454477050415|1354.0086531226486|  538.6156773211568| 4.005784800601957| 2583.700902934537|     249433.97742663656|     29.347253574115875|
|       NEAR BAY| 488.6161572052402|1230.3174672489083|  514.1828193832599| 4.172884759825336| 2493.589519650655|     259212.31179039303|      37.73013100436681|
|      <1H OCEAN| 517.744964

### User Defined Functions

In [84]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

def squared(value):
    return value*value

squared_udf = udf(squared, FloatType())

In [85]:
df.withColumn("total_rooms_squared", squared_udf("total_rooms")).show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_rooms_squared|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|           774400.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|          5.03958E7|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|          2152089.0|
|  3|  -122.25|   37.85|    

# MLlib with PySpark

#### Train-Test Split

In [86]:
train, test = df.randomSplit([0.7, 0.3])

train, test

(DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string],
 DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string])

In [87]:
# Get list of numerical features.
numerical_features = train.columns
numerical_features.remove("median_house_value")
numerical_features.remove("id")
numerical_features.remove("ocean_proximity")

numerical_features

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [88]:
# Dealing with missing values by imputing values.
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=numerical_features, outputCols=numerical_features) # Can specify strategy as well.
imputer = imputer.fit(train)
train = imputer.transform(train)
test = imputer.transform(test)

train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4|  -122.25|   37.85|              52.0|     1627.0|         280.0|     565.0|     259.0|       3.8462|          342200.0|       NEAR BAY|
|  5| 

In [89]:
# Vectorize the data.
from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols=numerical_features, outputCol="numerical_feature_vector")
train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(2) # Notice the new column which has been added at the end.

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------

In [90]:
train.select("numerical_feature_vector").take(2)

[Row(numerical_feature_vector=DenseVector([-122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 126.0, 8.3252])),
 Row(numerical_feature_vector=DenseVector([-122.22, 37.86, 21.0, 7099.0, 1106.0, 2401.0, 1138.0, 8.3014]))]

In [91]:
# Normalization of data.
from pyspark.ml.feature import StandardScaler


scaler = StandardScaler(inputCol="numerical_feature_vector", outputCol="scaled_numerical_feature_vector", withStd=True, withMean=True)
scaler = scaler.fit(train)
train = scaler.transform(train)
test = scaler.transform(test)

train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3304769372351...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|           [-1.3254711558963...|
+---+

In [92]:
train.select("scaled_numerical_feature_vector").take(2)

[Row(scaled_numerical_feature_vector=DenseVector([-1.3305, 1.056, 0.9814, -0.7946, -0.9634, -0.9513, -0.9666, 2.2985])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3255, 1.0466, -0.6062, 1.9866, 1.3213, 0.8295, 1.6337, 2.2862]))]

In [93]:
# Now, we will deal with the categorical column.
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_category_index")
indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3304769372351...|                 3.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          3585

In [94]:
# Avoid this, not a good practice for huge datasets.
set(train.select("ocean_category_index").collect())

{Row(ocean_category_index=0.0),
 Row(ocean_category_index=1.0),
 Row(ocean_category_index=2.0),
 Row(ocean_category_index=3.0),
 Row(ocean_category_index=4.0)}

In [95]:
# One-hot encoding, this is implemented differently from the usual OHE in ML.
from pyspark.ml.feature import OneHotEncoder

ohe = OneHotEncoder(inputCol="ocean_category_index", outputCol="ocean_category_one_hot")
ohe = ohe.fit(train)
train = ohe.transform(train)
test = ohe.transform(test)

train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3304769372351...|                 3.0|         (4,[3],[1.0])|
|  1|  -122.22|   37.86|    

In [96]:
# Concatenating the numerical features with the categorical feature.
assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector', 'ocean_category_one_hot'], outputCol="final_feature_vector")

train = assembler.transform(train)
test = assembler.transform(test)

train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3304769372351...|          

In [97]:
train.select("final_feature_vector").take(2)

[Row(final_feature_vector=DenseVector([-1.3305, 1.056, 0.9814, -0.7946, -0.9634, -0.9513, -0.9666, 2.2985, 0.0, 0.0, 0.0, 1.0])),
 Row(final_feature_vector=DenseVector([-1.3255, 1.0466, -0.6062, 1.9866, 1.3213, 0.8295, 1.6337, 2.2862, 0.0, 0.0, 0.0, 1.0]))]

In [98]:
# Now, onto the ML model.
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="final_feature_vector", labelCol="median_house_value")
lr

LinearRegression_d7c57865e990

In [99]:
lr = lr.fit(train)
lr

LinearRegressionModel: uid=LinearRegression_d7c57865e990, numFeatures=12

In [100]:
pred_train_df = lr.transform(train).withColumnRenamed("prediction", "predicted_median_house_value")
pred_train_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          45260

In [101]:
pred_test_df = lr.transform(test).withColumnRenamed("prediction", "predicted_median_house_value")
pred_test_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          35210

In [102]:
pred_test_pd_df = pred_test_df.toPandas()
pred_test_pd_df.head()

Unnamed: 0,id,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,numerical_feature_vector,scaled_numerical_feature_vector,ocean_category_index,ocean_category_one_hot,final_feature_vector,predicted_median_house_value
0,2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY,"[-122.24, 37.85, 52.0, 1467.0, 190.0, 496.0, 1...","[-1.3354827185740068, 1.0419185581994275, 1.85...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3354827185740068, 1.0419185581994275, 1.85...",377392.913926
1,6,-122.25,37.84,52.0,2535.0,489.0,1094.0,514.0,3.6591,299200.0,NEAR BAY,"[-122.25, 37.84, 52.0, 2535.0, 489.0, 1094.0, ...","[-1.3404884999128401, 1.037217234657352, 1.854...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3404884999128401, 1.037217234657352, 1.854...",257722.586149
2,13,-122.26,37.84,52.0,696.0,191.0,345.0,174.0,2.6736,191300.0,NEAR BAY,"[-122.26, 37.84, 52.0, 696.0, 191.0, 345.0, 17...","[-1.3454942812516735, 1.037217234657352, 1.854...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3454942812516735, 1.037217234657352, 1.854...",210832.900436
3,14,-122.26,37.85,52.0,2643.0,626.0,1212.0,620.0,1.9167,159200.0,NEAR BAY,"[-122.26, 37.85, 52.0, 2643.0, 626.0, 1212.0, ...","[-1.3454942812516735, 1.0419185581994275, 1.85...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3454942812516735, 1.0419185581994275, 1.85...",203741.19373
4,19,-122.27,37.84,52.0,1503.0,298.0,690.0,275.0,2.6033,162900.0,NEAR BAY,"[-122.27, 37.84, 52.0, 1503.0, 298.0, 690.0, 2...","[-1.3505000625904997, 1.037217234657352, 1.854...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3505000625904997, 1.037217234657352, 1.854...",206673.671296


In [103]:
predictions_and_actuals = pred_test_df[["predicted_median_house_value", "median_house_value"]]

predictions_and_actuals_rdd = predictions_and_actuals.rdd
predictions_and_actuals_rdd.take(5)

[Row(predicted_median_house_value=377392.9139255514, median_house_value=352100.0),
 Row(predicted_median_house_value=257722.58614909495, median_house_value=299200.0),
 Row(predicted_median_house_value=210832.90043601894, median_house_value=191300.0),
 Row(predicted_median_house_value=203741.19372972005, median_house_value=159200.0),
 Row(predicted_median_house_value=206673.6712963723, median_house_value=162900.0)]

In [104]:
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)
predictions_and_actuals_rdd.take(5)

[(377392.9139255514, 352100.0),
 (257722.58614909495, 299200.0),
 (210832.90043601894, 191300.0),
 (203741.19372972005, 159200.0),
 (206673.6712963723, 162900.0)]

In [107]:
# Evaluation
from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predictions_and_actuals_rdd)

s = '''
MSE:  {0}
RMSE: {1}
MAE:  {2}
R^2:  {3}
'''.format(metrics.meanSquaredError, metrics.rootMeanSquaredError, metrics.meanAbsoluteError, metrics.r2)



In [109]:
print(s)


MSE:  4529385289.025399
RMSE: 67300.70793851577
MAE:  49188.14363205681
R^2:  0.6501701388902779

