In [5]:
# Import packages
import pyspark
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession

The first step is to create a spark session

In [7]:
# Start (or reuse an already-running) Spark session in local mode under the
# application name "IrisModel".
spark = (
    SparkSession.builder
    .master("local")
    .appName("IrisModel")
    .getOrCreate()
)

We can then obtain the Spark context from this session object

In [33]:
# Take the SparkContext that belongs to the session rather than constructing a separate one.
sc = spark.sparkContext

To use the sample Iris data set that ships with Spark, we must first locate the Spark installation on the VM

In [10]:
!pip install findspark

Collecting findspark
  Downloading findspark-1.1.0-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.1.0
[33mYou are using pip version 9.0.1, however version 9.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [11]:
import findspark
# init() prepares this process to use the located Spark installation.
# NOTE(review): pyspark was already imported in the first cell, so confirm
# this call is still required.
findspark.init()
# Last expression of the cell: displays the Spark home directory that was found.
findspark.find()

'/usr/local/spark'

Load the Iris Data as our training set

In [19]:
import os

# Build the sample-data path from the Spark home that findspark located
# instead of hard-coding /usr/local/spark, so the notebook also works when
# Spark is installed somewhere else.
sample_data_path = os.path.join(
    findspark.find(), "data", "mllib", "sample_multiclass_classification_data.txt"
)

# The file is in libsvm format; loading it yields `label` and `features` columns.
training = spark.read.format("libsvm").load(sample_data_path)

In [20]:
# Peek at the first row to check the loaded label/features layout.
training.first()

Row(label=1.0, features=SparseVector(4, {0: -0.2222, 1: 0.5, 2: -0.7627, 3: -0.8333}))

Next we import the logistic regression classes and train a model on the Iris data

In [22]:
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel

# Logistic regression estimator with elastic-net regularisation, limited to
# 10 iterations.
lr = LogisticRegression(elasticNetParam=0.8, regParam=0.3, maxIter=10)

# Train on the data set loaded earlier.
lrModel = lr.fit(training)

# Report the fitted multinomial coefficient matrix and intercept vector.
print("Coefficients: \n{}".format(lrModel.coefficientMatrix))
print("Intercept: {}".format(lrModel.interceptVector))

# Grab the training summary object exposed by the fitted model.
trainingSummary = lrModel.summary

Coefficients: 
3 X 4 CSRMatrix
(0,3) 0.3176
(1,2) -0.7804
(1,3) -0.377
Intercept: [0.0516523165983,-0.123912249909,0.0722599333102]


We must make sure the data we receive from our web app can be used for predictions, so we import the linear algebra classes and PySpark SQL types needed to coerce new data into the proper form

In [39]:
from pyspark.ml.linalg import SparseVector, VectorUDT, DenseVector
from pyspark.sql.types import StructType, StructField, FloatType, DoubleType
from pyspark.sql import Row

# One observation to score: four feature values, matching the four-dimensional
# feature vectors in the training data.
input_data = [2.0, 0.0, 3.0, 4.0]

# Same two-column layout as the training data: a double label and an ML
# vector of features.
schema = StructType([
    StructField("label", DoubleType(), True),
    StructField("features", VectorUDT(), True)
])

# Build the one-row DataFrame directly with the explicit schema. This replaces
# the earlier sparse -> RDD -> Row -> dense round trip, which defined the
# schema but never applied it to `final`.
# The label 45.0 is presumably a dummy value; only the features are scored.
final = spark.createDataFrame([(45.0, DenseVector(input_data))], schema)

final.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



We can now use this data to predict the species for our flower-attribute input

In [40]:
# Score the one-row DataFrame with the fitted model and keep the resulting DataFrame.
predictions = lrModel.transform(final)

In [85]:
# Pull the predicted class index out of the single scored row, addressing the
# column by name.
predictions.select("prediction").take(1)[0]["prediction"]

0.0

Lastly, we will save the model we created (and make sure we can load it again) so that it can be used in Docker

In [56]:
# Persist the fitted model. Overwrite any copy left by a previous run so this
# cell can be re-executed; a plain save() raises when the path already exists.
lrModel.write().overwrite().save('dockerMl')

In [51]:
# Show the working directory, since the model was saved to a relative path.
!pwd

/home/jovyan


In [64]:
# Load the model back from the directory it was saved to, to check that it round-trips.
sameModel = LogisticRegressionModel.load('./dockerMl')


In [67]:
# Score the same one-row input with the reloaded model and show every column
# of the result.
sameModel.transform(final).collect()

[Row(features=DenseVector([2.0, 0.0, 3.0, 4.0]), label=45.0, rawPrediction=DenseVector([1.3222, -3.9729, 0.0723]), probability=DenseVector([0.7743, 0.0039, 0.2218]), prediction=0.0)]