<a href="https://colab.research.google.com/github/bala-baskar/aiml_tech_courses/blob/main/pyspark/00_pyspark_basics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=9aeddf001294e82b4503cc6a44276f2188eaab50658a2256fd2fa6814cf1c3aa
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


### Initiate Spark session in Google colab

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

In [3]:
# create the session
spark = SparkSession.builder\
                    .master("local")\
                    .appName("Colab")\
                    .config('spark.ui.port',"4050")\
                    .getOrCreate()
spark

In [4]:
# !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
# !unzip ngrok-stable-linux-amd64.zip
# get_ipython().system_raw('./ngrok http 4050 &')
# !curl -s http://localhost:4040/api/tunnels
# !sleep 5
# !curl -s http://localhost:4040/api/tunnels | grep -Po 'public_url":"(?=https)\K[^"]*'

In [5]:
# Prepare data
sample_df = pd.DataFrame({'name':['abc','bcd','cde','def','efg','fgh','ghi',None],
                          'age':[35,30,24,21,None,50,None,None],
                          'gender':['M','F','F','F','M','M','M','F'],
                          'exp_yrs':[5.2,10.8,12.4,7.3,None,2.3,None,1.5],
                          'salary':[1200,2400,None,2000,5000,800,1900,None]})
sample_df.to_csv("sample.csv",index=False)

###Read the file

In [6]:
spark_df = spark.read.format('csv').option('header','true').load("sample.csv",inferSchema=True)
spark_df.show()

+----+----+------+-------+------+
|name| age|gender|exp_yrs|salary|
+----+----+------+-------+------+
| abc|35.0|     M|    5.2|1200.0|
| bcd|30.0|     F|   10.8|2400.0|
| cde|24.0|     F|   12.4|  NULL|
| def|21.0|     F|    7.3|2000.0|
| efg|NULL|     M|   NULL|5000.0|
| fgh|50.0|     M|    2.3| 800.0|
| ghi|NULL|     M|   NULL|1900.0|
|NULL|NULL|     F|    1.5|  NULL|
+----+----+------+-------+------+



In [7]:
type(spark_df), spark_df.printSchema(), spark_df.columns

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- exp_yrs: double (nullable = true)
 |-- salary: double (nullable = true)



(pyspark.sql.dataframe.DataFrame,
 None,
 ['name', 'age', 'gender', 'exp_yrs', 'salary'])

In [8]:
spark_df.head(2)

[Row(name='abc', age=35.0, gender='M', exp_yrs=5.2, salary=1200.0),
 Row(name='bcd', age=30.0, gender='F', exp_yrs=10.8, salary=2400.0)]

In [9]:
display(spark_df)

DataFrame[name: string, age: double, gender: string, exp_yrs: double, salary: double]

###Data Manipulation

In [10]:
# Select a column in dataframe
spark_df.select(['Name','exp_yrs']).show()

+----+-------+
|Name|exp_yrs|
+----+-------+
| abc|    5.2|
| bcd|   10.8|
| cde|   12.4|
| def|    7.3|
| efg|   NULL|
| fgh|    2.3|
| ghi|   NULL|
|NULL|    1.5|
+----+-------+



In [11]:
# Find the dataypes
spark_df.dtypes

[('name', 'string'),
 ('age', 'double'),
 ('gender', 'string'),
 ('exp_yrs', 'double'),
 ('salary', 'double')]

In [12]:
# Describe summary
spark_df.describe().show()

+-------+----+------------------+------+-----------------+------------------+
|summary|name|               age|gender|          exp_yrs|            salary|
+-------+----+------------------+------+-----------------+------------------+
|  count|   7|                 5|     8|                6|                 6|
|   mean|NULL|              32.0|  NULL|6.583333333333332|2216.6666666666665|
| stddev|NULL|11.423659658795863|  NULL|4.432343247839304|1481.1031924436147|
|    min| abc|              21.0|     F|              1.5|             800.0|
|    max| ghi|              50.0|     M|             12.4|            5000.0|
+-------+----+------------------+------+-----------------+------------------+



