In [1]:
from pyspark import SparkContext, SparkConf, SQLContext

# Extra properties applied on top of the default Spark configuration.
# NOTE(review): the executor option embeds an absolute, user-specific path;
# it will need adjusting on any other machine.
dasConfDirOption = "-Dwso2_custom_conf_dir=/home/supun/Downloads/wso2das-3.1.0/repository/conf"
extraProperties = [
    ("carbon.insert.batch.size", "1000"),
    ("spark.driver.allowMultipleContexts", "true"),
    ("spark.executor.extraJavaOptions", dasConfDirOption),
]
sparkConf = SparkConf().setAll(extraProperties)

# The pyspark shell/kernel pre-creates a context named `sc`; shut it down so
# that a fresh SparkContext can be started with the configuration above.
sc.stop()
sparkCtx = SparkContext(conf=sparkConf)

# Report which Spark master the configuration points at.
print(sparkConf.get("spark.master"))

spark://172.17.0.1:7077


In [2]:
sqlCtx = SQLContext(sparkCtx)

# Register the IRIS_DATA_STREAM analytics table as the temporary Spark SQL
# table `table1`, backed by the WSO2 AnalyticsRelationProvider.
# NOTE(review): tenantId "-1234" is presumably the default tenant - confirm.
createTableSql = ('CREATE TEMPORARY TABLE table1 '
                  'USING org.wso2.carbon.analytics.spark.core.sources.AnalyticsRelationProvider '
                  'OPTIONS (tenantId "-1234", tableName "IRIS_DATA_STREAM")')
sqlCtx.sql(createTableSql)

# Load every row of the registered table into a DataFrame.
df = sqlCtx.sql("SELECT * FROM table1")

In [15]:
# Preview the loaded rows (20 is show()'s default row limit, spelled out here).
df.show(n=20)

+------------+-----------+------------+-----------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|          class|
+------------+-----------+------------+-----------+---------------+
|         4.9|        3.0|         1.4|        0.2|    Iris-setosa|
|         4.3|        3.0|         1.1|        0.1|    Iris-setosa|
|         5.1|        3.5|         1.4|        0.3|    Iris-setosa|
|         5.1|        3.8|         1.5|        0.3|    Iris-setosa|
|         5.2|        3.4|         1.4|        0.2|    Iris-setosa|
|         4.4|        3.0|         1.3|        0.2|    Iris-setosa|
|         5.1|        3.4|         1.5|        0.2|    Iris-setosa|
|         5.5|        2.3|         4.0|        1.3|Iris-versicolor|
|         5.7|        2.8|         4.5|        1.3|Iris-versicolor|
|         5.6|        2.9|         3.6|        1.3|Iris-versicolor|
|         6.2|        2.2|         4.5|        1.5|Iris-versicolor|
|         5.9|        3.2|         4.8|        1

In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.feature import LabeledPoint
from pyspark.sql.functions import col

# Pack the four numeric measurement columns into a single "features" vector
# column, which is the input format the ML estimators below consume.
featureColumns = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")

assembledDf = assembler.transform(df)
assembledDf.show()

# Keep only what the classifier needs: the class name (renamed to "label")
# and the assembled feature vector.
transformedDf = assembledDf.select(
    assembledDf["class"].alias("label"),
    assembledDf["features"],
)

transformedDf.show()
transformedDf.printSchema()

+------------+-----------+------------+-----------+---------------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|          class|         features|
+------------+-----------+------------+-----------+---------------+-----------------+
|         4.9|        3.0|         1.4|        0.2|    Iris-setosa|[4.9,3.0,1.4,0.2]|
|         4.3|        3.0|         1.1|        0.1|    Iris-setosa|[4.3,3.0,1.1,0.1]|
|         5.1|        3.5|         1.4|        0.3|    Iris-setosa|[5.1,3.5,1.4,0.3]|
|         5.1|        3.8|         1.5|        0.3|    Iris-setosa|[5.1,3.8,1.5,0.3]|
|         5.2|        3.4|         1.4|        0.2|    Iris-setosa|[5.2,3.4,1.4,0.2]|
|         4.4|        3.0|         1.3|        0.2|    Iris-setosa|[4.4,3.0,1.3,0.2]|
|         5.1|        3.4|         1.5|        0.2|    Iris-setosa|[5.1,3.4,1.5,0.2]|
|         5.5|        2.3|         4.0|        1.3|Iris-versicolor|[5.5,2.3,4.0,1.3]|
|         5.7|        2.8|         4.5|        1.3|Iri

In [8]:
# Training Model

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fixed seed so the train/test split and the trained forest - and therefore
# the accuracy reported later - are repeatable across notebook runs.
# NOTE(review): a seeded randomSplit is only repeatable if the source table
# returns rows with stable partitioning/order - confirm for this data source.
SEED = 42

# Map the string class names in "label" to numeric indices in "indexedLabel".
# Fitted on the full dataset so that every class present gets an index, even
# if the random training split happens to miss one.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(transformedDf)

# Index the feature vector into "indexedFeatures"; maxCategories=4 is the
# threshold VectorIndexer uses to decide which features to treat as
# categorical (see the VectorIndexer documentation).
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(transformedDf)


# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = transformedDf.randomSplit([0.7, 0.3], seed=SEED)

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", seed=SEED)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

# Train model. The indexers are already fitted, so fitting the pipeline
# applies them to the training data and trains the forest.
model = pipeline.fit(trainingData)

In [19]:
# Score the held-out rows with the fitted pipeline.
predictions = model.transform(testData)

# Preview a handful of scored rows: predicted index, true index, features.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Compare the predicted label index against the true label index.
# NOTE(review): the "precision" metric is reported below as accuracy; newer
# Spark releases expose this as "accuracy" instead - confirm against the
# Spark version this notebook runs on before changing it.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="precision",
)
accuracy = evaluator.evaluate(predictions)

accuracyPct = round(accuracy * 100, 2)
print("Accuracy =  " + str(accuracyPct) + "%")

# The random forest is the third pipeline stage (after the two indexers).
rfModel = model.stages[2]
print(rfModel)

+----------+------------+-----------------+
|prediction|indexedLabel|         features|
+----------+------------+-----------------+
|       2.0|         2.0|[4.3,3.0,1.1,0.1]|
|       2.0|         2.0|[4.9,3.0,1.4,0.2]|
|       2.0|         2.0|[5.1,3.5,1.4,0.3]|
|       2.0|         2.0|[5.1,3.8,1.5,0.3]|
|       1.0|         0.0|[5.9,3.2,4.8,1.8]|
+----------+------------+-----------------+
only showing top 5 rows

Accuracy =  88.89%
RandomForestClassificationModel (uid=rfc_f2cf88040597) with 20 trees
