In [None]:
# Load the two join inputs. Both files are read as CSV with a header row,
# and Spark infers the column types from the data.
actorsDF = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("join-actors.txt")
)
moviesDF = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("join-series.txt")
)

# Right outer join on "movieid": actorsDF is the right-hand side, so every
# actors row is kept even when no movies row shares its movieid.
moviesDF.join(actorsDF, on="movieid", how="rightouter").show()

In [None]:
# Expose both frames to Spark SQL under the aliases used by the query below.
actorsDF.createOrReplaceTempView("actors")
moviesDF.createOrReplaceTempView("movies")

# Join written as a comma join filtered on movieid, sorted by the actors' id column.
joinQuery = "select * from movies m,actors a where a.movieid=m.movieid order by a.id"
newDF = spark.sql(joinQuery)

# show() is given a row limit of 100000.
newDF.show(100000)

In [42]:
# Read the house-price data set: CSV with a header row, column types inferred.
pricesDF = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("house-prices-data.csv")
)
from pyspark.ml.feature import StringIndexer,OneHotEncoder

# Columns that are index-encoded and then one-hot encoded below.
strCols = [
    "MSZoning", "Street", "Alley", "LotShape", "LandContour",
    "Utilities", "LotConfig", "LandSlope", "Neighborhood",
    "Condition1", "Condition2", "BldgType", "HouseStyle", "RoofStyle",
    "RoofMatl", "Exterior1st", "Exterior2nd", "MasVnrType",
    "ExterQual", "ExterCond", "Foundation", "BsmtQual", "BsmtCond",
    "BsmtExposure", "BsmtFinType1", "BsmtFinType2", "Heating",
    "HeatingQC", "CentralAir", "Electrical", "KitchenQual", "Functional",
    "FireplaceQu", "GarageType", "GarageFinish", "GarageQual", "GarageCond",
    "PavedDrive", "PoolQC", "Fence", "MiscFeature",
    "SaleType", "SaleCondition",
]

# For each listed column: fit a StringIndexer, one-hot encode the index into
# "<col>Vec", then drop both the original column and the intermediate index.
# NOTE(review): OneHotEncoder is used here as a plain transformer (no fit());
# on Spark 3.x it is an Estimator and would need .fit() first - confirm the
# Spark version this notebook targets.
for colName in strCols:
    indexedName = colName + "Index"
    vectorName = colName + "Vec"

    indexer = StringIndexer(inputCol=colName, outputCol=indexedName)
    pricesDF = indexer.fit(pricesDF).transform(pricesDF)

    encoder = OneHotEncoder(inputCol=indexedName, outputCol=vectorName)
    pricesDF = encoder.transform(pricesDF).drop(colName).drop(indexedName)

pricesDF.printSchema()