In [13]:
# Adding new column & rename
spark_df = spark_df.withColumn('exp_workers', spark_df['exp_yrs']>10.0).withColumnRenamed('exp_workers','Exp_>_10yrs')
spark_df.show()

+----+----+------+-------+------+-----------+
|name| age|gender|exp_yrs|salary|Exp_>_10yrs|
+----+----+------+-------+------+-----------+
| abc|35.0|     M|    5.2|1200.0|      false|
| bcd|30.0|     F|   10.8|2400.0|       true|
| cde|24.0|     F|   12.4|  NULL|       true|
| def|21.0|     F|    7.3|2000.0|      false|
| efg|NULL|     M|   NULL|5000.0|       NULL|
| fgh|50.0|     M|    2.3| 800.0|      false|
| ghi|NULL|     M|   NULL|1900.0|       NULL|
|NULL|NULL|     F|    1.5|  NULL|      false|
+----+----+------+-------+------+-----------+



In [14]:
# Drop the existing column
spark_df = spark_df.drop('Exp_>_10yrs')
spark_df.show()

+----+----+------+-------+------+
|name| age|gender|exp_yrs|salary|
+----+----+------+-------+------+
| abc|35.0|     M|    5.2|1200.0|
| bcd|30.0|     F|   10.8|2400.0|
| cde|24.0|     F|   12.4|  NULL|
| def|21.0|     F|    7.3|2000.0|
| efg|NULL|     M|   NULL|5000.0|
| fgh|50.0|     M|    2.3| 800.0|
| ghi|NULL|     M|   NULL|1900.0|
|NULL|NULL|     F|    1.5|  NULL|
+----+----+------+-------+------+



In [15]:
# Drop na rows
spark_df.show()
print("Drop any rows with NA")
spark_df.dropna().show()
print("Drop any rows with min thresh values as Non null")
spark_df.dropna(how='any',thresh=2).show()
print("Drop any rows with NA in specific cols",['name','salary'])
spark_df.dropna(how='any',subset=['name','salary']).show()

+----+----+------+-------+------+
|name| age|gender|exp_yrs|salary|
+----+----+------+-------+------+
| abc|35.0|     M|    5.2|1200.0|
| bcd|30.0|     F|   10.8|2400.0|
| cde|24.0|     F|   12.4|  NULL|
| def|21.0|     F|    7.3|2000.0|
| efg|NULL|     M|   NULL|5000.0|
| fgh|50.0|     M|    2.3| 800.0|
| ghi|NULL|     M|   NULL|1900.0|
|NULL|NULL|     F|    1.5|  NULL|
+----+----+------+-------+------+

Drop any rows with NA
+----+----+------+-------+------+
|name| age|gender|exp_yrs|salary|
+----+----+------+-------+------+
| abc|35.0|     M|    5.2|1200.0|
| bcd|30.0|     F|   10.8|2400.0|
| def|21.0|     F|    7.3|2000.0|
| fgh|50.0|     M|    2.3| 800.0|
+----+----+------+-------+------+

Drop any rows with min thresh values as Non null
+----+----+------+-------+------+
|name| age|gender|exp_yrs|salary|
+----+----+------+-------+------+
| abc|35.0|     M|    5.2|1200.0|
| bcd|30.0|     F|   10.8|2400.0|
| cde|24.0|     F|   12.4|  NULL|
| def|21.0|     F|    7.3|2000.0|
| efg|NUL

In [16]:
# Data imputer
from pyspark.ml.feature import Imputer

imputer = Imputer(
          inputCols=['age','exp_yrs','salary'],
          outputCols=["{}_imputed".format(c) for c in ['age','exp_yrs','salary']]
).setStrategy("mean")

