In [145]:
# Locate the local Spark installation so that `pyspark` becomes importable.
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# Reuse the active SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

# Smoke test: run a trivial SQL query to confirm the session works.
df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [146]:
spark

In [147]:
# SparkContext behind the session; used below for the RDD-based file loading.
sc=spark.sparkContext

In [148]:
#Load the data
# Each element of the resulting RDD is one raw comma-separated line of text.
rdd=sc.textFile('CaliforniaHousing//cal_housing.data')

In [149]:
#load the header
# The .domain file holds one "<column name>: <type>" entry per line. It is only
# displayed for reference; the column names are typed out by hand further down.
header=sc.textFile('CaliforniaHousing//cal_housing.domain')

In [150]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

In [151]:
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [152]:
# NOTE: this rebinds `rdd`, so the cell is not idempotent: after a second run
# the lambda would call .split on lists and the next action on `rdd` would fail.
rdd=rdd.map(lambda line: line.split(",")) #split lines on commas

In [153]:
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

In [154]:
rdd.first()

['-122.230000',
 '37.880000',
 '41.000000',
 '880.000000',
 '129.000000',
 '322.000000',
 '126.000000',
 '8.325200',
 '452600.000000']

In [155]:
# The fields are still strings here, so top() ranks the records by string
# (lexicographic) comparison of the field lists, not by numeric value.
rdd.top(2)

[['-124.350000',
  '40.540000',
  '52.000000',
  '1820.000000',
  '300.000000',
  '806.000000',
  '270.000000',
  '3.014700',
  '94600.000000'],
 ['-124.300000',
  '41.840000',
  '17.000000',
  '2677.000000',
  '531.000000',
  '1244.000000',
  '456.000000',
  '3.031300',
  '103600.000000']]

In [156]:
from pyspark.sql import Row

# Convert the RDD of split lines into a DataFrame, naming the nine positional
# fields of every record. The values are still strings at this point.
df = rdd.map(
    lambda fields: Row(
        longitude=fields[0],
        latitude=fields[1],
        housingMedianAge=fields[2],
        totalRooms=fields[3],
        totalBedRooms=fields[4],
        population=fields[5],
        households=fields[6],
        medianIncome=fields[7],
        medianHouseValue=fields[8],
    )
).toDF()

In [157]:
df.show()

+-----------+---------+----------------+-----------+-------------+-----------+-----------+------------+----------------+
|  longitude| latitude|housingMedianAge| totalRooms|totalBedRooms| population| households|medianIncome|medianHouseValue|
+-----------+---------+----------------+-----------+-------------+-----------+-----------+------------+----------------+
|-122.230000|37.880000|       41.000000| 880.000000|   129.000000| 322.000000| 126.000000|    8.325200|   452600.000000|
|-122.220000|37.860000|       21.000000|7099.000000|  1106.000000|2401.000000|1138.000000|    8.301400|   358500.000000|
|-122.240000|37.850000|       52.000000|1467.000000|   190.000000| 496.000000| 177.000000|    7.257400|   352100.000000|
|-122.250000|37.850000|       52.000000|1274.000000|   235.000000| 558.000000| 219.000000|    5.643100|   341300.000000|
|-122.250000|37.850000|       52.000000|1627.000000|   280.000000| 565.000000| 259.000000|    3.846200|   342200.000000|
|-122.250000|37.850000|       52

In [158]:
df.columns

['longitude',
 'latitude',
 'housingMedianAge',
 'totalRooms',
 'totalBedRooms',
 'population',
 'households',
 'medianIncome',
 'medianHouseValue']

In [159]:
df.dtypes

[('longitude', 'string'),
 ('latitude', 'string'),
 ('housingMedianAge', 'string'),
 ('totalRooms', 'string'),
 ('totalBedRooms', 'string'),
 ('population', 'string'),
 ('households', 'string'),
 ('medianIncome', 'string'),
 ('medianHouseValue', 'string')]

In [160]:
df.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- totalRooms: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)



In [161]:
#change the dtype of the columns
from pyspark.sql.types import *

def convertCol(df,names,newType):
    """Return ``df`` with the given columns cast to ``newType``.

    Parameters
    ----------
    df : DataFrame
        Input frame; it is not modified, a new frame is returned.
    names : str or iterable of str
        Column name(s) to cast. A single name may be passed as a plain string.
    newType : DataType
        Target type handed to ``Column.cast`` (e.g. ``FloatType()``).

    Returns
    -------
    DataFrame
        A frame in which every listed column has been replaced by its cast
        version; all other columns are left as they were.
    """
    # A bare string would otherwise be iterated character by character and
    # each character looked up as if it were a column name.
    if isinstance(names, str):
        names = [names]

    converted = df
    for column_name in names:
        converted = converted.withColumn(
            column_name, converted[column_name].cast(newType)
        )
    return converted

# Cast every column from string to FloatType. FloatType is single precision,
# which is why values such as -122.23 later print as -122.2300033569336.
columns=df.columns
df=convertCol(df,columns,FloatType())

In [162]:
df.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- totalRooms: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)



