In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=398bdcd2a6b29c4a4867379fe6e3321d093c30215e8ca29ef01c5e3c68d5e571
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
    .master("local")\
    .appName("test")\
    .config('spark.ui.port', '4050')\
    .getOrCreate()

In [None]:
spark

In [None]:
# Reading the data frame
df = spark.read.format("csv").load("housing.csv", header=True, inferSchema=True)

In [None]:
df.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows



In [None]:
# Adding an Index Column
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn("id", monotonically_increasing_id())
df = df[['id'] + df.columns[:-1]]

In [None]:
df.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only s

In [None]:
# Splitting the data into training and testing data randomly in the ratio of 70-30
train, test = df.randomSplit([0.7, 0.3])

In [None]:
numerical_features_list = train.columns
numerical_features_list.remove('median_house_value')
numerical_features_list.remove('id')
numerical_features_list.remove('ocean_proximity')
numerical_features_list

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [None]:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=numerical_features_list, outputCols=numerical_features_list)
imputer = imputer.fit(train)

In [None]:
train = imputer.transform(train)
test = imputer.transform(test)

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=numerical_features_list,
                            outputCol = 'numerical_feature_vector')

In [None]:
train = assembler.transform(train)
test = assembler.transform(test)

In [None]:
train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    [-122.24,37.85,52...

In [None]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='numerical_feature_vector', outputCol='scaled_numerical_feature_vector', withStd=True, withMean=True)

In [None]:
scaler = scaler.fit(train)
train = scaler.transform(train)
test = scaler.transform(test)

In [None]:
train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3214958176684...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|           [-1.3165137045594...|
|  2|

In [None]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='ocean_proximity', outputCol='ocean_proximity_index')
indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

In [None]:
assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector', 'ocean_proximity_index'], outputCol='final_feature_vector')

In [None]:
train = assembler.transform(train)
test = assembler.transform(test)

In [None]:
train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+---------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_proximity_index|final_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+---------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3214958176684...|                  3.0|[-1.3214958176684...|
|  1|  -122.22|   37.86|        

In [None]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='final_feature_vector', labelCol='median_house_value')

In [None]:
lr

LinearRegression_b00e185f7efb

In [None]:
lr = lr.fit(train)

In [None]:
pred_train_df = lr.transform(train).withColumnRenamed('prediction',
                                                      'predicted_median_house_value')

pred_train_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+---------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_proximity_index|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+---------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.321495

In [None]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction', 'predicted_median_house_value')

pred_test_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+---------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_proximity_index|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+---------------------+--------------------+----------------------------+
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|    [-122.25,37.85,52...|           [-1.331460

In [None]:
predictions_and_actuals = pred_test_df[['predicted_median_house_value',
                                        'median_house_value']]

predictions_and_actuals_rdd = predictions_and_actuals.rdd

predictions_and_actuals_rdd.take(2)

[Row(predicted_median_house_value=317481.9942782311, median_house_value=341300.0),
 Row(predicted_median_house_value=198476.62163782847, median_house_value=226700.0)]

In [None]:
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)

predictions_and_actuals_rdd.take(2)

[(317481.9942782311, 341300.0), (198476.62163782847, 226700.0)]

In [None]:
from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predictions_and_actuals_rdd)

s = '''
Mean Squared Error:      {0}
Root Mean Squared Error: {1}
Mean Absolute Error:     {2}
R**2:                    {3}
'''.format(metrics.meanSquaredError,
           metrics.rootMeanSquaredError,
           metrics.meanAbsoluteError,
           metrics.r2
           )

print(s)




Mean Squared Error:      4959780125.756161
Root Mean Squared Error: 70425.70642710061
Mean Absolute Error:     50739.762247929735
R**2:                    0.6258456475815206

