# Big data Project - Spark Analytics

In [None]:
California Housing dataset - Source : http://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html

In [None]:
Objective - Build a model to predict the housing price

In [1]:
# Bootstrap: findspark.init() is called before pyspark is imported.
# NOTE(review): presumably this is what makes `import pyspark` resolvable — confirm.
import  findspark
findspark.init()
import pyspark as ps
import warnings
from pyspark.sql import SQLContext

In [2]:
# Set up the Spark context

In [3]:
from pyspark import SparkConf, SparkContext

# Run Spark locally under the application name "project".
conf = SparkConf().setMaster("local").setAppName("project")

# getOrCreate() hands back the already-running context when there is one, so
# executing this cell a second time no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

In [4]:
# Try out a few RDD operations

In [5]:
# Pair RDD of (key, number) tuples.
rdd1 = sc.parallelize([('a',7),('a',2),('b',2)])
# Pair RDD whose values are lists of strings.
rdd2 = sc.parallelize([("a",["x","y","z"]), ("b",["p", "r"])])
# RDD built from range(100).
rdd3 = sc.parallelize(range(100))
# The elements are tuples, so `a+b` concatenates them: the result is one flat
# tuple containing every key and value, not a sum of the numbers.
rdd1.reduce(lambda a,b: a+b)

('a', 7, 'a', 2, 'b', 2)

In [6]:
# Import SparkSession
from pyspark.sql import SparkSession

# Create the SparkSession, or pick up the one that already exists
spark = (
    SparkSession.builder
    .master("local")
    .appName("Linear Regression Model")
    .getOrCreate()
)

# Keep `sc` pointing at the context that belongs to this session
sc = spark.sparkContext

In [7]:
import os

# Folder that holds the California Housing files. Set the CAL_HOUSING_DIR
# environment variable to point somewhere else; the fallback is the location
# this notebook was originally written against.
DATA_DIR = os.environ.get(
    'CAL_HOUSING_DIR',
    'C:/Users/vinut/Downloads/cal_housing/CaliforniaHousing')

# Load in the data (one text line per record)
rdd = sc.textFile(DATA_DIR + '/cal_housing.data')

# Load in the header
header = sc.textFile(DATA_DIR + '/cal_housing.domain')


In [None]:
# Display every line of the header file
header.collect()


In [None]:
# Peek at the first two raw text lines of the data file
rdd.take(2)

In [None]:
# Turn each comma-separated text line into a list of string fields
rdd = rdd.map(lambda record: record.split(","))

# Look at the first two parsed records
rdd.take(2)

In [None]:
# Import the necessary modules 
from pyspark.sql import Row

# Give every positional field a name and turn the RDD into a DataFrame.
# The values are still strings at this point.
df = rdd.map(lambda fields: Row(**{
    "longitude": fields[0],
    "latitude": fields[1],
    "housingMedianAge": fields[2],
    "totalRooms": fields[3],
    "totalBedRooms": fields[4],
    "population": fields[5],
    "households": fields[6],
    "medianIncome": fields[7],
    "medianHouseValue": fields[8],
})).toDF()

In [None]:
# Display the leading rows of the new DataFrame
df.show()

In [None]:
# Alternative view of the column data types (left disabled)
# df.dtypes

# Print the schema of `df`
df.printSchema()

In [None]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    """Return `df` with each column listed in `names` cast to `newType`.

    Args:
        df: object supporting `df[name].cast(...)` and `df.withColumn(...)`.
        names: iterable of column names to convert.
        newType: the target type handed to `cast`.

    Returns:
        The result of replacing the named columns one after another; `df`
        itself when `names` is empty.
    """
    converted = df
    for column_name in names:
        recast = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, recast)
    return converted

# Names of all columns that have to become numeric
columns = [
    'households',
    'housingMedianAge',
    'latitude',
    'longitude',
    'medianHouseValue',
    'medianIncome',
    'population',
    'totalBedRooms',
    'totalRooms',
]

# Convert the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

In [None]:
# Check the schema again after the cast to FloatType
df.printSchema()

In [None]:
# Show 10 rows of the `population` and `totalBedRooms` columns
df.select('population','totalBedRooms').show(10)

In [None]:
# Number of records per housing median age, highest age first
(df.groupBy("housingMedianAge")
   .count()
   .sort("housingMedianAge", ascending=False)
   .show())

In [None]:
# Show summary statistics of `df`
df.describe().show()

In [None]:
# Import all from `sql.functions` 
from pyspark.sql.functions import *

