### Uploading the dataset to remote cluster

In [1]:
# Push the dataset to user HDFS directory in the cluster
import os
import dsx_core_utils
dsx_core_utils.upload_hdfs_file(
    source_path=os.environ['DSX_PROJECT_DIR']+'/datasets/SMSSpamCollection.csv', 
    target_path="/user/user1/SMSSpamCollection.csv",
    webhdfsurl="https://zinc1.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/webhdfs/v1")

upload success


### Connecting to remote spark through DSX-HI

In [2]:
%load_ext sparkmagic.magics
from dsx_core_utils import proxy_util,dsxhi_util
proxy_util.configure_proxy_livy()

success configuring sparkmagic livy.


In [3]:
dsxhi_util.list_livy_endpoints()

['https://becks1.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy/v1', 'https://becks1.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy2/v1', 'https://cdh513edge11.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy/v1', 'https://cdh514edge1.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy/v1', 'https://cdh515edge1.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy/v1', 'https://cdh515edge1.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy2/v1', 'https://centos74edge1.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy/v1', 'https://centos74edge1.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy2/v1', 'https://rated3.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy2/v1', 'https://yccdh5.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy/v1', 'https://yccdh5.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy2/v1', 'https://ycedge1.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy/v1', 'https://ycedge1.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy2/v1', 'https://zinc1.fyre.

In [4]:
%manage_spark

MagicsControllerWidget(children=(Tab(children=(ManageSessionWidget(children=(HTML(value=u'<br/>'), HTML(value=…

Added endpoint https://zinc1.fyre.ibm.com:8443/gateway/mjoudsx336-master-1/livy2/v1
Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
912,application_1533478912530_0774,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


### Reading the dataset from remote cluster

In [5]:
%%spark
smsData = sc.textFile("hdfs:///user/user1/SMSSpamCollection.csv")
smsData.cache()

hdfs:///user/user1/SMSSpamCollection.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

### Creating a data pipeline

In [6]:
%%spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF    
from pyspark.ml.feature import Tokenizer
from pyspark.ml.classification import LogisticRegression
    
tokenizer = Tokenizer(inputCol="message",outputCol="words")
hashingTF = HashingTF(inputCol = tokenizer.getOutputCol(),outputCol="tempfeatures")
idf = IDF(inputCol = hashingTF.getOutputCol(),outputCol="features")
lrClassifier = LogisticRegression()
        
pipeline = Pipeline(stages=[tokenizer,hashingTF,idf,lrClassifier])

### Cleaning the Data

In [7]:
%%spark
# creating a labeled vector
def TransformToVector(string):
    attList = string.split(",")
    smsType = 0.0 if attList[0] == "ham" else 1.0
    return [smsType,attList[1]]
        
smsTransformed = smsData.map(TransformToVector)
        
# creating a data frame from labeled vector
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
smsDF = sqlContext.createDataFrame(smsTransformed,["label","message"])
smsDF.cache()

DataFrame[label: double, message: string]

### Perform Machine learning

In [8]:
%%spark
# split data frame into training and testing
(trainingData,testData) = smsDF.randomSplit([0.9,0.1])
        
#Build a model with Pipeline
lrModel = pipeline.fit(trainingData)
        
#Compute Predictions
prediction = lrModel.transform(testData)
            
#Evaluate Accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", \
                                                      labelCol="label", \
                                                      metricName = "accuracy")
accuracy = evaluator.evaluate(prediction)
print "Model Accuracy: " + str(round(accuracy*100,2))
        
# Draw a confusion matrix
prediction.groupby("label","prediction").count().show()

Model Accuracy: 92.47
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   39|
|  0.0|       1.0|    1|
|  1.0|       0.0|    6|
|  0.0|       0.0|   47|
+-----+----------+-----+