In [None]:
import glob
import os
import sys
import pandas  # noqa: F401 -- not referenced directly; kept for DataFrame.toPandas() used later

# Point this kernel at the cluster's Spark 2 installation. These paths are
# specific to the HDP/CloudxLab environment this notebook was written for.
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.path.join(os.environ["SPARK_HOME"], "python", "lib")
# In the two lines below, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"

# Locate the bundled py4j archive instead of hard-coding its version number.
# If nothing matches, fall back to the previously hard-coded file name.
# (If several versions are present, the lexicographically last one is used.)
_py4j_zips = sorted(glob.glob(os.path.join(os.environ["PYLIB"], "py4j-*-src.zip")))
_py4j_zip = _py4j_zips[-1] if _py4j_zips else os.path.join(os.environ["PYLIB"], "py4j-0.10.4-src.zip")
sys.path.insert(0, _py4j_zip)
sys.path.insert(0, os.path.join(os.environ["PYLIB"], "pyspark.zip"))

In [None]:
# Set up the SparkSession, the entry point for the DataFrame and ML APIs.
# (The placeholder "spark.some.config.option" copied from the Spark docs was removed.)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Python Spark regression example")
    .getOrCreate()
)

In [None]:
# Load the dataset with Spark's built-in CSV reader. Column names come from the
# header row and column types are inferred from the data.
DATA_PATH = "file:///home/mofasafa6802/cloudxlab_jupyter_notebooks/USA_Housing.csv"
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

In [None]:
df.show(n=10, truncate=True)

+-----------+--------------+----------------+-------------------+---------------+-----------+
|Area Income|Area House Age|Area No of Rooms|Area No of Bedrooms|Area Population|      Price|
+-----------+--------------+----------------+-------------------+---------------+-----------+
|79545.45857|   5.682861322|     7.009188143|               4.09|     23086.8005|1059033.558|
|79248.64245|   6.002899808|     6.730821019|               3.09|    40173.07217|1505890.915|
|61287.06718|    5.86588984|      8.51272743|               5.13|     36882.1594|1058987.988|
|63345.24005|   7.188236095|     5.586728665|               3.26|    34310.24283|1260616.807|
|59982.19723|   5.040554523|     7.839387785|               4.23|    26354.10947|630943.4893|
|80175.75416|   4.988407758|     6.104512439|               4.04|    26748.42842|1068138.074|
|64698.46343|   6.025335907|     8.147759585|               3.41|    60828.24909|1502055.817|
|78394.33928|   6.989779748|     6.620477995|               

In [None]:
# Checking for null values: count the nulls in every column with a single Spark
# job, instead of one filter+count job per column.
from pyspark.sql import functions as F

null_counts = df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
).first()

for column_name in df.columns:
    print("no. of cells in column", column_name, "with null values:", null_counts[column_name])

no. of cells in column Area Income with null values: 0
no. of cells in column Area House Age with null values: 0
no. of cells in column Area No of Rooms with null values: 0
no. of cells in column Area No of Bedrooms with null values: 0
no. of cells in column Area Population with null values: 0
no. of cells in column Price with null values: 0


In [None]:
# Data exploration: column names and the types inferred by the CSV reader.
# (A bare `df.columns` expression here was dropped; its value was never displayed.)
df.printSchema()

root
 |-- Area Income: double (nullable = true)
 |-- Area House Age: double (nullable = true)
 |-- Area No of Rooms: double (nullable = true)
 |-- Area No of Bedrooms: double (nullable = true)
 |-- Area Population: double (nullable = true)
 |-- Price: double (nullable = true)



In [None]:
# Descriptive statistics (count, mean, stddev, min, max) for every column.
df.describe().show(n=20, truncate=True)

+-------+------------------+------------------+------------------+-------------------+-----------------+------------------+
|summary|       Area Income|    Area House Age|  Area No of Rooms|Area No of Bedrooms|  Area Population|             Price|
+-------+------------------+------------------+------------------+-------------------+-----------------+------------------+
|  count|              5000|              5000|              5000|               5000|             5000|              5000|
|   mean| 68583.10898397019| 5.977222035287008| 6.987791850909204| 3.9813299999999967|36163.51603854035|1232072.6541452995|
| stddev|10657.991213888685|0.9914561798324225|1.0058332312754115| 1.2341372654846832|9925.650113546026| 353117.6265836953|
|    min|       17796.63119|       2.644304186|       3.236194023|                2.0|      172.6106863|       15938.65792|
|    max|       107701.7484|       9.519088066|       10.75958834|                6.5|      69621.71338|       2469065.594|
+-------