imputer.fit(spark_df).transform(spark_df).show()

+----+----+------+-------+------+-----------+-----------------+------------------+
|name| age|gender|exp_yrs|salary|age_imputed|  exp_yrs_imputed|    salary_imputed|
+----+----+------+-------+------+-----------+-----------------+------------------+
| abc|35.0|     M|    5.2|1200.0|       35.0|              5.2|            1200.0|
| bcd|30.0|     F|   10.8|2400.0|       30.0|             10.8|            2400.0|
| cde|24.0|     F|   12.4|  NULL|       24.0|             12.4|2216.6666666666665|
| def|21.0|     F|    7.3|2000.0|       21.0|              7.3|            2000.0|
| efg|NULL|     M|   NULL|5000.0|       32.0|6.583333333333332|            5000.0|
| fgh|50.0|     M|    2.3| 800.0|       50.0|              2.3|             800.0|
| ghi|NULL|     M|   NULL|1900.0|       32.0|6.583333333333332|            1900.0|
|NULL|NULL|     F|    1.5|  NULL|       32.0|              1.5|2216.6666666666665|
+----+----+------+-------+------+-----------+-----------------+------------------+



In [17]:
# Filter operation
spark_df.filter("salary > 1200").dropna(how='any').show()

+----+----+------+-------+------+
|name| age|gender|exp_yrs|salary|
+----+----+------+-------+------+
| bcd|30.0|     F|   10.8|2400.0|
| def|21.0|     F|    7.3|2000.0|
+----+----+------+-------+------+



In [18]:
spark_df.filter((spark_df['salary']>500) & (~(spark_df['age']<30.0))).show()

+----+----+------+-------+------+
|name| age|gender|exp_yrs|salary|
+----+----+------+-------+------+
| abc|35.0|     M|    5.2|1200.0|
| bcd|30.0|     F|   10.8|2400.0|
| fgh|50.0|     M|    2.3| 800.0|
+----+----+------+-------+------+



In [19]:
# Groupby & Agg function
spark_df.groupBy('gender').mean().show()

+------+--------+------------+-----------+
|gender|avg(age)|avg(exp_yrs)|avg(salary)|
+------+--------+------------+-----------+
|     F|    25.0|         8.0|     2200.0|
|     M|    42.5|        3.75|     2225.0|
+------+--------+------------+-----------+



###Machine learning using PySpark

####1. Data preparation

In [85]:
from sklearn.datasets import fetch_california_housing

data = fetch_california_housing()
data['data']

array([[   8.3252    ,   41.        ,    6.98412698, ...,    2.55555556,
          37.88      , -122.23      ],
       [   8.3014    ,   21.        ,    6.23813708, ...,    2.10984183,
          37.86      , -122.22      ],
       [   7.2574    ,   52.        ,    8.28813559, ...,    2.80225989,
          37.85      , -122.24      ],
       ...,
       [   1.7       ,   17.        ,    5.20554273, ...,    2.3256351 ,
          39.43      , -121.22      ],
       [   1.8672    ,   18.        ,    5.32951289, ...,    2.12320917,
          39.43      , -121.32      ],
       [   2.3886    ,   16.        ,    5.25471698, ...,    2.61698113,
          39.37      , -121.24      ]])

In [86]:
df = pd.DataFrame(data['data'],columns=data['feature_names'])
df[data['target_names']] = data['target'].reshape(-1,1)
df

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedHouseVal
0,8.3252,41.0,6.984127,1.023810,322.0,2.555556,37.88,-122.23,4.526
1,8.3014,21.0,6.238137,0.971880,2401.0,2.109842,37.86,-122.22,3.585
2,7.2574,52.0,8.288136,1.073446,496.0,2.802260,37.85,-122.24,3.521
3,5.6431,52.0,5.817352,1.073059,558.0,2.547945,37.85,-122.25,3.413
4,3.8462,52.0,6.281853,1.081081,565.0,2.181467,37.85,-122.25,3.422
...,...,...,...,...,...,...,...,...,...
20635,1.5603,25.0,5.045455,1.133333,845.0,2.560606,39.48,-121.09,0.781
20636,2.5568,18.0,6.114035,1.315789,356.0,3.122807,39.49,-121.21,0.771
20637,1.7000,17.0,5.205543,1.120092,1007.0,2.325635,39.43,-121.22,0.923
20638,1.8672,18.0,5.329513,1.171920,741.0,2.123209,39.43,-121.32,0.847


