#### Step 1 (Optional): Install Homebrew
If you don’t have Homebrew, install it with the official installer script:

- `/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"`

#### Step 2: Install Java 8
Spark needs Java (Spark 3.0 runs on Java 8 or 11), and I went with Java 8. This is where I had to browse GitHub to find a command that works:

- `brew install --cask homebrew/cask-versions/adoptopenjdk8`

or

- `brew tap adoptopenjdk/openjdk`
- `brew install --cask adoptopenjdk8`

or, if the `adoptopenjdk/openjdk` tap is already added:

- `brew install --cask adoptopenjdk8`

#### Step 3: Install Scala
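You probably know it already, but Apache Spark is written in Scala. The Spark distribution bundles the Scala libraries it needs, so a separate Scala install is only required if you also want to write Spark code in Scala:

- `brew install scala`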

#### Step 4: Install Spark
We’re almost there. Let’s now install Spark:

- `brew install apache-spark`

#### Step 5: Install PySpark
If you want to write your Spark code in Python, install PySpark, ideally at the same version as the Spark you installed with Homebrew (check it with `brew info apache-spark`):

- `pip install pyspark`

#### Step 6: Modify your shell profile
Whether you use `~/.bashrc` or `~/.zshrc`, add the following lines to it. Adapt them to your Python path (find it with `which python3`), to your Spark version, and to the folder in which Java was installed (on macOS, `/usr/libexec/java_home -v 1.8` prints it). On Apple Silicon Macs, Homebrew installs under `/opt/homebrew` instead of `/usr/local`.

- `export JAVA_HOME=/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home`
- `export JRE_HOME=/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/jre/`
- `export SPARK_HOME=/usr/local/Cellar/apache-spark/3.0.1/libexec`
- `export PATH=/usr/local/Cellar/apache-spark/3.0.1/bin:$PATH`
- `export PYSPARK_PYTHON=/Users/maelfabien/opt/anaconda3/bin/python`

Finally, reload the profile (use `~/.bashrc` instead if that is the file you edited):

- `source ~/.zshrc`

And you are all set!
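Before opening Jupyter, it can help to confirm from Python that the variables exported above are really visible to the interpreter. `check_spark_env` below is a hypothetical helper, not part of PySpark; it only checks that `JAVA_HOME`, `SPARK_HOME` and `PYSPARK_PYTHON` are set and point at paths that exist.

```python
import os

# Hypothetical helper: the names match the exports in the profile above.
REQUIRED_VARS = ("JAVA_HOME", "SPARK_HOME", "PYSPARK_PYTHON")


def check_spark_env(environ=None):
    """Return {variable: problem} for every required variable that looks wrong."""
    environ = os.environ if environ is None else environ
    problems = {}
    for name in REQUIRED_VARS:
        value = environ.get(name)
        if not value:
            problems[name] = "not set"
        elif not os.path.exists(value):
            problems[name] = "path does not exist: " + value
    return problems


if __name__ == "__main__":
    issues = check_spark_env()
    for name, problem in issues.items():
        print(f"{name}: {problem}")
    if not issues:
        print("JAVA_HOME, SPARK_HOME and PYSPARK_PYTHON look fine")
```

Environment variables are inherited from the process that starts Python, so a Jupyter server launched before you sourced the profile will not see the new values.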

#### Step 7: Launch a Jupyter Notebook
Now, in your Jupyter notebook, you should be able to execute the following commands:

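```python
import pyspark
from pyspark import SparkContext

# SparkContext() fails if a context is already running in this kernel;
# SparkContext.getOrCreate() reuses the existing one instead
sc = SparkContext()
n = sc.parallelize([4, 10, 9, 7])
n.take(3)  # [4, 10, 9]
```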

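To make the two RDD calls above concrete, here is a pure-Python model of them. No Spark is involved: the `parallelize` and `take` functions below are illustrative stand-ins, not the PySpark API. The sketch assumes the list is cut into contiguous, roughly equal slices (roughly how Spark partitions a local collection) and shows that `take(n)` reads partitions in order until it has `n` elements, which is why the result is `[4, 10, 9]`.

```python
def parallelize(data, num_slices):
    """Split `data` into `num_slices` contiguous, roughly equal partitions."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


def take(partitions, num):
    """Collect the first `num` elements, scanning partitions in order."""
    result = []
    for part in partitions:
        result.extend(part[:num - len(result)])
        if len(result) >= num:
            break
    return result


partitions = parallelize([4, 10, 9, 7], 2)
print(partitions)            # [[4, 10], [9, 7]]
print(take(partitions, 3))   # [4, 10, 9]
```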


#### Option: Install JDK 8 from Oracle

If the Homebrew casks do not work, you can download JDK 8 directly from https://www.oracle.com/java/technologies/javase-jdk8-downloads.html (an Oracle account login is required).

After that, I was able to start a Spark context with PySpark.

#### Checking that it works
To check that Spark really works, run a small job in Python. This example comes from http://spark.apache.org/docs/latest/rdd-programming-guide.html#parallelized-collections:

```python
from pyspark import SparkContext

# Reuse the running SparkContext, or start one if none exists
sc = SparkContext.getOrCreate()

# Keep the even numbers (x & 1 is 0 for even x) and return the first 10
data = range(10000)
distData = sc.parallelize(data)
distData.filter(lambda x: not x & 1).take(10)
# Out: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

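The filter `not x & 1` parses as `not (x & 1)`, because `&` binds more tightly than `not`; `x & 1` is the lowest bit of `x`, which is 0 exactly for even numbers. This plain-Python check (no Spark involved) reproduces the expected output and confirms that the bitwise test agrees with the more readable `x % 2 == 0`.

```python
from itertools import islice

data = range(10000)


def is_even_bitwise(x):
    # Same predicate as the Spark example: True when the lowest bit of x is 0
    return not x & 1


first_ten = list(islice(filter(is_even_bitwise, data), 10))
print(first_ten)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# The bitwise test agrees with the modulo test on the whole range
assert all(is_even_bitwise(x) == (x % 2 == 0) for x in data)
```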

In [1]:
```python
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# check that it really works by running a job
# example from http://spark.apache.org/docs/latest/rdd-programming-guide.html#parallelized-collections
data = range(10000)
distData = sc.parallelize(data)
distData.filter(lambda x: not x & 1).take(10)
# Out: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

```
22/02/03 12:18:07 WARN Utils: Your hostname, Calvins-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.69 instead (on interface en0)
22/02/03 12:18:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/03 12:18:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

In [2]:
```python
import os

# Colab-specific paths from the original notebook: with the macOS setup above,
# drop these two lines (the shell profile already sets JAVA_HOME and SPARK_HOME)
# or point them at your own installation.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

import findspark  # pip install findspark
findspark.init()  # adds the Spark Python libraries under SPARK_HOME to sys.path

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
```


In [3]:
```python
# Start a local SparkSession that uses all available cores
spark = SparkSession.builder.master("local[*]").getOrCreate()
```

In [9]:
```python
data = spark.read.csv('data.csv', inferSchema=True, header=True)
```

In [10]:
```python
data.printSchema()
```

```
root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Engine Fuel Type: string (nullable = true)
 |-- Engine HP: integer (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Transmission Type: string (nullable = true)
 |-- Driven_Wheels: string (nullable = true)
 |-- Number of Doors: integer (nullable = true)
 |-- Market Category: string (nullable = true)
 |-- Vehicle Size: string (nullable = true)
 |-- Vehicle Style: string (nullable = true)
 |-- highway MPG: integer (nullable = true)
 |-- city mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- MSRP: integer (nullable = true)
```


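With `inferSchema=True`, Spark scans the CSV and picks a type per column, which is why `Year` comes out as `integer` while `Model` stays a `string` (it contains values such as `1 Series`). The sketch below is a simplified, pure-Python model of that idea covering only integer, double and string; Spark's real inference also tries other types (for example long, timestamp and boolean) and treats nulls according to options such as `nullValue`.

```python
# Simplified model of CSV schema inference (not Spark's implementation):
# each column gets the narrowest type that parses every non-empty value.
TYPE_ORDER = ["integer", "double", "string"]


def infer_value_type(value):
    """Return the narrowest type name that can parse a single CSV cell."""
    try:
        int(value)
        return "integer"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"


def infer_column_type(values):
    """Widen the type across all non-empty cells of one column."""
    seen = [TYPE_ORDER.index(infer_value_type(v)) for v in values if v != ""]
    # A column with no non-empty cells falls back to string
    return TYPE_ORDER[max(seen)] if seen else "string"


print(infer_column_type(["2011", "2012", ""]))   # integer
print(infer_column_type(["335", "300.5"]))       # double
print(infer_column_type(["1 Series", "200"]))    # string
```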

In [11]:
```python
data.describe().toPandas().transpose()
```

| Column | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| Make | 11914 | | | Acura | Volvo |
| Model | 11914 | 745.5822222222222 | 1490.8280590623795 | 1 Series | xD |
| Year | 11914 | 2010.384337753903 | 7.5797398875957995 | 1990 | 2017 |
| Engine Fuel Type | 11911 | | | diesel | regular unleaded |
| Engine HP | 11845 | 249.38607007176023 | 109.19187025917194 | 55 | 1001 |
| Engine Cylinders | 11884 | 5.628828677213059 | 1.78055934824622 | 0 | 16 |
| Transmission Type | 11914 | | | AUTOMATED_MANUAL | UNKNOWN |
| Driven_Wheels | 11914 | | | all wheel drive | rear wheel drive |
| Number of Doors | 11908 | 3.4360933825999327 | 0.8813153865835529 | 2 | 4 |


In [12]:
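```python
def replace(column, value):
    # Keep the column's value unless it equals `value`; in that case (or when
    # the comparison itself is null) return null instead
    return when(column != value, column).otherwise(lit(None))
```

In [13]:
```python
# Treat the literal string "N/A" in Market Category as a missing value
data = data.withColumn("Market Category", replace(col("Market Category"), "N/A"))
```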

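`when(...)` only takes its first branch when the condition is true. If the cell is already null, `column != value` evaluates to null rather than true or false, so the row falls through to `otherwise` and stays null. The pure-Python model below (not PySpark) mimics that three-valued logic; the sample Market Category values are made up for illustration.

```python
def sql_not_equal(a, b):
    # SQL semantics: comparing anything with NULL yields NULL (None), not True/False
    if a is None or b is None:
        return None
    return a != b


def replace_value(cells, value):
    """Mimic when(col != value, col).otherwise(lit(None)) row by row."""
    out = []
    for cell in cells:
        # Only an exactly True condition keeps the original cell
        out.append(cell if sql_not_equal(cell, value) is True else None)
    return out


# Hypothetical sample values
print(replace_value(["Luxury", "N/A", None, "Crossover"], "N/A"))
# ['Luxury', None, None, 'Crossover']
```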
In [19]:
```python
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).toPandas() #.show()
```

| Make | Model | Year | Engine Fuel Type | Engine HP | Engine Cylinders | Transmission Type | Driven_Wheels | Number of Doors | Market Category | Vehicle Size | Vehicle Style | highway MPG | city mpg | Popularity | MSRP |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 3 | 69 | 30 | 0 | 0 | 6 | 3742 | 0 | 0 | 0 | 0 | 0 | 0 |


In [20]:
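```python
# Market Category is missing in 3,742 rows, so drop the whole column,
# then drop every row that still contains a null or NaN
data = data.drop('Market Category')
data = data.na.drop()
print((data.count(), len(data.columns)))
```

```
(11812, 15)
```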

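The two cells above first count, per column, the cells that are NaN or null, and then drop `Market Category` plus every row that still has a missing value (`na.drop()` removes rows containing any null or NaN). A pure-Python model of both steps on a few made-up rows:

```python
import math


def is_missing(value):
    """Null (None) or a float NaN, like isnan(c) | col(c).isNull()."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def missing_counts(rows, columns):
    """Number of missing cells per column, for rows given as dicts."""
    return {c: sum(is_missing(row.get(c)) for row in rows) for c in columns}


def drop_incomplete(rows, columns):
    """Keep only rows with no missing cell, like data.na.drop()."""
    return [row for row in rows if not any(is_missing(row.get(c)) for c in columns)]


# Hypothetical sample rows, for illustration only
rows = [
    {"Engine HP": 335.0, "Number of Doors": 2.0},
    {"Engine HP": float("nan"), "Number of Doors": 4.0},
    {"Engine HP": 300.0, "Number of Doors": None},
]
columns = ["Engine HP", "Number of Doors"]
print(missing_counts(rows, columns))        # {'Engine HP': 1, 'Number of Doors': 1}
print(len(drop_incomplete(rows, columns)))  # 1
```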

In [22]:
```python
# Pack the numeric columns into a single feature vector column
assembler = VectorAssembler(inputCols=['Year', 'Engine HP', 'Engine Cylinders', 'Number of Doors',
                                       'highway MPG', 'city mpg', 'Popularity'],
                            outputCol='Attributes')

regressor = RandomForestRegressor(featuresCol='Attributes', labelCol='MSRP')

pipeline = Pipeline(stages=[assembler, regressor])

# Save the (unfitted) pipeline definition to the "pipeline" folder
pipeline.write().overwrite().save("pipeline")
```

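A VectorAssembler concatenates the listed columns, in order, into one vector column that the regressor reads through `featuresCol`. The pure-Python sketch below models that on a single row, with a plain list standing in for Spark's vector type; the row values are illustrative, not taken from `data.csv`. Like VectorAssembler with its default `handleInvalid="error"`, it refuses null inputs, which is one reason the rows with missing values were dropped first.

```python
# Same input columns and output column name as the VectorAssembler above
FEATURE_COLS = ['Year', 'Engine HP', 'Engine Cylinders', 'Number of Doors',
                'highway MPG', 'city mpg', 'Popularity']


def assemble(row, input_cols=FEATURE_COLS, output_col='Attributes'):
    """Return a copy of `row` with the input columns packed, in order, into one list of floats."""
    missing = [c for c in input_cols if row.get(c) is None]
    if missing:
        # Mirrors VectorAssembler's default handleInvalid="error" behaviour on nulls
        raise ValueError(f"null value in {missing}")
    out = dict(row)
    out[output_col] = [float(row[c]) for c in input_cols]
    return out


# Illustrative row (values chosen for the example)
row = {'Make': 'BMW', 'Year': 2011, 'Engine HP': 335, 'Engine Cylinders': 6,
       'Number of Doors': 2, 'highway MPG': 26, 'city mpg': 19,
       'Popularity': 3916, 'MSRP': 46135}
print(assemble(row)['Attributes'])  # [2011.0, 335.0, 6.0, 2.0, 26.0, 19.0, 3916.0]
```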
In [24]:
```
!ls
```

```
ML Pipeline in PySpark MLlib.ipynb pipeline
data.csv
```


In [61]:
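```python
# Pipeline.load returns an unfitted Pipeline (an estimator), not a PipelineModel
loaded_pipeline = Pipeline.load("pipeline")

# 3 values of numTrees x 2 values of maxDepth = 6 parameter combinations
paramGrid = ParamGridBuilder() \
            .addGrid(regressor.numTrees, [100, 500, 1000]) \
            .addGrid(regressor.maxDepth, [5, 10]) \
            .build()

crossval = CrossValidator(estimator=loaded_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='MSRP'),
                          numFolds=3)
```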

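The grid above has 3 × 2 = 6 parameter combinations. With `numFolds=3`, CrossValidator fits every combination on every fold and then refits the best combination on the whole training set, which is why fitting it takes several minutes. A quick count in plain Python:

```python
from itertools import product

# Same values as the ParamGridBuilder and CrossValidator cell above
num_trees_values = [100, 500, 1000]
max_depth_values = [5, 10]
num_folds = 3

# One dict per grid point, like the list of param maps that build() returns
param_maps = [{"numTrees": t, "maxDepth": d}
              for t, d in product(num_trees_values, max_depth_values)]

fits_during_cv = len(param_maps) * num_folds  # every combination on every fold
total_fits = fits_during_cv + 1               # plus the final refit of the best combination
print(len(param_maps), fits_during_cv, total_fits)  # 6 18 19
```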
In [62]:
```python
train_data, test_data = data.randomSplit([0.8, 0.2], seed=111)

# Fits every parameter combination on every fold, so this takes a while
cvModel = crossval.fit(train_data)
```

```
22/02/03 13:59:45 WARN DAGScheduler: Broadcasting large task binary with size 1503.1 KiB
22/02/03 13:59:46 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
22/02/03 13:59:47 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
22/02/03 13:59:48 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
22/02/03 13:59:48 WARN DAGScheduler: Broadcasting large task binary with size 1156.5 KiB
22/02/03 13:59:51 WARN DAGScheduler: Broadcasting large task binary with size 1122.0 KiB
22/02/03 13:59:52 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
22/02/03 13:59:56 WARN DAGScheduler: Broadcasting large task binary with size 1122.0 KiB
22/02/03 13:59:56 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
22/02/03 13:59:58 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
22/02/03 13:59:59 WARN DAGScheduler: Broadcasting large task binary with size 1185.5 KiB
[...]
22/02/03 14:03:46 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
22/02/03 14:03:48 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/02/03 14:03:50 WARN DAGScheduler: Broadcasting large task binary with size 1317.4 KiB
22/02/03 14:03:53 WARN DAGScheduler: Broadcasting large task binary with size 1050.4 KiB
22/02/03 14:03:55 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
22/02/03 14:03:57 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/02/03 14:03:58 WARN DAGScheduler: Broadcasting large task binary with size 1317.4 KiB
22/02/03 14:03:59 WARN DAGScheduler: Broadcasting large task binary with size 7.8 MiB
22/02/03 14:04:01 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/03 14:04:03 WARN DAGScheduler: Broadcasting large task binary with size 14.0 MiB
22/02/03 14:04:06 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB
[...]
```

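`randomSplit([0.8, 0.2], seed=111)` assigns each row to a split at random, with probabilities given by the normalized weights, so the two parts are only approximately 80% and 20% of the data. The sketch below models that idea in plain Python; it is not Spark's algorithm (Spark samples each partition separately), so it will not reproduce the exact rows Spark puts in `train_data` and `test_data`.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative boundaries, e.g. [0.8, 0.2] -> [0.8, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:  # guard against floating-point rounding in the last bound
            splits[-1].append(row)
    return splits


train, test = random_split(range(10000), [0.8, 0.2], seed=111)
print(len(train), len(test))  # close to 8000 and 2000, but not exactly
```

Because the weights are normalized, `[4, 1]` and `[0.8, 0.2]` describe the same split.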
In [63]:
```python
bestModel = cvModel.bestModel
# The best model is a fitted PipelineModel; print each of its stages
for stage in bestModel.stages:
    print(stage)
```

```
VectorAssembler_57f2dcde93b1
RandomForestRegressionModel: uid=RandomForestRegressor_69e98f355331, numTrees=1000, numFeatures=7
```


In [70]:
```python
# CrossValidatorModel.transform uses the best model found during cross-validation
pred = cvModel.transform(test_data)

pred.select('MSRP', 'prediction').show()
```

```
+-----+------------------+
| MSRP|        prediction|
+-----+------------------+
|29980|30453.328188637977|
|30030|30450.048585332726|
|30350| 30546.79556016742|
|27990|31747.924681006716|
|29290|31747.924681006716|
|32990|31747.924681006716|
| 2912|2913.0022169840636|
| 3381| 3414.952109400304|
|21050|19029.981156236576|
|21600| 18769.52568996837|
| 2000|2175.2881398105965|
| 2144| 2198.570711117312|
|48840| 42597.24318809616|
|45015| 42908.84432294007|
|49440| 42908.84432294007|
|43950| 42757.93691368864|
|89000| 48726.59899931966|
|40195|  41612.9396332748|
|40370| 41784.53005135344|
|39270| 41741.38854047442|
+-----+------------------+
only showing top 20 rows
```



In [71]:
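```python
# Named `evaluator` to avoid shadowing Python's built-in eval()
evaluator = RegressionEvaluator(labelCol='MSRP')  # default metric is RMSE
rmse = evaluator.evaluate(pred)

mse = evaluator.evaluate(pred, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred, {evaluator.metricName: "mae"})
r_square = evaluator.evaluate(pred, {evaluator.metricName: "r2"})
```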

In [72]:
```python
print("RMSE: %.3f" % rmse)
print("MSE: %.3f" % mse)
print("MAE: %.3f" % mae)
print("R2: %.3f" % r_square)
```

```
RMSE: 21922.188
MSE: 480582315.201
MAE: 6038.737
R2: 0.863
```

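For reference, these are the standard definitions behind the four metric names passed to `RegressionEvaluator`, written out in plain Python and checked on a tiny made-up example. RMSE is the square root of MSE, which matches the output above (21922.188² ≈ 4.806 × 10⁸). RMSE being much larger than MAE suggests that a few large errors, such as the 89000 car predicted at about 48727, dominate the squared metrics.

```python
import math


def regression_metrics(y_true, y_pred):
    """RMSE, MSE, MAE and R^2 computed from their textbook definitions."""
    n = len(y_true)
    errors = [p - t for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)            # residual sum of squares
    mse = ss_res / n
    mae = sum(abs(e) for e in errors) / n
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    return {"rmse": math.sqrt(mse), "mse": mse, "mae": mae, "r2": 1 - ss_res / ss_tot}


m = regression_metrics([3, -0.5, 2, 7], [2.5, 0.0, 2, 8])
print({k: round(v, 3) for k, v in m.items()})
# {'rmse': 0.612, 'mse': 0.375, 'mae': 0.5, 'r2': 0.949}
```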

In [47]:
```python
help(regressor)
```

```
Help on RandomForestRegressor in module pyspark.ml.regression object:

class RandomForestRegressor(_JavaRegressor, _RandomForestRegressorParams, pyspark.ml.util.JavaMLWritable, pyspark.ml.util.JavaMLReadable)
 |  RandomForestRegressor(*, featuresCol='features', labelCol='label', predictionCol='prediction', maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity='variance', subsamplingRate=1.0, seed=None, numTrees=20, featureSubsetStrategy='auto', leafCol='', minWeightFractionPerNode=0.0, weightCol=None, bootstrap=True)
 |  
 |  `Random Forest <http://en.wikipedia.org/wiki/Random_forest>`_
 |  learning algorithm for regression.
 |  It supports both continuous and categorical features.
 |  
 |  .. versionadded:: 1.4.0
 |  
 |  Examples
 |  --------
 |  >>> from numpy import allclose
 |  >>> from pyspark.ml.linalg import Vectors
 |  >>> df = spark.createDataFrame([
 |  ...     (1.0, Vectors.dense(1.0)),
 |  ...
```

In [82]:
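```python
test_df = pred.select('MSRP', 'prediction').toPandas()
# Signed percentage error of each prediction
test_df['pe'] = (test_df['prediction'] - test_df['MSRP']) / test_df['MSRP']
# Averaging the signed errors gives the mean percentage error (MPE): over- and
# under-predictions cancel out. The true MAPE is test_df['pe'].abs().mean()
mpe = test_df['pe'].mean()
```

In [88]:
```python
print('Mean percentage error (signed): %.3f' % mpe)
```

```
Mean percentage error (signed): 0.056
```

The notebook originally printed this value as "MAPE: 0.056" and reported 1 - 0.056 = 0.944 as "Accuracy". Because positive and negative errors cancel in the signed mean, 0.056 is not a MAPE, and 0.944 should not be read as an accuracy.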

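Why the sign matters: averaging signed percentage errors lets over- and under-predictions cancel, while MAPE averages their absolute values. In pandas, the MAPE of the predictions above is `test_df['pe'].abs().mean()`. The plain-Python sketch below uses two made-up predictions, one 10% too high and one 10% too low.

```python
def percentage_errors(actual, predicted):
    # Same per-row quantity as test_df['pe']: (prediction - actual) / actual
    return [(p - a) / a for a, p in zip(actual, predicted)]


def mpe(actual, predicted):
    """Mean signed percentage error: over- and under-predictions cancel."""
    errors = percentage_errors(actual, predicted)
    return sum(errors) / len(errors)


def mape(actual, predicted):
    """Mean absolute percentage error: every miss counts."""
    errors = percentage_errors(actual, predicted)
    return sum(abs(e) for e in errors) / len(errors)


actual = [100, 200]
predicted = [110, 180]  # one 10% over, one 10% under
print(mpe(actual, predicted), mape(actual, predicted))  # 0.0 0.1
```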