root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- LotFrontage: string (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- MasVnrArea: string (nullable = true)
 |-- BsmtFinSF1: integer (nullable = true)
 |-- BsmtFinSF2: integer (nullable = true)
 |-- BsmtUnfSF: integer (nullable = true)
 |-- TotalBsmtSF: integer (nullable = true)
 |-- 1stFlrSF: integer (nullable = true)
 |-- 2ndFlrSF: integer (nullable = true)
 |-- LowQualFinSF: integer (nullable = true)
 |-- GrLivArea: integer (nullable = true)
 |-- BsmtFullBath: integer (nullable = true)
 |-- BsmtHalfBath: integer (nullable = true)
 |-- FullBath: integer (nullable = true)
 |-- HalfBath: integer (nullable = true)
 |-- BedroomAbvGr: integer (nullable = true)
 |-- KitchenAbvGr: integer (nullable = true)
 |-- TotRmsAbv

In [43]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def makeNAZero(sVal):
    """Convert a raw cell value to an int, mapping missing markers to 0.

    Args:
        sVal: the raw cell value. Usually a string such as "65" or "NA",
            but None (a null cell) and already-numeric values are accepted.

    Returns:
        0 when the value is None, "NA" or empty (after trimming surrounding
        whitespace); otherwise int(sVal).

    Raises:
        ValueError: if the value is a non-missing string that int() cannot parse.
    """
    # A null cell reaches the function as None; int(None) would raise
    # TypeError, so treat it like the "NA" marker.
    if sVal is None:
        return 0
    # Duck-typed string check so it works for both str and unicode values.
    if hasattr(sVal, "strip"):
        sVal = sVal.strip()
        if sVal == "NA" or sVal == "":
            return 0
    return int(sVal)

# Wrap makeNAZero as a Spark UDF that yields an integer column.
udfMakeNAZero = udf(makeNAZero, IntegerType())

# These columns show up as strings in the printed schema; each one is
# replaced by an integer "<col>Int" column and the original is dropped.
NACols = ["LotFrontage", "MasVnrArea", "GarageYrBlt"]
for colName in NACols:
    intName = colName + "Int"
    pricesDF = pricesDF.withColumn(intName, udfMakeNAZero(colName)).drop(colName)



In [44]:
# Copy SalePrice into a new "label" column and drop the original.
# withColumn appends, so "label" ends up as the last column of the frame.
pricesDF = pricesDF.withColumn("label", pricesDF["SalePrice"]).drop("SalePrice")

In [47]:
from pyspark.ml.feature import VectorAssembler
# Every column except the last one is fed into the feature vector; this
# relies on "label" being the final column of pricesDF.
# NOTE(review): no visible cell drops Id, so it appears to be assembled
# as a feature too - confirm that is intended.
featureCols = pricesDF.columns[:-1]
vectorAssembler = VectorAssembler(inputCols=featureCols, outputCol="features")
pricesDF = vectorAssembler.transform(pricesDF)
pricesDF.select("features", "label").show()

+--------------------+------+
|            features| label|
+--------------------+------+
|(262,[0,1,2,3,4,5...|208500|
|(262,[0,1,2,3,4,5...|181500|
|(262,[0,1,2,3,4,5...|223500|
|(262,[0,1,2,3,4,5...|140000|
|(262,[0,1,2,3,4,5...|250000|
|(262,[0,1,2,3,4,5...|143000|
|(262,[0,1,2,3,4,5...|307000|
|(262,[0,1,2,3,4,5...|200000|
|(262,[0,1,2,3,4,5...|129900|
|(262,[0,1,2,3,4,5...|118000|
|(262,[0,1,2,3,4,5...|129500|
|(262,[0,1,2,3,4,5...|345000|
|(262,[0,1,2,3,4,5...|144000|
|(262,[0,1,2,3,4,5...|279500|
|(262,[0,1,2,3,4,5...|157000|
|(262,[0,1,2,3,4,5...|132000|
|(262,[0,1,2,3,4,5...|149000|
|(262,[0,1,2,3,4,5...| 90000|
|(262,[0,1,2,3,4,5...|159000|
|(262,[0,1,2,3,4,5...|139000|
+--------------------+------+
only showing top 20 rows



In [50]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
# Decision tree regressor reading the assembled "features" column
# (the label column is left at the estimator's default).
dt = DecisionTreeRegressor(featuresCol="features")

# 70/30 train/test split. A fixed seed makes the split - and therefore every
# RMSE reported in the cells below - repeatable across runs; without it each
# run draws a different split and the models cannot be compared fairly.
# Previously captured outputs were produced with an unseeded split and will
# differ after re-running.
(pricesTrain, pricesTest) = pricesDF.randomSplit([0.7, 0.3], seed=42)


Root Mean Squared Error (RMSE) on test data = 43962.7


In [54]:
# RMSE evaluator comparing the "prediction" column against "label";
# later cells reuse this same evaluator for the other models.
dtEval = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction",
)

# Fit the decision tree on the training split and score the held-out split.
dtModel = dt.fit(pricesTrain)
predictionsDT = dtModel.transform(pricesTest)
rmseDT = dtEval.evaluate(predictionsDT)
print("Root Mean Squared Error (RMSE) on test data (Decision Tree)= %g" % rmseDT)

Root Mean Squared Error (RMSE) on test data (Decision Tree)= 43962.7


In [55]:
from pyspark.ml.regression import RandomForestRegressor
# Random forest on the same features. The forest is randomised, so a fixed
# seed is set to make the fitted model and its RMSE repeatable.
rf = RandomForestRegressor(featuresCol="features", seed=42)
rfModel = rf.fit(pricesTrain)
predictionsRF = rfModel.transform(pricesTest)

# Stored under its own name so the decision-tree metric (rmseDT) is not
# overwritten and the models can be compared afterwards.
rmseRF = dtEval.evaluate(predictionsRF)
print("Root Mean Squared Error (RMSE) on test data (Random Forest)= %g" % rmseRF)

Root Mean Squared Error (RMSE) on test data (Random Forest)= 30235.3


In [58]:
from pyspark.ml.regression import LinearRegression
# Linear regression baseline, capped at 20 solver iterations.
lr = LinearRegression(featuresCol="features", maxIter=20)
lrModel = lr.fit(pricesTrain)
predictionsLR = lrModel.transform(pricesTest)

# Stored under its own name so the decision-tree metric (rmseDT) is not
# overwritten by the linear-regression result.
rmseLR = dtEval.evaluate(predictionsLR)
print("Root Mean Squared Error (RMSE) on test data (Linear Regression)= %g" % rmseLR)

Root Mean Squared Error (RMSE) on test data (Linear Regression)= 30021.5


In [61]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
# Hyper-parameter search for linear regression using one train/validation split.
cvwithLR = LinearRegression(maxIter=15)

# 2 x 2 x 3 = 12 candidate settings. The grid is built from cvwithLR's own
# params, so it only applies to this estimator instance.
paramGrid = ParamGridBuilder()\
    .addGrid(cvwithLR.regParam, [0.1, 0.01]) \
    .addGrid(cvwithLR.fitIntercept, [False, True])\
    .addGrid(cvwithLR.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

# 70% of the training data fits each candidate, the rest validates it.
# The seed fixes that internal split so the selected model is repeatable.
tvscvwithLR = TrainValidationSplit(estimator=cvwithLR,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           trainRatio=0.7,
                           seed=42)

modelcvwithLR = tvscvwithLR.fit(pricesTrain)

# Score the selected model on the held-out test split. The metric gets its
# own name so the decision-tree result (rmseDT) is not overwritten.
predictionscvwithLR = modelcvwithLR.transform(pricesTest)
rmseTVS = dtEval.evaluate(predictionscvwithLR)
print("RMSE(TrainValidationSplit)= %g" % rmseTVS)

RMSE(TrainValidationSplit)= 28768.2


In [60]:
# Report the settings of the model chosen by the train/validation search.
# print() is called with one pre-formatted string so the cell runs on both
# Python 2 and Python 3 and emits the same text as the old print statements
# (including the double space after the colon).
bestModel = modelcvwithLR.bestModel
# The hyper-parameters are read from the wrapped JVM model object.
print("Best Param (regParam):  %s" % bestModel._java_obj.getRegParam())
print("Best Param (MaxIter):  %s" % bestModel._java_obj.getMaxIter())
print("Best Param (elasticNetParam):  %s" % bestModel._java_obj.getElasticNetParam())
# intercept is read from the fitted model itself, not from a tuned parameter.
print("Best Param (intercept):  %s" % bestModel.intercept)

Best Param (regParam):  0.01
Best Param (MaxIter):  10
Best Param (elasticNetParam):  1.0
Best Param (intercept):  321308.43194


In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Hyper-parameter search for linear regression using 5-fold cross-validation.
cvwithLR = LinearRegression(maxIter=10)

# The grid must be built from the params of the estimator handed to
# CrossValidator: grid entries are keyed by Param objects tied to the
# estimator instance they came from, so a grid built from a different
# LinearRegression instance does not apply to this one.
cvParamGrid = ParamGridBuilder()\
    .addGrid(cvwithLR.regParam, [0.1, 0.01])\
    .addGrid(cvwithLR.fitIntercept, [False, True])\
    .addGrid(cvwithLR.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

# The seed fixes the fold assignment so the selected model is repeatable.
crossvalWithLR = CrossValidator(estimator=cvwithLR,
                          estimatorParamMaps=cvParamGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5,
                          seed=42)
modelCV = crossvalWithLR.fit(pricesTrain)

# Score the selected model on the held-out test split. The metric gets its
# own name and the label names the method actually used.
predictionsCV = modelCV.transform(pricesTest)
rmseCV = dtEval.evaluate(predictionsCV)
print("RMSE(CrossValidator)= %g" % rmseCV)