In [None]:
!pip install pyspark

In [None]:
import os
import pandas as pd

import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Visualization
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)

from matplotlib import rcParams
sns.set(context='notebook', style='whitegrid', rc={'figure.figsize': (18,4)})
rcParams['figure.figsize'] = 18,4

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

In [None]:
# Set the random seed for notebook reproducibility.
rnd_seed = 23
# Call the seeding function. Assigning to `np.random.seed` (or to
# `np.random.set_state`) would replace the numpy function with an int and
# leave the global generator unseeded.
np.random.seed(rnd_seed)

In [None]:
spark = SparkSession.builder.master("local[2]").appName("Linear-Regression-California-Housing").getOrCreate()

In [None]:
spark

In [None]:
sc = spark.sparkContext
sc

In [None]:
sqlContext = SQLContext(spark.sparkContext)
sqlContext

In [None]:
# Path to the California housing CSV read by spark.read.csv below.
# NOTE(review): the previous literal '../in/cal_housputing.data' looked like a
# garbled form of this path - confirm the file location in your environment.
HOUSING_DATA = '../input/cal_housing.data'

In [None]:
# Schema for one line of the CSV data file.
# The file is read without a header row, so fields are matched to columns by
# position: the order below must follow the column order of the data file
# (longitude, latitude, median age, total rooms, total bedrooms, population,
# households, median income, median house value).
schema = StructType([
    StructField("long", FloatType(), nullable=True),
    StructField("lat", FloatType(), nullable=True),
    StructField("medage", FloatType(), nullable=True),
    StructField("totrooms", FloatType(), nullable=True),
    StructField("totbdrms", FloatType(), nullable=True),
    StructField("pop", FloatType(), nullable=True),
    StructField("houshlds", FloatType(), nullable=True),
    StructField("medinc", FloatType(), nullable=True),
    StructField("medhv", FloatType(), nullable=True)]
)

In [None]:
# Load housing data
housing_df = spark.read.csv(path=HOUSING_DATA, schema=schema).cache()

In [None]:
# Inspect first five rows
housing_df.take(5)

In [None]:
# Show first five rows
housing_df.show(5)

In [None]:
# show the dataframe columns
housing_df.columns

In [None]:
# show the schema of the dataframe
housing_df.printSchema()

In [None]:
# run a sample selection
housing_df.select('pop','totbdrms').show(10)

In [None]:
# group by housingmedianage and see the distribution
result_df = housing_df.groupBy("medage").count().sort("medage", ascending=False)

In [None]:
result_df.show(10)

In [None]:
result_df.toPandas().plot.bar(x='medage',figsize=(14, 6))


In [None]:
# Summary statistics of the numeric columns, rounded to 4 decimals for display
(
    housing_df.describe()
    .select(
        "summary",
        *[
            F.round(name, 4).alias(name)
            for name in ("medage", "totrooms", "totbdrms", "pop", "houshlds", "medinc", "medhv")
        ]
    )
    .show()
)

In [None]:
# Adjust the values of `medianHouseValue`
housing_df = housing_df.withColumn("medhv", col("medhv")/100000)

In [None]:
# Show the first 2 lines of `df`
housing_df.show(2)

In [None]:
housing_df.columns

In [None]:
# Add the new columns to `df`
housing_df = (housing_df.withColumn("rmsperhh", F.round(col("totrooms")/col("houshlds"), 2))
                       .withColumn("popperhh", F.round(col("pop")/col("houshlds"), 2))
                       .withColumn("bdrmsperrm", F.round(col("totbdrms")/col("totrooms"), 2)))

In [None]:
# Inspect the result
housing_df.show(5)

In [None]:
# Re-order and select columns
housing_df = housing_df.select("medhv", 
                              "totbdrms", 
                              "pop", 
                              "houshlds", 
                              "medinc", 
                              "rmsperhh", 
                              "popperhh", 
                              "bdrmsperrm")

In [None]:
featureCols = ["totbdrms", "pop", "houshlds", "medinc", "rmsperhh", "popperhh", "bdrmsperrm"]

In [None]:
# put features into a feature vector column
assembler = VectorAssembler(inputCols=featureCols, outputCol="features") 

In [None]:
assembled_df = assembler.transform(housing_df)

In [None]:
assembled_df.show(10, truncate=False)

In [None]:
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

In [None]:
# Fit the DataFrame to the scaler
scaled_df = standardScaler.fit(assembled_df).transform(assembled_df)

In [None]:
# Inspect the result
scaled_df.select("features", "features_scaled").show(10, truncate=False)

In [None]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2], seed=rnd_seed)

In [None]:
train_data.columns

In [None]:
# Initialize `lr`
lr = (LinearRegression(featuresCol='features_scaled', labelCol="medhv", predictionCol='predmedhv', 
                               maxIter=10, regParam=0.3, elasticNetParam=0.8, standardization=False))

In [None]:
# Fit the data to the model
linearModel = lr.fit(train_data)

In [None]:
# Coefficients for the model
linearModel.coefficients

In [None]:
featureCols

In [None]:
# Intercept for the model
linearModel.intercept

In [None]:
coeff_df = pd.DataFrame({"Feature": ["Intercept"] + featureCols, "Co-efficients": np.insert(linearModel.coefficients.toArray(), 0, linearModel.intercept)})
coeff_df = coeff_df[["Feature", "Co-efficients"]]

In [None]:
coeff_df

In [None]:
# Generate predictions
predictions = linearModel.transform(test_data)

In [None]:
# Extract the predictions and the "known" correct labels
predandlabels = predictions.select("predmedhv", "medhv")

In [None]:
predandlabels.show()

In [None]:
# Get the RMSE
print("RMSE: {0}".format(linearModel.summary.rootMeanSquaredError))

In [None]:
print("MAE: {0}".format(linearModel.summary.meanAbsoluteError))

In [None]:
# Get the R2
print("R2: {0}".format(linearModel.summary.r2))

In [None]:
evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse')
print("RMSE: {0}".format(evaluator.evaluate(predandlabels)))

In [None]:
evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='mae')
print("MAE: {0}".format(evaluator.evaluate(predandlabels)))

In [None]:
evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='r2')
print("R2: {0}".format(evaluator.evaluate(predandlabels)))

In [None]:
# mllib is old so the methods are available in rdd
metrics = RegressionMetrics(predandlabels.rdd)

In [None]:
print("RMSE: {0}".format(metrics.rootMeanSquaredError))

In [None]:
print("MAE: {0}".format(metrics.meanAbsoluteError))

In [None]:
print("R2: {0}".format(metrics.r2))

In [None]:
spark.stop()