# Using Spark on the housing dataset

## Dataset:

For this assignment, you’ll make use of the California Housing data set (http://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html). The goal is to use PySpark and try Spark MLlib to build a machine learning model.

We are going to use Spark in two different ways:

1. Using the Spark MLlib library and PySpark modules.
2. Using the spark-sklearn library: build a pipeline in scikit-learn and run its grid search on Spark.

## Option 1:
In Option 1, we use PySpark's native functionality to prepare the data and the MLlib library to build a machine learning model.

In [7]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [8]:
import findspark
findspark.find()

'/usr/local/spark'

In [9]:
# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .getOrCreate()
   
sc = spark.sparkContext

In [10]:
sc

In [11]:
# Load in the data
rdd = sc.textFile('file:////data/cadata.txt')

In [544]:
rdd.take(1)

['  4.5260000000000000e+005  8.3252000000000006e+000  4.1000000000000000e+001  8.8000000000000000e+002  1.2900000000000000e+002  3.2200000000000000e+002  1.2600000000000000e+002  3.7880000000000003e+001 -1.2223000000000000e+002']

In [545]:
# Create an RDD of float lists by splitting each line on whitespace
# (fields are separated by two spaces, or by one space before the negative longitude)
rdd1 = rdd.map(lambda line: [float(value) for value in line.split()])

In [546]:
rdd1.take(10)

[[452600.0, 8.3252, 41.0, 880.0, 129.0, 322.0, 126.0, 37.88, -122.23],
 [358500.0, 8.3014, 21.0, 7099.0, 1106.0, 2401.0, 1138.0, 37.86, -122.22],
 [352100.0, 7.2574, 52.0, 1467.0, 190.0, 496.0, 177.0, 37.85, -122.24],
 [341300.0, 5.6431, 52.0, 1274.0, 235.0, 558.0, 219.0, 37.85, -122.25],
 [342200.0, 3.8462, 52.0, 1627.0, 280.0, 565.0, 259.0, 37.85, -122.25],
 [269700.0, 4.0368, 52.0, 919.0, 213.0, 413.0, 193.0, 37.85, -122.25],
 [299200.0, 3.6591, 52.0, 2535.0, 489.0, 1094.0, 514.0, 37.84, -122.25],
 [241400.0, 3.12, 52.0, 3104.0, 687.0, 1157.0, 647.0, 37.84, -122.25],
 [226700.0, 2.0804, 42.0, 2555.0, 665.0, 1206.0, 595.0, 37.84, -122.26],
 [261100.0, 3.6912, 52.0, 3549.0, 707.0, 1551.0, 714.0, 37.84, -122.25]]
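
Each raw line separates its fields with two spaces, except for a single space before the negative longitude, which is why a split on `"  "` needs extra handling for the last field. The plain-Python sketch below (no Spark needed; the function names are hypothetical) checks on the sample line printed by `rdd.take(1)` that splitting on any run of whitespace with `str.split()` gives the same nine floats as the two-step double-space split.

```python
# Sample line copied from the rdd.take(1) output above
SAMPLE = ("  4.5260000000000000e+005  8.3252000000000006e+000  4.1000000000000000e+001"
          "  8.8000000000000000e+002  1.2900000000000000e+002  3.2200000000000000e+002"
          "  1.2600000000000000e+002  3.7880000000000003e+001 -1.2223000000000000e+002")

def parse_double_space(line):
    """Two-step split: on two spaces, then on one space inside the last field."""
    parts = line.split("  ")[1:]  # drop the empty string before the leading spaces
    head, tail = parts[:7], parts[7].split(" ")
    return [float(item) for group in (head, tail) for item in group]

def parse_whitespace(line):
    """Split on any run of whitespace; leading spaces are ignored automatically."""
    return [float(value) for value in line.split()]

print(parse_whitespace(SAMPLE))
```

Both approaches agree on this line; `line.split()` simply does not depend on how many spaces precede each field.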

In [547]:
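# Map the RDD to a DataFrame and name the columns
# (Spark versions before 3.0 sort Row keyword fields alphabetically, hence the column order below)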

df = rdd1.map(lambda line: Row(medianHouseValue=line[0], 
                              medianIncome=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              latitude=line[7],
                              longitude=line[8])).toDF()

In [548]:
df.show(5)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|        452600.0|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|        358500.0|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|        352100.0|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|        341300.0|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|        342200.0|      3.8462|     565.0|        280.0|    1627.0|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+

In [549]:
df.printSchema()

root
 |-- households: double (nullable = true)
 |-- housingMedianAge: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- medianHouseValue: double (nullable = true)
 |-- medianIncome: double (nullable = true)
 |-- population: double (nullable = true)
 |-- totalBedRooms: double (nullable = true)
 |-- totalRooms: double (nullable = true)



Optional Part:
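Here we cast the DataFrame columns to `FloatType`. This step is not required: the columns are already numeric (`DoubleType`, as the schema above shows) because the values were converted with `float()` when the RDD was built.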

In [550]:
#Optional part

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

# Convert the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())
df.printSchema()

root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)
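
`FloatType` is a 32-bit IEEE 754 type, so the cast rounds every value to the nearest single-precision number. When the values come back into Python as 64-bit floats the rounding becomes visible, which is why later outputs show `latitude=37.880001068115234` instead of `37.88`. A standard-library sketch of the same round trip, using `struct` to pack each value as a C `float` (the helper name `to_float32` is hypothetical):

```python
import struct

def to_float32(x):
    """Round a Python float (64-bit) to 32-bit precision and widen it back."""
    return struct.unpack("f", struct.pack("f", x))[0]

for value in (37.88, -122.23, 8.3252, 880.0):
    print(f"{value!r:>8} -> {to_float32(value)!r}")
```

Whole numbers such as `880.0` survive unchanged, while decimal fractions pick up small errors; keeping the columns as `DoubleType` avoids this.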



### Exploratory Analysis

In [551]:
# Show the first 10 rows of population and totalBedRooms
df.select('population','totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [552]:
# Count rows per housingMedianAge, sorted by age in descending order (show() prints 20 rows by default)

df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows
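
The `groupBy(...).count().sort(..., ascending=False)` chain is the distributed counterpart of counting values and sorting by key. A plain-Python sketch with `collections.Counter`, run on the ten `housingMedianAge` values from the `rdd1.take(10)` output (an illustrative sample, not the full column):

```python
from collections import Counter

# housingMedianAge of the first ten rows printed by rdd1.take(10)
ages = [41.0, 21.0, 52.0, 52.0, 52.0, 52.0, 52.0, 52.0, 42.0, 52.0]

counts = Counter(ages)  # like groupBy("housingMedianAge").count()
# like sort("housingMedianAge", ascending=False)
by_age_desc = sorted(counts.items(), key=lambda kv: kv[0], reverse=True)
print(by_age_desc)
```

Spark performs the same aggregation in a distributed way, without collecting the rows to the driver.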



In [553]:
# Summary statistics for medianHouseValue
df.select('medianHouseValue').describe().show()

+-------+------------------+
|summary|  medianHouseValue|
+-------+------------------+
|  count|             20640|
|   mean|206855.81690891474|
| stddev|115395.61587441359|
|    min|           14999.0|
|    max|          500001.0|
+-------+------------------+
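
`describe()` reports the count, mean, standard deviation, minimum and maximum; its `stddev` row is the sample standard deviation (divisor n - 1). A standard-library sketch of the same summary, applied to the first ten `medianHouseValue` values from the `rdd1.take(10)` output rather than to the full column (the `describe` function here is a hypothetical helper):

```python
import statistics

def describe(values):
    """Summary statistics matching the rows printed by DataFrame.describe()."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),  # sample standard deviation (n - 1)
        "min": min(values),
        "max": max(values),
    }

# medianHouseValue of the first ten rows printed by rdd1.take(10)
sample = [452600.0, 358500.0, 352100.0, 341300.0, 342200.0,
          269700.0, 299200.0, 241400.0, 226700.0, 261100.0]
for name, value in describe(sample).items():
    print(f"{name:>6}: {value}")
```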



In [554]:
# Rescale the dependent variable (target) to units of 100,000
# Import all from `sql.functions` 
from pyspark.sql.functions import *

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)

[Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0),
 Row(households=1138.0, housingMedianAge=21.0, latitude=37.86000061035156, longitude=-122.22000122070312, medianHouseValue=3.585, medianIncome=8.301400184631348, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0)]

In [555]:
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))


In [556]:
# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)
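
The three ratio columns are row-wise divisions. A plain-Python sketch on the first row (values copied from the `df.first()` output above); the `ratio` and `add_ratio_features` helpers are hypothetical, and `ratio` returns `None` for a zero denominator to mirror Spark SQL with ANSI mode disabled (the long-standing default in Spark 2.x and 3.x), where division by zero yields null:

```python
def ratio(numerator, denominator):
    # Spark SQL with ANSI mode off yields null for division by zero
    return None if denominator == 0 else numerator / denominator

def add_ratio_features(row):
    """Return a copy of `row` with the three engineered features added."""
    out = dict(row)
    out["roomsPerHousehold"] = ratio(row["totalRooms"], row["households"])
    out["populationPerHousehold"] = ratio(row["population"], row["households"])
    out["bedroomsPerRoom"] = ratio(row["totalBedRooms"], row["totalRooms"])
    return out

first = {"totalRooms": 880.0, "households": 126.0,
         "population": 322.0, "totalBedRooms": 129.0}
print(add_ratio_features(first))
```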

In [557]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

In [558]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Build a new DataFrame `df1` with `label` and `features` columns
df1 = spark.createDataFrame(input_data, ["label", "features"])


In [559]:
df.show(2)

+----------------+-------------+----------+----------+------------+-----------------+----------------------+-------------------+
|medianHouseValue|totalBedRooms|population|households|medianIncome|roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------------+-------------+----------+----------+------------+-----------------+----------------------+-------------------+
|           4.526|        129.0|     322.0|     126.0|      8.3252|6.984126984126984|    2.5555555555555554|0.14659090909090908|
|           3.585|       1106.0|    2401.0|    1138.0|      8.3014|6.238137082601054|     2.109841827768014|0.15579659106916466|
+----------------+-------------+----------+----------+------------+-----------------+----------------------+-------------------+
only showing top 2 rows



In [560]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df1)

# Transform the data in `df1` with the scaler
scaled_df = scaler.transform(df1)

# Inspect the result
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]
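
By default, Spark ML's `StandardScaler` uses `withMean=False` and `withStd=True`: it divides each feature by its corrected (n - 1) sample standard deviation without centering, which is why every value in `features_scaled` stays positive. scikit-learn's `StandardScaler`, used in Option 2, centers by default and divides by the population (n) standard deviation. A plain-Python sketch of the Spark-style scaling; the function name and the 0.0 result for a constant column are choices made for this illustration:

```python
import statistics

def scale_columns(rows, with_mean=False, with_std=True):
    """Column-wise scaling in the spirit of Spark ML's StandardScaler defaults."""
    columns = list(zip(*rows))
    scaled = []
    for col in columns:
        shift = statistics.mean(col) if with_mean else 0.0
        spread = statistics.stdev(col) if with_std else 1.0  # sample std (n - 1)
        # Guard against dividing by zero for a constant column
        scaled.append([(x - shift) / spread if spread else 0.0 for x in col])
    return [list(row) for row in zip(*scaled)]

rows = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]
print(scale_columns(rows))                  # scaled only (Spark default)
print(scale_columns(rows, with_mean=True))  # centered and scaled
```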

In [561]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)
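
`randomSplit([.8, .2], seed=1234)` does not produce an exact 80/20 partition: the weights are normalized and each row is assigned independently at random, so the split sizes vary slightly around the target. The conceptual sketch below reproduces that idea in plain Python; it is not Spark's sampling code (which works per partition), and `random_split` is a hypothetical name:

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split with probability proportional to its weight."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split in [0, 1)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # the last split also catches rounding error in the cumulative sum
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(1000), [0.8, 0.2], seed=1234)
print(len(train), len(test))  # close to 800 / 200, but not exactly
```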

In [562]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

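# Initialize `lr`. featuresCol defaults to "features", so this model is fit on the unscaled vector;
# LinearRegression already standardizes features internally by default (standardization=True).
# Pass featuresCol="features_scaled" to train on the StandardScaler output instead.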
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [563]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()


In [564]:
# Print out the first 10 instances of `predictionAndLabel`
predictionAndLabel[:10]

[(1.1340115638008952, 0.14999),
 (1.4485018834650096, 0.14999),
 (1.5713396046425587, 0.14999),
 (1.7496542762527307, 0.283),
 (1.2438468929500472, 0.366),
 (1.277345544168797, 0.367),
 (1.5895709536087121, 0.375),
 (1.5270755381601813, 0.388),
 (1.6941214255677641, 0.4),
 (1.9389021305505283, 0.404)]

### Evaluation

In [565]:
# Coefficients for the model
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.2796, 0.0, 0.0, 0.0])

In [566]:
# Intercept for the model
linearModel.intercept

0.9841344205626824

In [567]:
# Get the RMSE (`summary` is computed on the training data)
linearModel.summary.rootMeanSquaredError

0.8765335684459216

In [568]:
# Get the R2 (`summary` is computed on the training data)
linearModel.summary.r2

0.42282227755911483
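
`linearModel.summary` is the training summary, so the RMSE and R2 above describe the fit on `train_data`. For held-out numbers, PySpark provides `linearModel.evaluate(test_data)` and `pyspark.ml.evaluation.RegressionEvaluator` (with `metricName="rmse"` or `"r2"`). The formulas themselves are short enough to sketch in plain Python over `(prediction, label)` pairs such as `predictionAndLabel`; the toy pairs below are made up for illustration:

```python
import math

def rmse(pairs):
    """Root mean squared error over (prediction, label) pairs."""
    return math.sqrt(sum((p - y) ** 2 for p, y in pairs) / len(pairs))

def r2(pairs):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    labels = [y for _, y in pairs]
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for p, y in pairs)
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

# Toy pairs; in the notebook, predictionAndLabel could be passed instead
pairs = [(2.0, 1.0), (2.0, 2.0), (2.0, 3.0)]
print(rmse(pairs), r2(pairs))
```

A model that always predicts the mean label scores an R2 of 0, which is the baseline the values above should be compared against.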

## Option 2:

In the second option, we integrate scikit-learn and Spark using the spark_sklearn package. This package provides a grid search whose candidate fits are executed in parallel on a Spark instance.

Steps:

1. Create a scikit-learn pipeline.
2. Perform data cleaning and add new features.
3. Create a GridSearchCV object from spark_sklearn.
4. Fit the models with GridSearchCV, so that the candidate fits are executed on the Spark cluster.

In [12]:
from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split,GridSearchCV as GV
from sklearn.linear_model import LinearRegression
from spark_sklearn import GridSearchCV
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import FunctionTransformer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import GradientBoostingRegressor

In [14]:
# Load in the data
pd_df = pd.read_csv('./data/cadata.txt',sep=" ", header=None)

In [585]:
pd_df.head()

     0    1         2    3       4    5     6    7       8    9      10   11      12   13      14   15     16       17
0  NaN  NaN  452600.0  NaN  8.3252  NaN  41.0  NaN   880.0  NaN   129.0  NaN   322.0  NaN   126.0  NaN  37.88  -122.23
1  NaN  NaN  358500.0  NaN  8.3014  NaN  21.0  NaN  7099.0  NaN  1106.0  NaN  2401.0  NaN  1138.0  NaN  37.86  -122.22
2  NaN  NaN  352100.0  NaN  7.2574  NaN  52.0  NaN  1467.0  NaN   190.0  NaN   496.0  NaN   177.0  NaN  37.85  -122.24
3  NaN  NaN  341300.0  NaN  5.6431  NaN  52.0  NaN  1274.0  NaN   235.0  NaN   558.0  NaN   219.0  NaN  37.85  -122.25
4  NaN  NaN  342200.0  NaN  3.8462  NaN  52.0  NaN  1627.0  NaN   280.0  NaN   565.0  NaN   259.0  NaN  37.85  -122.25


In [586]:
# Data cleaning: drop the all-NaN columns created by splitting on single spaces
pd_df.dropna(axis=1,inplace=True)

In [587]:
#Add the column names
pd_df.columns=['medianHouseValue', 'medianIncome', 'housingMedianAge', 'totalRooms', 'totalBedRooms', 'population','households', 'latitude', 'longitude']

In [588]:
# Rescale the target to units of 100,000, as in Option 1
pd_df["medianHouseValue"] = pd_df["medianHouseValue"]/100000

In [589]:
#Split train and test dataset
train_X,test_X,train_y,test_y = train_test_split(pd_df.drop('medianHouseValue',axis=1),
                                                pd_df[['medianHouseValue']],
                                                test_size =0.2)

In [590]:
#Create a class for item selector
class ItemSelector(BaseEstimator, TransformerMixin):
    """This class selects the features which are mentioned in the pipeline"""
    def __init__(self,cols):
        self.cols = cols
        
    def fit(self, X, y=None):
        # Nothing to learn: the selection is fixed by `cols`
        return self

    def transform(self, X):
        # Keep the requested column positions, in the given order
        col_list = [X[:, c:c+1] for c in self.cols]
        return np.concatenate(col_list, axis=1)

In [591]:
# Function used by a FunctionTransformer to add new features

def roomsperhousehold(X):
    """Append roomsPerHousehold, populationPerHousehold and bedroomsPerRoom to a DataFrame."""
    X = X.copy()  # avoid modifying the caller's DataFrame in place
    X['roomsPerHousehold'] = X['totalRooms']/X['households']
    X['populationPerHousehold'] = X['population']/X['households']
    X['bedroomsPerRoom'] = X['totalBedRooms']/X['totalRooms']
    return X

In [592]:
# Wrap the function in a FunctionTransformer so it can be used as a pipeline step
roomsperhousehold_ft = FunctionTransformer(roomsperhousehold, validate=False)

In [593]:
#steps for pipeline
steps = [('roomsperhousehold_nm',roomsperhousehold_ft),
         ('scalar',StandardScaler()),
         ('selection',ItemSelector(cols =(0,3,4,5,8,9,10))),
         ('model',LinearRegression())
         ]
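
The hard-coded indices in `ItemSelector(cols=(0,3,4,5,8,9,10))` refer to column positions after the `FunctionTransformer` has appended the three ratio columns to the eight input features (the `StandardScaler` returns a NumPy array, so names are no longer available at that step). Deriving the indices from column names, as in this small sketch with illustrative list names, confirms that they select the same seven features as Option 1:

```python
# Column order of train_X (pd_df without medianHouseValue)
base_columns = ["medianIncome", "housingMedianAge", "totalRooms", "totalBedRooms",
                "population", "households", "latitude", "longitude"]
# Columns appended by roomsperhousehold()
added_columns = ["roomsPerHousehold", "populationPerHousehold", "bedroomsPerRoom"]
all_columns = base_columns + added_columns

# The seven features used in Option 1
selected = ["medianIncome", "totalBedRooms", "population", "households",
            "roomsPerHousehold", "populationPerHousehold", "bedroomsPerRoom"]

cols = tuple(all_columns.index(name) for name in selected)
print(cols)
```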

In [594]:
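# Candidate models and their hyperparameter grids
# (`model__normalize` needs an older scikit-learn: `normalize` was removed from LinearRegression in 1.2)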
param = [{'model':[LinearRegression()],'model__normalize':[True,False]},
         {'model':[GradientBoostingRegressor()],'model__learning_rate':[0.01,0.001],
         'model__n_estimators':[100,200],'model__verbose':[1]}
        ]
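
A list of dictionaries is expanded into one candidate per combination of values within each dictionary (scikit-learn does this with `ParameterGrid`). Counting them gives the amount of work: 2 LinearRegression settings plus 2 × 2 × 1 = 4 gradient boosting settings make 6 candidates, and with `cv=5` that is 30 model fits, plus a final refit of the best candidate when `refit=True` (the scikit-learn default). A standard-library sketch of the expansion, with strings standing in for the estimator objects and a hypothetical `expand_grid` helper:

```python
from itertools import product

def expand_grid(param_grid):
    """Expand a list of {name: [values]} dicts into individual parameter settings."""
    candidates = []
    for grid in param_grid:
        names = sorted(grid)
        for values in product(*(grid[n] for n in names)):
            candidates.append(dict(zip(names, values)))
    return candidates

# String placeholders stand in for the real estimator objects
param = [
    {"model": ["LinearRegression"], "model__normalize": [True, False]},
    {"model": ["GradientBoostingRegressor"], "model__learning_rate": [0.01, 0.001],
     "model__n_estimators": [100, 200], "model__verbose": [1]},
]
candidates = expand_grid(param)
cv = 5
print(len(candidates), "candidates x", cv, "folds =", len(candidates) * cv, "fits")
```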

In [595]:
#Create a pipeline object
pl = Pipeline(steps)
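
A scikit-learn `Pipeline` fits and transforms each intermediate step in order, feeding each step's output to the next, and only fits (or predicts with) the final estimator. The toy classes below are hypothetical stand-ins written for illustration, not scikit-learn's implementation, and show that control flow with the standard library only:

```python
class AddOne:
    """Toy transformer: adds 1 to every value."""
    def fit(self, X, y=None):
        return self  # nothing to learn
    def transform(self, X):
        return [[v + 1 for v in row] for row in X]

class FirstFeature:
    """Toy estimator: predicts the first feature of each row plus a learned offset."""
    def fit(self, X, y):
        # offset = mean difference between the target and the first feature
        self.offset_ = sum(t - row[0] for row, t in zip(X, y)) / len(y)
        return self
    def predict(self, X):
        return [row[0] + self.offset_ for row in X]

class ToyPipeline:
    def __init__(self, steps):
        self.steps = steps  # list of (name, object) pairs, like sklearn's Pipeline
    def fit(self, X, y):
        for _, step in self.steps[:-1]:
            X = step.fit(X, y).transform(X)
        self.steps[-1][1].fit(X, y)
        return self
    def predict(self, X):
        for _, step in self.steps[:-1]:
            X = step.transform(X)
        return self.steps[-1][1].predict(X)

pipe = ToyPipeline([("add_one", AddOne()), ("model", FirstFeature())])
pipe.fit([[1.0], [2.0]], [12.0, 13.0])
print(pipe.predict([[5.0]]))
```

Because every transform is re-applied inside `predict`, the same feature engineering and scaling are used on the test data, which is what makes the pipeline safe to pass to a grid search.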

In [596]:
#Spark version of Gridsearch object
clf = GridSearchCV(sc,pl,param_grid=param,cv=5,n_jobs=-1)

#python version of Gridsearch object
#clf = GV(pl,param_grid=param,cv=5,n_jobs=-1) # 50 sec

In [597]:
clf.fit(train_X,train_y)

  y = column_or_1d(y, warn=True)


      Iter       Train Loss   Remaining Time 
         1           1.3175            2.64s
         2           1.3036            2.61s
         3           1.2900            2.60s
         4           1.2766            2.56s
         5           1.2636            2.58s
         6           1.2507            2.56s
         7           1.2381            2.54s
         8           1.2258            2.52s
         9           1.2138            2.49s
        10           1.2017            2.48s
        20           1.0929            2.32s
        30           1.0014            2.16s
        40           0.9251            2.01s
        50           0.8611            1.91s
        60           0.8075            1.79s
        70           0.7624            1.69s
        80           0.7243            1.58s
        90           0.6921            1.44s
       100           0.6650            1.30s
       200           0.5333            0.00s


GridSearchCV(cv=5, error_score='raise',
       estimator=Pipeline(steps=[('roomsperhousehold_nm', FunctionTransformer(accept_sparse=False,
          func=<function roomsperhousehold at 0x7fe6c3fb4048>,
          inv_kw_args=None, inverse_func=None, kw_args=None, pass_y=False,
          validate=False)), ('scalar', StandardScaler(copy=True, with_mean=True, with_std=True)), ('selection', ItemSelector(cols=(0, 3, 4, 5, 8, 9, 10))), ('model', LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1, normalize=False))]),
       fit_params={}, iid=True, n_jobs=-1,
       param_grid=[{'model': [LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1, normalize=False)], 'model__normalize': [True, False]}, {'model': [GradientBoostingRegressor(alpha=0.9, criterion='friedman_mse', init=None,
             learning_rate=0.1, loss='ls', max_depth=3, max_features=None,
  ...], 'model__learning_rate': [0.01, 0.001], 'model__n_estimators': [100, 200], 'model__verbose': [1]}],
       pre_dispatch='

In [598]:
#Predict on test dataset
predict_y = clf.predict(test_X)

In [599]:
#R2 score of test dataset
r2_score(test_y,predict_y)

0.60180947036351884

In [600]:
# Best model and hyperparameters found by the grid search
clf.best_params_

{'model': GradientBoostingRegressor(alpha=0.9, criterion='friedman_mse', init=None,
              learning_rate=0.01, loss='ls', max_depth=3, max_features=None,
              max_leaf_nodes=None, min_impurity_split=1e-07,
              min_samples_leaf=1, min_samples_split=2,
              min_weight_fraction_leaf=0.0, n_estimators=200,
              presort='auto', random_state=None, subsample=1.0, verbose=1,
              warm_start=False),
 'model__learning_rate': 0.01,
 'model__n_estimators': 200,
 'model__verbose': 1}

## Summary:

Below are the findings of this assignment.

1. There are multiple ways to use Spark for training machine learning models.
2. Using native Spark and MLlib (Option 1) was the fastest of the options: the whole execution took around 15 s.
3. However, Option 1 used no grid search or cross-validation, and its fixed hyperparameters (regParam=0.3, elasticNetParam=0.8) set every coefficient except the one for medianIncome to zero, so its result is not the best we could get.
4. In Option 2, we created a scikit-learn Pipeline object, which is a clean way to organize the code.
5. The grid search provided by spark-sklearn runs multiple models in parallel on Spark without using MLlib.
6. The spark-sklearn grid search finished in about 45 s; the plain scikit-learn grid search took about 50 s.
7. Option 2 produced a better fit than Option 1: an R2 of about 0.60 on the test set, compared with about 0.42 for Option 1 (reported on the training set), an improvement of roughly 18 percentage points.