# Machine learning in Spark

In [1]:
!ls

[34mCaliforniaHousing[m[m cal_housing.tgz   spark_book.ipynb


In [2]:
import findspark

# initialise findspark and provide path to locally installed spark folder
# This adds pyspark to sys.path at runtime
findspark.init('/usr/local/spark')

In [3]:
# Import SparkSession
from pyspark.sql import SparkSession

# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

In [4]:
# Show java virtual machines running
!jps

70904 SparkSubmit
70909 Jps


In [5]:
# Load in the data
rdd = sc.textFile('CaliforniaHousing/cal_housing.data')

# Load in the header
header = sc.textFile('CaliforniaHousing/cal_housing.domain')

In [6]:
# The collect() method brings the entire RDD to a single machine
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

In [7]:
# take() is like collect() but safer because it limits the results
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [8]:
# Split lines on commas
rdd = rdd.map(lambda line: line.split(","))

# Inspect the first 2 lines 
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

In [9]:
# Take top elements
rdd.top(2)

[['-124.350000',
  '40.540000',
  '52.000000',
  '1820.000000',
  '300.000000',
  '806.000000',
  '270.000000',
  '3.014700',
  '94600.000000'],
 ['-124.300000',
  '41.840000',
  '17.000000',
  '2677.000000',
  '531.000000',
  '1244.000000',
  '456.000000',
  '3.031300',
  '103600.000000']]

In [10]:
# Inspect the first line 
rdd.first()

['-122.230000',
 '37.880000',
 '41.000000',
 '880.000000',
 '129.000000',
 '322.000000',
 '126.000000',
 '8.325200',
 '452600.000000']

In [11]:
# Import the necessary modules 
from pyspark.sql import Row

# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [12]:
type(df)

pyspark.sql.dataframe.DataFrame

In [13]:
type(rdd)

pyspark.rdd.PipelinedRDD

In [14]:
type(header)

pyspark.rdd.RDD

In [15]:
# Show the top 20 rows 
df.show()

+-----------+---------+----------------+-----------+-------------+-----------+-----------+------------+----------------+
|  longitude| latitude|housingMedianAge| totalRooms|totalBedRooms| population| households|medianIncome|medianHouseValue|
+-----------+---------+----------------+-----------+-------------+-----------+-----------+------------+----------------+
|-122.230000|37.880000|       41.000000| 880.000000|   129.000000| 322.000000| 126.000000|    8.325200|   452600.000000|
|-122.220000|37.860000|       21.000000|7099.000000|  1106.000000|2401.000000|1138.000000|    8.301400|   358500.000000|
|-122.240000|37.850000|       52.000000|1467.000000|   190.000000| 496.000000| 177.000000|    7.257400|   352100.000000|
|-122.250000|37.850000|       52.000000|1274.000000|   235.000000| 558.000000| 219.000000|    5.643100|   341300.000000|
|-122.250000|37.850000|       52.000000|1627.000000|   280.000000| 565.000000| 259.000000|    3.846200|   342200.000000|
|-122.250000|37.850000|       52

In [16]:
# Show the top row 
df.head()

Row(longitude='-122.230000', latitude='37.880000', housingMedianAge='41.000000', totalRooms='880.000000', totalBedRooms='129.000000', population='322.000000', households='126.000000', medianIncome='8.325200', medianHouseValue='452600.000000')

In [17]:
# use df.columns to return the columns of your DataFrame.
df.columns

['longitude',
 'latitude',
 'housingMedianAge',
 'totalRooms',
 'totalBedRooms',
 'population',
 'households',
 'medianIncome',
 'medianHouseValue']

In [18]:
# Print the data types of all `df` columns
df.dtypes

[('longitude', 'string'),
 ('latitude', 'string'),
 ('housingMedianAge', 'string'),
 ('totalRooms', 'string'),
 ('totalBedRooms', 'string'),
 ('population', 'string'),
 ('households', 'string'),
 ('medianIncome', 'string'),
 ('medianHouseValue', 'string')]

In [19]:
# Print the schema of `df`
df.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- totalRooms: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)



In [20]:
from pyspark.sql.types import *

#declare that each column of the DataFrame df should be cast to a FloatType()
df = df.withColumn("longitude", df["longitude"].cast(FloatType())).\
withColumn("latitude", df["latitude"].cast(FloatType())). \
withColumn("housingMedianAge",df["housingMedianAge"].cast(FloatType())). \
withColumn("totalRooms", df["totalRooms"].cast(FloatType())). \
withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType())). \
withColumn("population", df["population"].cast(FloatType())). \
withColumn("households", df["households"].cast(FloatType())). \
withColumn("medianIncome", df["medianIncome"].cast(FloatType())). \
withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))

In [21]:
# but this approach is better
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
  for name in names: 
     df = df.withColumn(name, df[name].cast(newType))
  return df 

# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

# Conver the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

In [22]:
df.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- totalRooms: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)



### Data exploration