In [87]:
df['Population_rank'] = pd.cut(df['Population'],bins=3,labels=[f"pop_rank_{x+1}" for x in range(3)])
df['Old_house'] = np.where(df['HouseAge'] < 5, 'New','Old')
df.to_csv("california_housing_price.csv",index=False)
df.head()

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedHouseVal,Population_rank,Old_house
0,8.3252,41.0,6.984127,1.02381,322.0,2.555556,37.88,-122.23,4.526,pop_rank_1,Old
1,8.3014,21.0,6.238137,0.97188,2401.0,2.109842,37.86,-122.22,3.585,pop_rank_1,Old
2,7.2574,52.0,8.288136,1.073446,496.0,2.80226,37.85,-122.24,3.521,pop_rank_1,Old
3,5.6431,52.0,5.817352,1.073059,558.0,2.547945,37.85,-122.25,3.413,pop_rank_1,Old
4,3.8462,52.0,6.281853,1.081081,565.0,2.181467,37.85,-122.25,3.422,pop_rank_1,Old


####2. Data Load

In [88]:
data_df = spark.read.format('csv').option('header','true').option('inferSchema','true').load('california_housing_price.csv')
data_df.show()

+------+--------+------------------+------------------+----------+------------------+--------+---------+-----------+---------------+---------+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|MedHouseVal|Population_rank|Old_house|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----------+---------------+---------+
|8.3252|    41.0| 6.984126984126984|1.0238095238095237|     322.0|2.5555555555555554|   37.88|  -122.23|      4.526|     pop_rank_1|      Old|
|8.3014|    21.0| 6.238137082601054|0.9718804920913884|    2401.0| 2.109841827768014|   37.86|  -122.22|      3.585|     pop_rank_1|      Old|
|7.2574|    52.0| 8.288135593220339| 1.073446327683616|     496.0|2.8022598870056497|   37.85|  -122.24|      3.521|     pop_rank_1|      Old|
|5.6431|    52.0|5.8173515981735155|1.0730593607305936|     558.0| 2.547945205479452|   37.85|  -122.25|      3.413|     pop_rank_1|      Old|

In [89]:
data_df.printSchema()

root
 |-- MedInc: double (nullable = true)
 |-- HouseAge: double (nullable = true)
 |-- AveRooms: double (nullable = true)
 |-- AveBedrms: double (nullable = true)
 |-- Population: double (nullable = true)
 |-- AveOccup: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- MedHouseVal: double (nullable = true)
 |-- Population_rank: string (nullable = true)
 |-- Old_house: string (nullable = true)



In [90]:
data_df.select(*[round(mean(c),3) for c in data_df.columns]).show()

+---------------------+-----------------------+-----------------------+------------------------+-------------------------+-----------------------+-----------------------+------------------------+--------------------------+------------------------------+------------------------+
|round(avg(MedInc), 3)|round(avg(HouseAge), 3)|round(avg(AveRooms), 3)|round(avg(AveBedrms), 3)|round(avg(Population), 3)|round(avg(AveOccup), 3)|round(avg(Latitude), 3)|round(avg(Longitude), 3)|round(avg(MedHouseVal), 3)|round(avg(Population_rank), 3)|round(avg(Old_house), 3)|
+---------------------+-----------------------+-----------------------+------------------------+-------------------------+-----------------------+-----------------------+------------------------+--------------------------+------------------------------+------------------------+
|                3.871|                 28.639|                  5.429|                   1.097|                 1425.477|                  3.071|                 