# Adjust the values of `medianHouseValue`: express them in units of 100,000.
# NOTE: `df` is rebuilt from itself, so running this cell twice divides twice.
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)

In [None]:
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *

# Ratio expressions, defined once and reused below
rooms_per_household_col = col("totalRooms") / col("households")
population_per_household_col = col("population") / col("households")
bedrooms_per_room_col = col("totalBedRooms") / col("totalRooms")

# Single-column DataFrames holding each ratio
roomsPerHousehold = df.select(rooms_per_household_col)
populationPerHousehold = df.select(population_per_household_col)
bedroomsPerRoom = df.select(bedrooms_per_room_col)

# Attach the three ratios to `df` as named columns
df = (
    df.withColumn("roomsPerHousehold", rooms_per_household_col)
      .withColumn("populationPerHousehold", population_per_household_col)
      .withColumn("bedroomsPerRoom", bedrooms_per_room_col)
)

# Inspect the result
df.first()

In [None]:
# Keep only the modelling columns, with the label (`medianHouseValue`) first
df = df.select(*[
    "medianHouseValue",
    "totalBedRooms",
    "population",
    "housingMedianAge",
    "households",
    "medianIncome",
    "roomsPerHousehold",
    "populationPerHousehold",
    "bedroomsPerRoom",
])

In [None]:
# First two rows after the column selection
df.take(2)


In [None]:
import pyspark.mllib
import pyspark.mllib.regression
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import *

In [None]:
# Schema of the columns that go into the model
df.printSchema()

In [None]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Build (label, features) pairs: column 0 is the label, the remaining
# columns are packed into one DenseVector
input_data = df.rdd.map(
    lambda row: (row[0], DenseVector(row[1:]))
)

# Store the pairs in a new DataFrame, `dfnew` (`df` itself is left untouched)
dfnew = spark.createDataFrame(
    input_data,
    ["label", "features"],
)