In [23]:
df.select('population','totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [24]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [25]:
df.describe().show()

+-------+-------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|          longitude|         latitude|  housingMedianAge|        totalRooms|    totalBedRooms|        population|       households|      medianIncome|  medianHouseValue|
+-------+-------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              20640|            20640|             20640|             20640|            20640|             20640|            20640|             20640|             20640|
|   mean|-119.56970444871473|35.63186143109965|28.639486434108527|2635.7630813953488|537.8980135658915|1425.4767441860465|499.5396802325581|3.8706710030346416|206855.81690891474|
| stddev|  2.003531742932898|2.135952380602968| 12.58555761211163|2181.6152515827944| 421.247905943133|  

### Pre-processing

In [26]:
# Import all from `sql.functions` 
from pyspark.sql.functions import *

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)

[Row(longitude=-122.2300033569336, latitude=37.880001068115234, housingMedianAge=41.0, totalRooms=880.0, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, medianHouseValue=4.526),
 Row(longitude=-122.22000122070312, latitude=37.86000061035156, housingMedianAge=21.0, totalRooms=7099.0, totalBedRooms=1106.0, population=2401.0, households=1138.0, medianIncome=8.301400184631348, medianHouseValue=3.585)]

### Feature engineering

In [27]:
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(longitude=-122.2300033569336, latitude=37.880001068115234, housingMedianAge=41.0, totalRooms=880.0, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, medianHouseValue=4.526, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [31]:
df.select("roomsPerHousehold","populationPerHousehold","bedroomsPerRoom").show(10)

+------------------+----------------------+-------------------+
| roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+------------------+----------------------+-------------------+
| 6.984126984126984|    2.5555555555555554|0.14659090909090908|
| 6.238137082601054|     2.109841827768014|0.15579659106916466|
| 8.288135593220339|    2.8022598870056497|0.12951601908657123|
|5.8173515981735155|     2.547945205479452|0.18445839874411302|
| 6.281853281853282|    2.1814671814671813| 0.1720958819913952|
| 4.761658031088083|     2.139896373056995|0.23177366702937977|
|4.9319066147859925|    2.1284046692607004|0.19289940828402366|
| 4.797527047913447|    1.7882534775888717|0.22132731958762886|
| 4.294117647058823|     2.026890756302521| 0.2602739726027397|
| 4.970588235294118|     2.172268907563025| 0.1992110453648915|
+------------------+----------------------+-------------------+
only showing top 10 rows



In [32]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

In [33]:
df.show(10)

+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|medianHouseValue|totalBedRooms|population|households|medianIncome| roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|           4.526|        129.0|     322.0|     126.0|      8.3252| 6.984126984126984|    2.5555555555555554|0.14659090909090908|
|           3.585|       1106.0|    2401.0|    1138.0|      8.3014| 6.238137082601054|     2.109841827768014|0.15579659106916466|
|           3.521|        190.0|     496.0|     177.0|      7.2574| 8.288135593220339|    2.8022598870056497|0.12951601908657123|
|           3.413|        235.0|     558.0|     219.0|      5.6431|5.8173515981735155|     2.547945205479452|0.18445839874411302|
|           3.422|        280.0|     565.0|     259.0|      3.8462| 6.281853281853282|    

### Standardisation

In [34]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

In [36]:
df.show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
|3.521|[190.0,496.0,177....|
|3.413|[235.0,558.0,219....|
|3.422|[280.0,565.0,259....|
|2.697|[213.0,413.0,193....|
|2.992|[489.0,1094.0,514...|
|2.414|[687.0,1157.0,647...|
|2.267|[665.0,1206.0,595...|
|2.611|[707.0,1551.0,714...|
+-----+--------------------+
only showing top 10 rows



In [37]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

In [38]:
scaled_df.show(2)

+-----+--------------------+--------------------+
|label|            features|     features_scaled|
+-----+--------------------+--------------------+
|4.526|[129.0,322.0,126....|[0.30623297630686...|
|3.585|[1106.0,2401.0,11...|[2.62553233949916...|
+-----+--------------------+--------------------+
only showing top 2 rows



# Train ML model (lin-reg) with MLlib

In [39]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

In [47]:
def print_shape(df):
    print((df.count(), len(df.columns)))

In [48]:
print_shape(train_data)

(16525, 3)


In [50]:
print_shape(test_data)

(4115, 3)


In [51]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [52]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(1.4542345782162025, 0.14999),
 (1.6495895629633672, 0.175),
 (1.482098957606102, 0.332),
 (1.6123448501897313, 0.346),
 (1.6541552239238302, 0.35)]

### Model evaluation

In [54]:
# Coefficients for the model
print(linearModel.coefficients)

# Intercept for the model
print(linearModel.intercept)

[0.0,0.0,0.0,0.27670678884943006,0.0,0.0,0.0]
0.9947076240544817


In [55]:
# Get the RMSE
print(linearModel.summary.rootMeanSquaredError)

# Get the R2
print(linearModel.summary.r2)

0.8794333071050418
0.4192463677898063


In [56]:
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x7fcd5bb1d9a0>>