# Install Spark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
!pip install pyspark==2.4.5

Collecting pyspark==2.4.5
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 62kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 40.3MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=3db5db79087693572d8fe7c954ccdfcc4119d4e124e5b620cc8ac0533e62c415
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4

In [3]:
!ls

sample_data  spark-3.0.0-bin-hadoop2.7	spark-3.0.0-bin-hadoop2.7.tgz


In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [6]:
try:
    from pyspark import SparkContext, SparkConf
except ImportError as e:
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')

In [7]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

## Exercise

In [8]:
# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

--2020-07-22 16:47:55--  https://github.com/IBM/coursera/raw/master/hmp.parquet
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet [following]
--2020-07-22 16:47:55--  https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet [following]
--2020-07-22 16:47:55--  https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 932997 (91

In [9]:
df.show()

+---+---+---+--------------------+-----------+
|  x|  y|  z|              source|      class|
+---+---+---+--------------------+-----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|
| 20| 50| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 50| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 35|Accelerometer-201...|Brush_teeth|
| 21| 51| 33|Accelerometer-201...|Brush_teeth|
| 20| 50| 34|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 20| 51| 35|Accelerometer-201...|Brush_teeth|
| 18| 49| 34|Accelerometer-201...|Brush_teeth|
| 19| 48| 34|Accelerometer-201...|Brush_teeth|
| 16| 53| 34|Accelerometer-201...|Brush_teeth|
| 18| 52| 35|

In [10]:
df_two_class = spark.sql("select * from df where class in ('Use_telephone','Standup_chair')")

In [11]:
df_two_class.show()

+---+---+---+--------------------+-------------+
|  x|  y|  z|              source|        class|
+---+---+---+--------------------+-------------+
| 30| 40| 51|Accelerometer-201...|Standup_chair|
| 30| 41| 51|Accelerometer-201...|Standup_chair|
| 31| 41| 51|Accelerometer-201...|Standup_chair|
| 29| 42| 51|Accelerometer-201...|Standup_chair|
| 30| 43| 52|Accelerometer-201...|Standup_chair|
| 30| 40| 52|Accelerometer-201...|Standup_chair|
| 31| 41| 52|Accelerometer-201...|Standup_chair|
| 32| 39| 52|Accelerometer-201...|Standup_chair|
| 29| 38| 52|Accelerometer-201...|Standup_chair|
| 29| 38| 50|Accelerometer-201...|Standup_chair|
| 28| 40| 50|Accelerometer-201...|Standup_chair|
| 31| 38| 51|Accelerometer-201...|Standup_chair|
| 30| 39| 51|Accelerometer-201...|Standup_chair|
| 30| 39| 50|Accelerometer-201...|Standup_chair|
| 31| 39| 51|Accelerometer-201...|Standup_chair|
| 30| 38| 52|Accelerometer-201...|Standup_chair|
| 29| 39| 53|Accelerometer-201...|Standup_chair|
| 31| 38| 52|Acceler

In [12]:
splits = df_two_class.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [15]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer


indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p = 1.0)

In [16]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="label", featuresCol="features_norm", maxIter=10)

In [17]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, gbt])

In [18]:
model = pipeline.fit(df_train)

In [19]:
prediction = model.transform(df_train)

In [20]:
prediction.show()

+---+---+---+--------------------+-------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|  x|  y|  z|              source|        class|label|       features|       features_norm|       rawPrediction|         probability|prediction|
+---+---+---+--------------------+-------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|  0| 30| 24|Accelerometer-201...|Standup_chair|  0.0|[0.0,30.0,24.0]|[0.0,0.5555555555...|[1.31612807369960...|[0.93290889941545...|       0.0|
|  0| 31| 17|Accelerometer-201...|Standup_chair|  0.0|[0.0,31.0,17.0]|[0.0,0.6458333333...|[1.31612807369960...|[0.93290889941545...|       0.0|
|  0| 31| 32|Accelerometer-201...|Standup_chair|  0.0|[0.0,31.0,32.0]|[0.0,0.4920634920...|[1.31612807369960...|[0.93290889941545...|       0.0|
|  0| 31| 32|Accelerometer-201...|Standup_chair|  0.0|[0.0,31.0,32.0]|[0.0,0.4920634920...|[1.31612807369960...|[0.93290889941545.

In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
    
binEval.evaluate(prediction) 

0.9089619878533773

In [22]:
prediction = model.transform(df_test)

In [23]:
binEval.evaluate(prediction) 

0.9098110907982937

In [24]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(normalizer.p, [1.0, 2.0, 10.0]) \
    .addGrid(gbt.maxBins, [2,4,8,16]) \
    .addGrid(gbt.maxDepth, [2,4,8,16]) \
    .build()

In [25]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=4)  

In [26]:
cvModel = crossval.fit(df_train)

In [27]:
prediction = cvModel.transform(df_test)

In [28]:
binEval.evaluate(prediction) 

0.9094454600853138

In [29]:
cvModel.bestModel.stages[-1].explainParams()

"cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)\ncheckpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)\nfeatureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)