# Testing MLlib with Python 3 in Docker


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("PythonMLlib")\
    .getOrCreate()

## Importing Libraries
Notes on libraries:

* Use VectorAssembler to convert df columns to features column. Notes [here](http://stackoverflow.com/questions/37574833/create-labledpoints-from-spark-dataframe-how-to-pass-list-of-names-to-vectoras).
* Pipeline example with regression [here](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/32296485791277/4041455208424802/2883572398008418/latest.html?utm_campaign=2016%20Spark%20Summit%20West&utm_medium=social&utm_source=slide).
* We used cast and withColumn to convert the column types. Cannot directly assign dataframe column. Sample code [here](http://stackoverflow.com/questions/32284620/how-to-change-a-dataframe-column-from-string-type-to-double-type-in-pyspark)

In [23]:
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.regression import GeneralizedLinearRegression

## Data munging

In [2]:
df = spark.read.csv('home_data.csv', header=True)
df.printSchema()
df.show(5)

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- sqft_living: string (nullable = true)
 |-- sqft_lot: string (nullable = true)
 |-- floors: string (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sqft_above: string (nullable = true)
 |-- sqft_basement: string (nullable = true)
 |-- yr_built: string (nullable = true)
 |-- yr_renovated: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- sqft_living15: string (nullable = true)
 |-- sqft_lot15: string (nullable = true)

+----------+---------------+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+

In [11]:
df = df.withColumn("price", df["price"].cast(DoubleType()))\
                   .withColumn("sqft_living", df["sqft_living"].cast(DoubleType()))\
                   .withColumn("zipcode", df["zipcode"].cast(IntegerType()))
                
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- price: double (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- sqft_living: double (nullable = true)
 |-- sqft_lot: string (nullable = true)
 |-- floors: string (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sqft_above: string (nullable = true)
 |-- sqft_basement: string (nullable = true)
 |-- yr_built: string (nullable = true)
 |-- yr_renovated: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- sqft_living15: string (nullable = true)
 |-- sqft_lot15: string (nullable = true)



## Train & Evaluate Model
Notes:
* Model evaluation values like p-value from model summary object. Code sample [here](https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/latest/ml-classification-regression.html#generalized-linear-regression)
* Prediction values come from an evaluator object. Code sample [here](https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/latest/ml-classification-regression.html#decision-tree-regression)

In [13]:
(trainData, testData) = df.randomSplit(seed=123, weights=[0.7,0.3])
print("The total data is {}, the training is {} and the test is {}"\
      .format(df.count(), trainData.count(), testData.count()))

The total data is 21613, the training is 15089 and the test is 6524


In [22]:
vectorizer = VectorAssembler(inputCols=[trainData.columns[5]], outputCol="features")
glr = GeneralizedLinearRegression(labelCol="price", family="gaussian", link="identity", maxIter=10, regParam=0.3)
simplePipeline = Pipeline(stages=[vectorizer, glr])
model = simplePipeline.fit(trainData)


# Summarize the model over the training set and print out some metrics
#print("AIC: " + str(model.aic))
print("Coefficient Standard Errors: " + str(model.coefficientStandardErrors))
#print("T Values: " + str(model.tValues))
#print("P Values: " + str(model.pValues))
#print("Deviance Residuals: ")
#model.residuals

AttributeError: 'PipelineModel' object has no attribute 'coefficientStandardErrors'

In [25]:
testingData = vectorizer.transform(testData)
# Make predictions.
predictions = model.transform(testingData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

AnalysisException: "Reference 'features' is ambiguous, could be: features#1367, features#1391.;"

In [27]:
testingData.select("features").show(3)

+--------+
|features|
+--------+
|[2400.0]|
|[2060.0]|
|[1460.0]|
+--------+
only showing top 3 rows