In [None]:
df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Area Income,5000,68583.10898397019,10657.991213888685,17796.63119,107701.7484
Area House Age,5000,5.977222035287008,0.9914561798324225,2.644304186,9.519088066
Area No of Rooms,5000,6.987791850909204,1.0058332312754115,3.236194023,10.75958834
Area No of Bedrooms,5000,3.9813299999999967,1.2341372654846832,2.0,6.5
Area Population,5000,36163.51603854035,9925.650113546026,172.6106863,69621.71338
Price,5000,1232072.6541452995,353117.6265836953,15938.65792,2469065.594


In [None]:
# Pearson correlation of every numeric column with the target, Price.
# Numeric columns are chosen from the schema. This avoids one Spark job per
# column to sample a value, and it skips non-numeric types (dates, booleans, ...)
# that df.stat.corr() rejects.
_NUMERIC_TYPES = ("double", "float", "int", "bigint", "smallint", "tinyint")
for name, dtype in df.dtypes:
    if dtype in _NUMERIC_TYPES or dtype.startswith("decimal"):
        print( "Correlation to Price for ", name, df.stat.corr('Price', name))

Correlation to Price for  Area Income 0.6397337782571293
Correlation to Price for  Area House Age 0.452542537178579
Correlation to Price for  Area No of Rooms 0.3356644533593983
Correlation to Price for  Area No of Bedrooms 0.1710710276560539
Correlation to Price for  Area Population 0.40855587932093074
Correlation to Price for  Price 1.0


In [None]:
# Convert each row into a (dense feature vector, label) pair. The last column is
# the label; every preceding column becomes a feature.
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

def transData(data):
    """Return a DataFrame with columns ``features`` and ``label``.

    ``features`` is a dense vector built from every column except the last, and
    ``label`` is the value of the last column. Rows go through the RDD API, so
    each one is converted in Python.
    """
    def _row_to_pair(row):
        feature_values = row[:-1]
        label_value = row[-1]
        return [Vectors.dense(feature_values), label_value]

    pairs = data.rdd.map(_row_to_pair)
    return pairs.toDF(['features', 'label'])

In [None]:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    """Build a ``features``/``label`` DataFrame with one-hot encoded categoricals.

    Each column in ``categoricalCols`` is string-indexed into ``<c>_indexed`` and
    then one-hot encoded into ``<c>_indexed_encoded`` (OneHotEncoder default
    setting: dropLast=True). The encoded vectors, followed by ``continuousCols``,
    are assembled into ``features``, and ``labelCol`` is copied into ``label``.

    Parameters
    ----------
    df : DataFrame
        Input data. The pipeline is fit on it and then applied to it.
    indexCol : str or None
        Optional id column to keep in the output. A falsy value omits it.
    categoricalCols : list of str
        Columns to index and one-hot encode.
    continuousCols : list of str
        Columns appended as-is. Must be a list, because it is concatenated onto
        a list of encoded column names.
    labelCol : str
        Column copied into ``label``.

    Returns
    -------
    DataFrame with columns ``[indexCol,] features, label``.
    """
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col

    # Stage 1: map each categorical column to numeric category indices.
    indexers = []
    for cat_name in categoricalCols:
        indexers.append(StringIndexer(inputCol=cat_name,
                                      outputCol="{0}_indexed".format(cat_name)))

    # Stage 2: one-hot encode each indexed column (default setting: dropLast=True).
    encoders = []
    for indexer in indexers:
        indexed_name = indexer.getOutputCol()
        encoders.append(OneHotEncoder(inputCol=indexed_name,
                                      outputCol="{0}_encoded".format(indexed_name)))

    # Stage 3: concatenate encoded vectors and continuous columns into one vector.
    assembled_inputs = [encoder.getOutputCol() for encoder in encoders] + continuousCols
    assembler = VectorAssembler(inputCols=assembled_inputs, outputCol="features")

    fitted = Pipeline(stages=indexers + encoders + [assembler]).fit(df)
    result = fitted.transform(df).withColumn('label', col(labelCol))

    output_cols = [indexCol, 'features', 'label'] if indexCol else ['features', 'label']
    return result.select(*output_cols)