In [163]:
df.select('population','totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [164]:
df.groupBy('housingMedianAge').count().sort("housingMedianAge",ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [165]:
df.describe().show()

+-------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+
|summary|          longitude|          latitude|  housingMedianAge|        totalRooms|    totalBedRooms|        population|        households|      medianIncome|  medianHouseValue|
+-------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+
|  count|              20640|             20640|             20640|             20640|            20640|             20640|             20640|             20640|             20640|
|   mean|-119.56970444871473| 35.63186143109965|28.639486434108527|2635.7630813953488|537.8980135658915|1425.4767441860465| 499.5396802325581|3.8706710030346416|206855.81690891474|
| stddev| 2.0035317429328914|2.1359523806029554|12.585557612111613|2181.6152515827994|421.24790

In [166]:
from pyspark.sql import functions as F

In [167]:
# Number of missing values per column: F.when yields a non-null value only for
# entries that are null or NaN, and F.count counts just those non-null values.
df.select(
    [
        F.count(
            F.when(F.col(column_name).isNull() | F.isnan(column_name), column_name)
        ).alias(column_name)
        for column_name in df.columns
    ]
).show()

+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
|longitude|latitude|housingMedianAge|totalRooms|totalBedRooms|population|households|medianIncome|medianHouseValue|
+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
|        0|       0|               0|         0|            0|         0|         0|           0|               0|
+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+



In [168]:
#preprocessing
# NOTE(review): the star import puts every pyspark.sql.functions name into the
# notebook namespace, where several of them (e.g. sum, min, max) shadow Python
# builtins. Only `col` is used in the cells that follow; consider importing it
# explicitly.
from pyspark.sql.functions import *
# Express the target in units of 100,000 (452600.0 becomes 4.526).
# NOTE: `df` is rebound, so re-running this cell divides the column again.
df=df.withColumn("medianHouseValue",col("medianHouseValue")/100000) #adjust the values
df.take(2)

[Row(longitude=-122.2300033569336, latitude=37.880001068115234, housingMedianAge=41.0, totalRooms=880.0, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, medianHouseValue=4.526),
 Row(longitude=-122.22000122070312, latitude=37.86000061035156, housingMedianAge=21.0, totalRooms=7099.0, totalBedRooms=1106.0, population=2401.0, households=1138.0, medianIncome=8.301400184631348, medianHouseValue=3.585)]

In [169]:
#feature engineering
# Add three ratio features derived from the raw counts:
#   roomsPerHousehold      = totalRooms    / households
#   populationPerHousehold = population    / households
#   bedroomsPerRoom        = totalBedRooms / totalRooms
# Column names are spelled exactly as in the schema ("households") so the
# expressions do not depend on Spark's case-insensitive column resolution.
df = (
    df.withColumn("roomsPerHousehold", col("totalRooms") / col("households"))
    .withColumn("populationPerHousehold", col("population") / col("households"))
    .withColumn("bedroomsPerRoom", col("totalBedRooms") / col("totalRooms"))
)
df.first()

Row(longitude=-122.2300033569336, latitude=37.880001068115234, housingMedianAge=41.0, totalRooms=880.0, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, medianHouseValue=4.526, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [170]:
df.columns

['longitude',
 'latitude',
 'housingMedianAge',
 'totalRooms',
 'totalBedRooms',
 'population',
 'households',
 'medianIncome',
 'medianHouseValue',
 'roomsPerHousehold',
 'populationPerHousehold',
 'bedroomsPerRoom']

In [171]:
len(df.columns)

12

In [172]:
# Re-order the columns so that the target (medianHouseValue) comes first and
# the eleven feature columns follow it.
df = df.select(
    [
        "medianHouseValue",
        "longitude",
        "latitude",
        "totalRooms",
        "housingMedianAge",
        "totalBedRooms",
        "population",
        "households",
        "medianIncome",
        "roomsPerHousehold",
        "populationPerHousehold",
        "bedroomsPerRoom",
    ]
)

In [173]:
df.show()

+----------------+---------+--------+----------+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|medianHouseValue|longitude|latitude|totalRooms|housingMedianAge|totalBedRooms|population|households|medianIncome| roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------------+---------+--------+----------+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|           4.526|  -122.23|   37.88|     880.0|            41.0|        129.0|     322.0|     126.0|      8.3252| 6.984126984126984|    2.5555555555555554|0.14659090909090908|
|           3.585|  -122.22|   37.86|    7099.0|            21.0|       1106.0|    2401.0|    1138.0|      8.3014| 6.238137082601054|     2.109841827768014|0.15579659106916466|
|           3.521|  -122.24|   37.85|    1467.0|            52.0|        190.0|     496.0|     177.0|      7.2574| 

In [174]:
from pyspark.ml.linalg import DenseVector
# Turn every row into a (label, features) pair: position 0 is medianHouseValue
# (placed first by the preceding select) and the remaining columns are packed
# into a single DenseVector.
input_data=df.rdd.map(lambda x: (x[0],DenseVector(x[1:])))

In [175]:
# Rebuild a two-column DataFrame from the (label, features) pairs.
# NOTE: this rebinds `df`; the wide per-column frame is no longer reachable under that name.
df=spark.createDataFrame(input_data,["label","features"])

In [176]:
df.take(2)

[Row(label=4.526, features=DenseVector([-122.23, 37.88, 880.0, 41.0, 129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466])),
 Row(label=3.585, features=DenseVector([-122.22, 37.86, 7099.0, 21.0, 1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]))]

In [177]:
from pyspark.ml.feature import StandardScaler
# Fit per-feature scaling statistics and append the scaled vector as
# "features_scaled"; the original "features" column is kept alongside it.
# NOTE(review): the scaler is fit on the full dataset, before the train/test
# split, so rows that end up in the test set contribute to the statistics.
# NOTE(review): the decision tree below is pointed at "features" and the linear
# regression does not set a features column, so "features_scaled" appears to be
# unused by either model. Confirm whether that is intended.
standardScaler=StandardScaler(inputCol="features",outputCol="features_scaled")
scaler=standardScaler.fit(df)
scaled_df=scaler.transform(df)
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([-122.23, 37.88, 880.0, 41.0, 129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([-61.0073, 17.7345, 0.4034, 3.2577, 0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([-122.22, 37.86, 7099.0, 21.0, 1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([-61.0023, 17.7251, 3.254, 1.6686, 2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

In [178]:
#split the data into train and test sets
# Roughly 80% of rows go to training and 20% to testing; the seed is fixed so
# the split can be reproduced.
train_data, test_data =scaled_df.randomSplit([.8,.2],seed=1234)

### Decision Tree Regression Model

In [179]:
from pyspark.ml.regression import DecisionTreeRegressor

In [180]:
# Decision tree regressor reading the "features" vector column and predicting "label".
dtr = DecisionTreeRegressor(featuresCol="features", labelCol="label")

In [181]:
#Train the model using our training data
model = dtr.fit(train_data)

In [182]:
# Now see if we can predict values in our test data.
# Generate predictions using our decision tree model for all features in our
# test dataframe:
fullPredictions = model.transform(test_data).cache()

In [183]:
# Extract the predictions and the "known" correct labels.
predictions = fullPredictions.select("prediction").rdd.map(lambda x: x[0])
labels = fullPredictions.select("label").rdd.map(lambda x: x[0])

In [184]:
# Pair every prediction with its true label. zip() requires both RDDs to have
# the same partitioning and element counts, which holds here because both are
# produced from fullPredictions by a select followed by a map.
# collect() brings the paired values for the whole test set back to the driver.
predictionAndLabel = predictions.zip(labels).collect()

In [185]:
predictionAndLabel[:10]

[(2.6231063698630153, 0.14999),
 (1.2669072536231885, 0.225),
 (1.2299937373737373, 0.388),
 (0.8883356511627906, 0.394),
 (1.2299937373737373, 0.396),
 (0.8883356511627906, 0.398),
 (0.8883356511627906, 0.4),
 (0.8883356511627906, 0.431),
 (0.8883356511627906, 0.44),
 (0.8883356511627906, 0.441)]

### Multiple Linear Regression Model

In [186]:
from pyspark.ml.regression import LinearRegression
# Elastic-net regularised linear regression: regParam sets the overall penalty
# strength and elasticNetParam mixes L1 and L2 (0 = pure L2, 1 = pure L1), so
# 0.8 is a mostly-L1 penalty. The coefficients printed further down show every
# weight except one driven to 0.0.
# No features column is set, so the estimator's default is used — presumably
# the unscaled "features" column; confirm.
lr=LinearRegression(labelCol="label",maxIter=10,regParam=0.3,elasticNetParam=0.8)
linearModel=lr.fit(train_data)

In [187]:
predicted=linearModel.transform(test_data)
# Extract the predictions and the "known" correct labels.
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

In [188]:
predictionAndLabel = predictions.zip(labels).collect()

In [189]:
predictionAndLabel[:10]

[(2.153236851984058, 0.14999),
 (1.7326700927043543, 0.225),
 (1.528370308475635, 0.388),
 (1.4190595193689608, 0.394),
 (1.282878145066965, 0.396),
 (1.5209462515074308, 0.398),
 (1.457952674892077, 0.4),
 (1.3675344208194558, 0.431),
 (1.5854080289670625, 0.44),
 (1.3530741615595212, 0.441)]

In [190]:
#evaluations
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.277, 0.0, 0.0, 0.0])

In [191]:
linearModel.intercept

0.9916505016472443

In [192]:
# NOTE(review): `summary` reports metrics on the training data the model was
# fit on, not on test_data. For held-out error, evaluate the model on test_data.
linearModel.summary.rootMeanSquaredError

0.8765426709359576

In [193]:
# NOTE(review): like the RMSE, this R^2 comes from the training summary, so it
# describes fit on the training data rather than performance on test_data.
linearModel.summary.r2

0.4195260943254927

In [195]:
# Shut down the SparkContext now that the analysis is finished.
sc.stop()