In [91]:
data_df.groupby('Population_rank').agg({col: 'avg' for col in data_df.columns}).show()

+---------------+-----------------+-----------------+------------------+-------------------+------------------+--------------+-----------------+-----------------+------------------+--------------------+------------------+
|Population_rank|    avg(HouseAge)|    avg(AveOccup)|     avg(Latitude)|     avg(Longitude)|       avg(MedInc)|avg(Old_house)|  avg(Population)|    avg(AveRooms)|    avg(AveBedrms)|avg(Population_rank)|  avg(MedHouseVal)|
+---------------+-----------------+-----------------+------------------+-------------------+------------------+--------------+-----------------+-----------------+------------------+--------------------+------------------+
|     pop_rank_3|             12.5|6.089440986465188|34.995000000000005|           -119.605|2.4408000000000003|          NULL|          32124.0|5.317507605052884|1.0350843463233206|                NULL|             1.266|
|     pop_rank_2|8.333333333333334|3.221390355585077| 35.36083333333333| -119.4541666666667|          5.605025| 

In [92]:
# Using UDF in pyspark

avg_lat = data_df.select(mean(col('Latitude'))).collect()[0][0]
avg_long = data_df.select(mean(col('Longitude'))).collect()[0][0]
print(avg_lat,avg_long)

35.6318614341087 -119.56970445736148


In [93]:
def spatial_distance(lat,long):
  return np.power(np.power(35.6318614341087-lat,2)+np.power(-119.56970445736148-long,2),0.5)

spatial_distance(35.0975,-118.7075)

1.014366141270855

In [94]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

def spatial_distance(lat,long):
  return ((avg_lat - lat)**2 + (avg_long - long)**2) ** 0.5

distance_udf = udf(spatial_distance, FloatType())

data_df = data_df.withColumn('avg_distance',distance_udf('Latitude','Longitude'))
data_df.show()

+------+--------+------------------+------------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|MedHouseVal|Population_rank|Old_house|avg_distance|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+
|8.3252|    41.0| 6.984126984126984|1.0238095238095237|     322.0|2.5555555555555554|   37.88|  -122.23|      4.526|     pop_rank_1|      Old|   3.4830015|
|8.3014|    21.0| 6.238137082601054|0.9718804920913884|    2401.0| 2.109841827768014|   37.86|  -122.22|      3.585|     pop_rank_1|      Old|   3.4624655|
|7.2574|    52.0| 8.288135593220339| 1.073446327683616|     496.0|2.8022598870056497|   37.85|  -122.24|      3.521|     pop_rank_1|      Old|   3.4713998|
|5.6431|    52.0|5.8173515981735155|1.0730593607305936|     558.

In [95]:
# Train - test split
train, test = data_df.randomSplit([0.7,0.3])
train.count(),test.count()

(14448, 6192)

In [96]:
# Get only numerical features
target = 'MedHouseVal'
num_features_lst = [c for c,d in train.dtypes if (d != 'string') and (c != target)]
num_features_lst

['MedInc',
 'HouseAge',
 'AveRooms',
 'AveBedrms',
 'Population',
 'AveOccup',
 'Latitude',
 'Longitude',
 'avg_distance']

In [97]:
# Impute the numerical features

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=num_features_lst,
                  outputCols=num_features_lst)

imputer = imputer.fit(train)
train = imputer.transform(train)
test = imputer.transform(test)

In [98]:
# Create feature vector by combining columns
from pyspark.ml.feature import VectorAssembler

num_vector_assembler = VectorAssembler(inputCols=num_features_lst,
                                       outputCol='num_features_vector')

train = num_vector_assembler.transform(train)
test = num_vector_assembler.transform(test)

train.show(3)