In [None]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`: reads `features`, writes `features_scaled`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the scaler on `dfnew`
scaler = standardScaler.fit(dfnew)

# Transform the data in `dfnew` with the fitted scaler
scaled_df = scaler.transform(dfnew)

# Inspect the result
scaled_df.take(2)

In [None]:
# 80/20 train/test split; the fixed seed keeps the split repeatable
train_data, test_data = scaled_df.randomSplit([0.8, 0.2], seed=1234)

Exploratory Data Analysis

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
import pandas as pd

In [None]:
from pyspark.mllib.stat import Statistics

In [None]:
# Pearson correlation matrix of the feature columns (everything after the
# label in column 0 of `df`). The feature RDD is built right here so this
# cell does not rely on a `features` variable that is only created further
# down the notebook.
corr_features = df.rdd.map(lambda row: row[1:])
corr_mat = Statistics.corr(corr_features, method="pearson")

In [None]:
# Let pandas display up to 50 columns
pd.set_option('display.max_columns', 50)

In [None]:
# Labels for the rows and columns of the correlation matrix
col_names = [
    "totalBedRooms", "population", "housingMedianAge", "households",
    "medianIncome", "roomsPerHousehold", "populationPerHousehold",
    "bedroomsPerRoom",
]

# Wrap the matrix in a labelled pandas DataFrame
corr_df = pd.DataFrame(data=corr_mat, index=col_names, columns=col_names)

corr_df

In [None]:
import seaborn as sns
%matplotlib inline
# Heatmap of the correlation matrix, with the column names as tick labels on both axes
sns.heatmap(corr_df, 
        xticklabels=corr_df.columns,
        yticklabels=corr_df.columns)

In [None]:
# Doing the heavy lifting in Spark: the RDD `histogram` function returns the
# bucket boundaries together with the number of values in each bucket.
medianHouseValue_histogram = df.select('medianHouseValue').rdd.flatMap(lambda x: x).histogram(11)

# Load the computed histogram into a pandas DataFrame for plotting.
# There is one more boundary than there are buckets, so each count is paired
# with the lower boundary of its bucket (zip drops the final upper boundary).
# Passing the two lists as-is would give a 2-row table with ragged rows.
bin_edges, frequencies = medianHouseValue_histogram
hist_table = pd.DataFrame(list(zip(bin_edges, frequencies)),
                          columns=['bin', 'frequency'])

hist_table

# Model Building - Linear Regression

In [None]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`.
# `featuresCol` is set explicitly: without it the estimator reads the raw
# `features` column and the `features_scaled` column produced by the
# StandardScaler is never used.
lr = LinearRegression(featuresCol="features_scaled", labelCol="label",
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model on the training split
linearModel = lr.fit(train_data)

In [None]:
# Score the held-out rows
predicted = linearModel.transform(test_data)

# Pull the model output and the known labels out as RDDs of plain values
predictions = predicted.select("prediction").rdd.map(lambda row: row[0])
labels = predicted.select("label").rdd.map(lambda row: row[0])

# Pair every prediction with its label and collect the pairs into a list
predictionAndLabel = predictions.zip(labels).collect()

# First five (prediction, label) pairs
predictionAndLabel[:5]

In [None]:
# Coefficients of the fitted linear model
linearModel.coefficients

In [None]:
# Intercept of the fitted linear model
linearModel.intercept

# Model Evaluation - Linear Regression

In [None]:
# Get the RMSE from the model summary
# NOTE(review): `summary` presumably describes the fit on `train_data`, not
# the predictions made on `test_data` — confirm before reporting it as test error.
linearModel.summary.rootMeanSquaredError

In [None]:
# Get the R2 from the model summary
# NOTE(review): presumably a training-set figure as well — confirm.
linearModel.summary.r2

# RANDOM FOREST

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Label first, then the eight feature columns
df4 = df.select(*[
    "medianHouseValue",
    "totalBedRooms",
    "population",
    "housingMedianAge",
    "households",
    "medianIncome",
    "roomsPerHousehold",
    "populationPerHousehold",
    "bedroomsPerRoom",
])

In [None]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data`: (label, feature vector) pairs built from `df4`.
# The source has to be `df4`; `df3` does not exist until the statement below
# creates it, so reading from it raised a NameError on a fresh kernel.
input_data = df4.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Build the label/features DataFrame `df3`
df3 = spark.createDataFrame(input_data, ["label", "features"])

In [None]:
# `model.stages[1]` needs a fitted two-stage pipeline, but no earlier cell
# creates one, so it is fitted here: stage 0 indexes the features, stage 1 is
# the random forest regressor.
# NOTE(review): reconstructed from the imports above (Pipeline, VectorIndexer,
# RandomForestRegressor) — confirm the settings match the intended experiment.
rf_input = spark.createDataFrame(
    df4.rdd.map(lambda x: (x[0], DenseVector(x[1:]))),
    ["label", "features"])
featureIndexer = VectorIndexer(inputCol="features",
                               outputCol="indexedFeatures",
                               maxCategories=4).fit(rf_input)
rf = RandomForestRegressor(featuresCol="indexedFeatures", labelCol="label",
                           seed=1234)
model = Pipeline(stages=[featureIndexer, rf]).fit(rf_input)

# The second pipeline stage is the fitted random forest model
rfModel = model.stages[1]
print(rfModel)

# RANDOM FOREST REGRESSION

Convert the DataFrame to pandas and look at the first rows

In [None]:
# Preview five rows as a pandas DataFrame. Limiting in Spark first avoids
# collecting the whole dataset just to display its head.
df4.limit(5).toPandas()

Create a feature RDD with all the independent variables (every column except the label)

In [None]:
# Drop column 0 (the label) from every row; what is left are the features
features=df4.rdd.map(lambda row: row[1:])

In [None]:
# Look at the first feature record
features.take(1)

In [None]:
# Check what kind of object `features` is
type(features)

In [None]:
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.ml.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

The columns in the dataset are on different scales, so columns with large values could have a disproportionate effect.
Standardize and scale them before training.

In [None]:
# Scaler created with its default settings
# NOTE(review): confirm the defaults are the intended scaling.
standardizer = StandardScaler()
# Fit the scaler on the feature RDD.
# NOTE: the name `model` is reused later in the notebook for the tree models.
model = standardizer.fit(features)
# Apply the fitted scaler to every feature record
features_transform = model.transform(features)

In [None]:
features_transform.take(2)

Create the Label RDD, with the dependent variable

In [None]:
# Column 0 of every row is the label (`medianHouseValue` in `df4`)
labels=df4.rdd.map(lambda row: row[0])
labels.take(2)

Pair each label with its scaled feature vector

In [None]:
# Element-wise pairing: each record becomes (label, scaled features)
transformedData=labels.zip(features_transform)
transformedData.take(2)

Create an RDD of `LabeledPoint`s, with the dependent variable as the label.

In [None]:
# Wrap every (label, features) pair in a LabeledPoint.
# NOTE: `transformedData` is rebuilt from itself; running this cell a second
# time would apply the mapping to LabeledPoint objects instead of pairs.
transformedData = transformedData.map(lambda row : LabeledPoint(row[0],row[1]))
# Not the last expression of the cell, so this result is not displayed
transformedData.take(5)
type(transformedData)

Create Training Testing split data 

In [None]:
# 80/20 train/test split. The seed (the same value used for the earlier
# DataFrame split) makes the split, and every metric computed from it,
# repeatable across runs.
trainingData, testingData = transformedData.randomSplit([.8,.2], seed=1234)

In [None]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils

# Build the Model - Random Forest

In [None]:
# Train a small random forest regressor: 3 trees of depth at most 4, all
# features treated as continuous (empty categoricalFeaturesInfo).
# A fixed seed keeps the trained forest the same from run to run.
model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                        numTrees=3, featureSubsetStrategy="auto",
                                        impurity='variance', maxDepth=4, maxBins=32,
                                        seed=1234)

In [None]:
# Predict on the feature vectors of the test points
predictions = model.predict(testingData.map(lambda x: x.features))

In [None]:
# Pair each true test label with the corresponding prediction
labelsAndPredictions = testingData.map(lambda lp: lp.label).zip(predictions)

In [None]:
# First two (label, prediction) pairs
labelsAndPredictions.take(2)

# Model Evaluation

In [None]:
# Training MSE has to be computed from predictions on the TRAINING points.
# The earlier version summed the squared errors of the test predictions and
# divided by the training count, which is neither the training nor the test MSE.
trainPredictions = model.predict(trainingData.map(lambda x: x.features))
trainLabelsAndPredictions = trainingData.map(lambda lp: lp.label).zip(trainPredictions)
trainingMSE = trainLabelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
    float(trainingData.count())

In [None]:
# Test MSE: sum of squared (label - prediction) differences over the number
# of test points
squaredTestErrors = labelsAndPredictions.map(
    lambda pair: (pair[0] - pair[1]) * (pair[0] - pair[1]))
testMSE = squaredTestErrors.sum() / float(testingData.count())

In [None]:
# Report both error figures
print('Train Mean Squared Error = {}'.format(trainingMSE))
print('Test Mean Squared Error = {}'.format(testMSE))

# Build the Model - Gradient Boosting Trees

In [None]:
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils


# Gradient Boosting Trees- With 3 iterations

In [None]:
# Train a gradient-boosted trees regressor with 3 boosting iterations;
# the empty categoricalFeaturesInfo declares no categorical features.
model = GradientBoostedTrees.trainRegressor(trainingData,
                                            categoricalFeaturesInfo={}, numIterations=3)

In [None]:
# Predict on the test features and pair each true label with its prediction
testFeatures = testingData.map(lambda point: point.features)
predictions = model.predict(testFeatures)
testLabels = testingData.map(lambda point: point.label)
labelsAndPredictions = testLabels.zip(predictions)

# Test MSE: sum of squared differences over the number of test points
squaredTestErrors = labelsAndPredictions.map(
    lambda pair: (pair[0] - pair[1]) * (pair[0] - pair[1]))
testMSE = squaredTestErrors.sum() / float(testingData.count())

# Model Evaluation 

In [None]:
# Report the test error, then dump the structure of the learned ensemble
print('Test Mean Squared Error = {}'.format(testMSE))
print('Learned regression GBT model:', model.toDebugString(), sep='\n')

# Gradient Boosting Trees- 100 iterations

In [None]:
# Same training call as before, now with 100 boosting iterations
model = GradientBoostedTrees.trainRegressor(trainingData,
                                            categoricalFeaturesInfo={}, numIterations=100)

In [None]:
# Predict on the test features and pair each true label with its prediction
testFeatures = testingData.map(lambda point: point.features)
predictions = model.predict(testFeatures)
testLabels = testingData.map(lambda point: point.label)
labelsAndPredictions = testLabels.zip(predictions)

# Test MSE: sum of squared differences over the number of test points
squaredTestErrors = labelsAndPredictions.map(
    lambda pair: (pair[0] - pair[1]) * (pair[0] - pair[1]))
testMSE = squaredTestErrors.sum() / float(testingData.count())

In [None]:
print('Test Mean Squared Error = ' + str(testMSE))
# The heading used to be printed with nothing after it. The full debug string
# of a 100-iteration ensemble is very long, so report the size of the model.
print('Learned regression GBT model:')
print('%d trees, %d nodes in total' % (model.numTrees(), model.totalNumNodes()))

As the number of boosting iterations increases (from 3 to 100), the model fits better and its test error decreases.

In [None]:
# Stop the Spark session now that the analysis is finished
spark.stop()