In [None]:
# Reshape the raw columns into the (features, label) layout used by Spark ML.
transformed = transData(df)
transformed.show(n=5)

+--------------------+-----------+
|            features|      label|
+--------------------+-----------+
|[79545.45857,5.68...|1059033.558|
|[79248.64245,6.00...|1505890.915|
|[61287.06718,5.86...|1058987.988|
|[63345.24005,7.18...|1260616.807|
|[59982.19723,5.04...|630943.4893|
+--------------------+-----------+
only showing top 5 rows



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Treat vector slots with at most 4 distinct values as categorical and write the
# result to "indexedFeatures".
# NOTE(review): the indexer is fit on the full dataset, before the train/test
# split below, so test rows influence which slots count as categorical.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4
).fit(transformed)

data = featureIndexer.transform(transformed)

In [None]:
data.show(n=5, truncate=True)

+--------------------+-----------+--------------------+
|            features|      label|     indexedFeatures|
+--------------------+-----------+--------------------+
|[79545.45857,5.68...|1059033.558|[79545.45857,5.68...|
|[79248.64245,6.00...|1505890.915|[79248.64245,6.00...|
|[61287.06718,5.86...|1058987.988|[61287.06718,5.86...|
|[63345.24005,7.18...|1260616.807|[63345.24005,7.18...|
|[59982.19723,5.04...|630943.4893|[59982.19723,5.04...|
+--------------------+-----------+--------------------+
only showing top 5 rows



In [None]:
# Split the data 80/20 into training and test sets. A fixed seed makes the split,
# and every metric reported below, reproducible across Restart & Run All.
SPLIT_SEED = 42
(trainingData, testData) = transformed.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [None]:
# Peek at the first rows of each split.
trainingData.show(n=5)
testData.show(n=5)

+--------------------+-----------+
|            features|      label|
+--------------------+-----------+
|[17796.63119,4.94...| 302355.836|
|[35454.71466,6.85...|1077805.578|
|[35608.98624,6.93...|449331.5835|
|[35797.32312,5.54...|299863.0401|
|[35963.33081,3.43...|143027.3645|
+--------------------+-----------+
only showing top 5 rows

+--------------------+-----------+
|            features|      label|
+--------------------+-----------+
|[39033.80924,7.67...|1042814.098|
|[39294.03652,5.92...|781137.4618|
|[39653.77003,5.20...|395901.2501|
|[40185.73389,5.94...|529282.0844|
|[40503.54133,6.88...|798639.6542|
+--------------------+-----------+
only showing top 5 rows



In [None]:
# Linear-regression estimator. It reads "indexedFeatures", the output column of
# featureIndexer, so that the indexer stage chained before it in the Pipeline is
# actually consumed. With the default featuresCol ("features") the indexer's
# output would be ignored.
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="indexedFeatures")

In [None]:
# Chain the feature indexer and the linear-regression estimator in a Pipeline.
# featureIndexer was already fitted in an earlier cell, so Pipeline.fit only
# applies it here; the regression is fit on the training split only.
pipeline = Pipeline(stages=[featureIndexer, lr])
model = pipeline.fit(trainingData)

In [None]:
def modelsummary(model):
    """Print an R-style coefficient table for a fitted linear-regression model.

    One row is printed per entry of ``model.summary.pValues``. Each row shows the
    estimate, standard error, t value and p-value. Estimates are the
    coefficients with the intercept appended, so the intercept is the last row.
    After the table come the mean squared error, RMSE, R-squared and total
    iteration count from the model's summary.

    Parameters
    ----------
    model : fitted model
        Must expose ``coefficients``, ``intercept`` and ``summary``. ``summary``
        must provide ``pValues``, ``coefficientStandardErrors``, ``tValues``,
        ``meanSquaredError``, ``rootMeanSquaredError``, ``r2`` and
        ``totalIterations``.

    Returns
    -------
    None. Everything is written to stdout.
    """
    import numpy as np

    rule = "---------------------------------------------------"
    print("Note: the last rows are the information for Intercept")
    print("##", rule)
    print("##  ", "  Estimate   |   Std.Error | t Values  |  P-value")
    print("##", rule)

    # Coefficients followed by the intercept.
    estimates = np.append(list(model.coefficients), model.intercept)
    stats = model.summary

    for row, p_value in enumerate(stats.pValues):
        print("##",
              "{:15.6f}".format(estimates[row]),
              "{:12.6f}".format(stats.coefficientStandardErrors[row]),
              "{:11.3f}".format(stats.tValues[row]),
              "{:12.6f}".format(p_value))

    print("##", rule)
    # Each footer value is read just before its line is printed.
    footer = (
        ("Mean squared error: %.6f", "meanSquaredError"),
        ("RMSE              : %.6f", "rootMeanSquaredError"),
        ("R-squared         : %f", "r2"),
        ("Total iterations  : %i", "totalIterations"),
    )
    for template, attr_name in footer:
        print("##", template % getattr(stats, attr_name))

In [None]:
# model.stages[-1] is the last stage of the fitted pipeline (the regression
# model built from `lr`); modelsummary() prints its training summary.
modelsummary(model.stages[-1])

Note: the last rows are the information for Intercept
## ---------------------------------------------------
##     Estimate   |   Std.Error | t Values  |  P-value
## ---------------------------------------------------
##       21.508940     0.150804     142.628     0.000000
##   166230.977791  1618.070536     102.734     0.000000
##   120863.348440  1802.302245      67.061     0.000000
##     1393.558373  1470.612058       0.948     0.343388
##       15.057466     0.160901      93.582     0.000000
## -2631622.177511 19215.351741    -136.954     0.000000
## ---------------------------------------------------
## Mean squared error: 10189018076.260710
## RMSE              : 100940.666118
## R-squared         : 0.917533
## Total iterations  : 1


In [None]:
# Score the held-out test set with the fitted pipeline and show a few rows.
predictions = model.transform(testData)
predictions.select(["features", "label", "prediction"]).show(n=5)

+--------------------+-----------+-----------------+
|            features|      label|       prediction|
+--------------------+-----------+-----------------+
|[39033.80924,7.67...|1042814.098|954379.1942549641|
|[39294.03652,5.92...|781137.4618| 575802.915670652|
|[39653.77003,5.20...|395901.2501|536412.7485675332|
|[40185.73389,5.94...|529282.0844|491420.6038126438|
|[40503.54133,6.88...|798639.6542|873850.9818022437|
+--------------------+-----------+-----------------+
only showing top 5 rows



In [None]:
import sklearn.metrics

# Collect label and prediction in a single pass so that both come from the same
# rows in the same order. Two separate toPandas() calls recompute the DataFrame
# and are not guaranteed to line up.
scored = predictions.select("label", "prediction").toPandas()
y_true = scored[["label"]]
y_pred = scored[["prediction"]]

r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print('r2_score: {0}'.format(r2_score))

r2_score: 0.9196741555422578


In [None]:
# Tune regularization with 2-fold cross-validation over a small grid:
# regParam in {0.1, 0.01} x elasticNetParam in {0 (pure L2), 1 (pure L1)}.
# All names used here are imported in this cell so it survives a fresh kernel.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

CV_SEED = 42  # fixes the random fold assignment so the chosen params are reproducible

lr = LinearRegression(maxIter=5, solver="l-bfgs")

# Model-selection metric. Defaults: labelCol="label", predictionCol="prediction", RMSE.
modelEvaluator = RegressionEvaluator()

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01])
             .addGrid(lr.elasticNetParam, [0, 1])
             .build())

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=modelEvaluator,
                          numFolds=2,
                          seed=CV_SEED)

cvModel = crossval.fit(trainingData)

# cvModel.transform uses the best parameter set, refit on all of trainingData.
prediction = cvModel.transform(testData)

evaluator = RegressionEvaluator(labelCol="label",
                                predictionCol="prediction",
                                metricName="rmse")

rms = evaluator.evaluate(prediction)


In [None]:
# cvModel uses the best model found from the Cross Validation.
# Report the test-set RMSE that the previous cell computed and stored in `rms`.
print("Root Mean Squared Error (RMSE) on test data = %g" % rms)

Root Mean Squared Error (RMSE) on test data = 101768