+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+
|MedInc|HouseAge|          AveRooms|       AveBedrms|Population|          AveOccup|Latitude|Longitude|MedHouseVal|Population_rank|Old_house|avg_distance| num_features_vector|
+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+
|0.4999|    10.0|              6.74|            2.04|     108.0|              2.16|   34.69|   -116.9|       0.55|     pop_rank_1|      Old|    2.830976|[0.4999,10.0,6.74...|
|0.4999|    15.0|11.596491228070175|2.56140350877193|     131.0|2.2982456140350878|   40.43|  -123.32|      0.567|     pop_rank_1|      Old|   6.0898976|[0.4999,15.0,11.5...|
|0.4999|    16.0| 21.63157894736842|             6.0|      26.0| 1.368421052631579|   39.42|  -122.89|      0.735|     pop_ra

In [99]:
# Scaling the numerical features
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='num_features_vector',
                        outputCol='scaled_num_feature_vector',
                        withStd=True,withMean=True)

scaler = scaler.fit(train)
train = scaler.transform(train)
test = scaler.transform(test)

train.show(3)

+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+-------------------------+
|MedInc|HouseAge|          AveRooms|       AveBedrms|Population|          AveOccup|Latitude|Longitude|MedHouseVal|Population_rank|Old_house|avg_distance| num_features_vector|scaled_num_feature_vector|
+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+-------------------------+
|0.4999|    10.0|              6.74|            2.04|     108.0|              2.16|   34.69|   -116.9|       0.55|     pop_rank_1|      Old|    2.830976|[0.4999,10.0,6.74...|     [-1.7714414490315...|
|0.4999|    15.0|11.596491228070175|2.56140350877193|     131.0|2.2982456140350878|   40.43|  -123.32|      0.567|     pop_rank_1|      Old|   6.0898976|[0.4999,15.0,11.5...|     [-1.7714414490315

In [100]:
# label encoding the categorical features
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCols=['Population_rank','Old_house'],
                        outputCols=['enc_Population_rank','enc_old_house'])

indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

train.show(3)

+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+-------------------------+-------------------+-------------+
|MedInc|HouseAge|          AveRooms|       AveBedrms|Population|          AveOccup|Latitude|Longitude|MedHouseVal|Population_rank|Old_house|avg_distance| num_features_vector|scaled_num_feature_vector|enc_Population_rank|enc_old_house|
+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+-------------------------+-------------------+-------------+
|0.4999|    10.0|              6.74|            2.04|     108.0|              2.16|   34.69|   -116.9|       0.55|     pop_rank_1|      Old|    2.830976|[0.4999,10.0,6.74...|     [-1.7714414490315...|                0.0|          0.0|
|0.4999|    15.0|11.596491228070175|2.56140350877193|     13

In [101]:
train.groupby('enc_Population_rank').count().show(),train.groupby('enc_old_house').count().show()

+-------------------+-----+
|enc_Population_rank|count|
+-------------------+-----+
|                0.0|14437|
|                1.0|   10|
|                2.0|    1|
+-------------------+-----+

+-------------+-----+
|enc_old_house|count|
+-------------+-----+
|          0.0|14226|
|          1.0|  222|
+-------------+-----+



(None, None)

In [102]:
# One hot encoding

from pyspark.ml.feature import OneHotEncoder

one_hot_encoder = OneHotEncoder(inputCols=['enc_Population_rank','enc_old_house'],
                                outputCols=['oh_enc_Population_rank','oh_enc_old_house'])

one_hot_encoder = one_hot_encoder.fit(train)
train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)
train.show(3)

+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+-------------------------+-------------------+-------------+----------------------+----------------+
|MedInc|HouseAge|          AveRooms|       AveBedrms|Population|          AveOccup|Latitude|Longitude|MedHouseVal|Population_rank|Old_house|avg_distance| num_features_vector|scaled_num_feature_vector|enc_Population_rank|enc_old_house|oh_enc_Population_rank|oh_enc_old_house|
+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+-------------------------+-------------------+-------------+----------------------+----------------+
|0.4999|    10.0|              6.74|            2.04|     108.0|              2.16|   34.69|   -116.9|       0.55|     pop_rank_1|      Old|    2.830976|[0.4999,10.0,6.74...| 

In [104]:
assembler = VectorAssembler(inputCols=['scaled_num_feature_vector','oh_enc_Population_rank','oh_enc_old_house'],
                            outputCol='final_feature_vector')

train = assembler.transform(train)
test = assembler.transform(test)
train.show(3)

+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+-------------------------+-------------------+-------------+----------------------+----------------+--------------------+
|MedInc|HouseAge|          AveRooms|       AveBedrms|Population|          AveOccup|Latitude|Longitude|MedHouseVal|Population_rank|Old_house|avg_distance| num_features_vector|scaled_num_feature_vector|enc_Population_rank|enc_old_house|oh_enc_Population_rank|oh_enc_old_house|final_feature_vector|
+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+-------------------------+-------------------+-------------+----------------------+----------------+--------------------+
|0.4999|    10.0|              6.74|            2.04|     108.0|              2.16|   34.69|   -116.9|       0.5

In [106]:
# Build a model
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='final_feature_vector',
                      labelCol='MedHouseVal')
lr = lr.fit(train)
lr

LinearRegressionModel: uid=LinearRegression_6da68a219e60, numFeatures=12

In [109]:
lr.coefficients

DenseVector([0.8506, 0.1265, -0.2936, 0.3381, -0.0013, -0.0459, -0.9329, -0.8859, 0.0452, 0.4757, 0.4705, -0.0004])

In [107]:
lr.transform(train).show(3)

+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+-------------------------+-------------------+-------------+----------------------+----------------+--------------------+------------------+
|MedInc|HouseAge|          AveRooms|       AveBedrms|Population|          AveOccup|Latitude|Longitude|MedHouseVal|Population_rank|Old_house|avg_distance| num_features_vector|scaled_num_feature_vector|enc_Population_rank|enc_old_house|oh_enc_Population_rank|oh_enc_old_house|final_feature_vector|        prediction|
+------+--------+------------------+----------------+----------+------------------+--------+---------+-----------+---------------+---------+------------+--------------------+-------------------------+-------------------+-------------+----------------------+----------------+--------------------+------------------+
|0.4999|    10.0|              6.74|            2.04|  

In [108]:
pred_train_df = lr.transform(test)
pred_train_df.select(['MedHouseVal','prediction']).show(3)

+-----------+--------------------+
|MedHouseVal|          prediction|
+-----------+--------------------+
|      1.625|-0.08941055149575794|
|      0.675|  1.0660534163765392|
|      0.425|  0.5157651766915499|
+-----------+--------------------+
only showing top 3 rows



In [110]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="MedHouseVal",
                                predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(pred_train_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.735385


In [112]:
# Evaluation using RDD
pred_actuals_rdd = pred_train_df.select(['MedHouseVal','prediction']).rdd.map(tuple)

pred_actuals_rdd.take(2)

[(1.625, -0.08941055149575794), (0.675, 1.0660534163765392)]

In [115]:
from pyspark.mllib.evaluation import RegressionMetrics

metrics= RegressionMetrics(pred_actuals_rdd)

s = '''
Mean Squared error      :   {0}
Root Mean Squared Error :   {1}
Mean Absolute Error     :   {2}
R^2                     :   {3}
'''.format(metrics.meanSquaredError,
           metrics.rootMeanSquaredError,
           metrics.meanAbsoluteError,
           metrics.r2)
print(s)




Mean Squared error      :   0.5407915074305492
Root Mean Squared Error :   0.7353852782253322
Mean Absolute Error     :   0.5306816593739203
R^2                     :   0.3351978583238028

