In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import lower, col
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [4]:
# Build one dense and one sparse example vector by hand
denseVec = Vectors.dense(1.0, 2.0, 3.0)
print(denseVec)

# A sparse vector is described by its total length plus the positions
# and values of its non-zero entries.
size, idx, values = 3, [1, 2], [2.0, 3.0]
sparseVec = Vectors.sparse(size, idx, values)
print(sparseVec)

[1.0,2.0,3.0]
(3,[1,2],[2.0,3.0])


In [5]:
# Start (or reuse) a local SparkSession for this notebook
spark = (
    SparkSession.builder
    .master('local')
    .appName('myAppName')
    .config('spark.executor.memory', '2gb')
    .config("spark.cores.max", "2")
    .getOrCreate()
)

# Legacy handles derived from the session; kept defined so that any code
# still referring to `sc` / `sqlContext` keeps working.
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Read the Iris CSV file. The first row is used as the header; no schema is
# inferred, so every column is loaded as a string.
df = spark.read.csv('../assets/iris-dataset/Iris.csv', header=True)
print(df.count())
# printSchema() writes the schema itself and returns None, so wrapping it in
# print() only adds a stray "None" line to the output.
df.printSchema()
df.show(5)



150
root
 |-- Id: string (nullable = true)
 |-- SepalLengthCm: string (nullable = true)
 |-- SepalWidthCm: string (nullable = true)
 |-- PetalLengthCm: string (nullable = true)
 |-- PetalWidthCm: string (nullable = true)
 |-- Species: string (nullable = true)

None
+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 5 rows



In [6]:
# Measurement columns used as model inputs. They were loaded from the CSV as
# strings, so they are cast to double before being assembled.
feature_cols = ["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"]

# Remove the outputs of a previous run of this cell (no-op on the first run).
# StringIndexer and VectorAssembler refuse to write to a column that already
# exists, so without this the cell could only be executed once per kernel.
df = df.drop("label", "features")

for feature_name in feature_cols:
    df = df.withColumn(feature_name, col(feature_name).cast("double"))
df = df.withColumn("Species", lower(col("Species")))

# Encode the species name as a numeric class index in column "label"
indexer = StringIndexer(inputCol="Species", outputCol="label")
df = indexer.fit(df).transform(df)

# Pack the four measurements into a single vector column "features"
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# printSchema() prints on its own and returns None; no print() wrapper needed
df.printSchema()
df.show(5)

root
 |-- Id: string (nullable = true)
 |-- SepalLengthCm: double (nullable = true)
 |-- SepalWidthCm: double (nullable = true)
 |-- PetalLengthCm: double (nullable = true)
 |-- PetalWidthCm: double (nullable = true)
 |-- Species: string (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)

None
+---+-------------+------------+-------------+------------+-----------+-----+-----------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|label|         features|
+---+-------------+------------+-------------+------------+-----------+-----+-----------------+
|  1|          5.1|         3.5|          1.4|         0.2|iris-setosa|  0.0|[5.1,3.5,1.4,0.2]|
|  2|          4.9|         3.0|          1.4|         0.2|iris-setosa|  0.0|[4.9,3.0,1.4,0.2]|
|  3|          4.7|         3.2|          1.3|         0.2|iris-setosa|  0.0|[4.7,3.2,1.3,0.2]|
|  4|          4.6|         3.1|          1.5|         0.2|iris-setosa|  0.0|[4.6,3.

In [7]:
# 70/30 train/test split. The seed is fixed so that the split, and therefore
# the metrics reported further down, are reproducible across runs.
df_train, df_test = df.randomSplit([0.7, 0.3], seed=42)

In [8]:
# Unfitted logistic-regression estimator; the feature and label column names
# are spelled out even though they match the defaults.
model = LogisticRegression(
    featuresCol="features",
    labelCol="label",
)
# List every tunable parameter with its documentation and current value
print(model.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The

In [9]:
# Train the estimator on the training split; the result is the fitted model
model_fitted = model.fit(dataset=df_train)

In [10]:
# Score the test split, then keep only the columns needed to compare the
# true label with the predicted one.
df_scored = model_fitted.transform(df_test)
df_pred = df_scored.select("id", "label", "prediction")
df_pred.show()

+---+-----+----------+
| id|label|prediction|
+---+-----+----------+
|100|  1.0|       1.0|
|103|  2.0|       2.0|
|108|  2.0|       2.0|
|113|  2.0|       2.0|
|116|  2.0|       2.0|
|117|  2.0|       2.0|
|123|  2.0|       2.0|
|127|  2.0|       2.0|
|129|  2.0|       2.0|
|133|  2.0|       2.0|
|134|  2.0|       1.0|
|135|  2.0|       1.0|
|141|  2.0|       2.0|
|143|  2.0|       2.0|
|145|  2.0|       2.0|
|146|  2.0|       2.0|
| 15|  0.0|       0.0|
|150|  2.0|       2.0|
| 16|  0.0|       0.0|
| 19|  0.0|       0.0|
+---+-----+----------+
only showing top 20 rows



In [11]:
# Both evaluators read the same label/prediction columns; only the metric differs
evaluator_columns = dict(labelCol="label", predictionCol="prediction")

# Accuracy on the test predictions
accuracy_evaluator = MulticlassClassificationEvaluator(metricName="accuracy", **evaluator_columns)
accuracy = accuracy_evaluator.evaluate(df_pred)
print(f"Accuracy: {accuracy}")

# Weighted recall on the test predictions
recall_evaluator = MulticlassClassificationEvaluator(metricName="weightedRecall", **evaluator_columns)
recall = recall_evaluator.evaluate(df_pred)
print(f"Recall: {recall}")

Accuracy: 0.94
Recall: 0.94


# Persist with Spark format

In [12]:
# Save the fitted model with Spark's own writer, replacing any earlier save
# at the same location.
model_writer = model_fitted.write().overwrite()
model_writer.save("../assets/models/iris_logistic_regression")

In [13]:
# in another session...
from pyspark.ml.classification import LogisticRegressionModel

# Restore the model saved with Spark's writer and score the test split again
model_fitted = LogisticRegressionModel.load("../assets/models/iris_logistic_regression")
scored_test = model_fitted.transform(df_test)
y_pred = scored_test.select("id", "label", "prediction")
y_pred.show()

+---+-----+----------+
| id|label|prediction|
+---+-----+----------+
|100|  1.0|       1.0|
|103|  2.0|       2.0|
|108|  2.0|       2.0|
|113|  2.0|       2.0|
|116|  2.0|       2.0|
|117|  2.0|       2.0|
|123|  2.0|       2.0|
|127|  2.0|       2.0|
|129|  2.0|       2.0|
|133|  2.0|       2.0|
|134|  2.0|       1.0|
|135|  2.0|       1.0|
|141|  2.0|       2.0|
|143|  2.0|       2.0|
|145|  2.0|       2.0|
|146|  2.0|       2.0|
| 15|  0.0|       0.0|
|150|  2.0|       2.0|
| 16|  0.0|       0.0|
| 19|  0.0|       0.0|
+---+-----+----------+
only showing top 20 rows



# Persist with interchangeable format


TODO: save to PMML and read with sklearn

CURRENTLY DOES NOT WORK

In [None]:
pip install pyspark2pmml
!export SPARK_HOME=/opt/spark-3.0.0-preview2/
spark.conf.set("packages", "org.jpmml:jpmml-sparkml:1.6.0") 

In [23]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark2pmml import PMMLBuilder

# PMMLBuilder converts an already-fitted PipelineModel. An unfitted Pipeline
# that merely lists a fitted stage is a different type, so the trained model
# is wrapped in a PipelineModel directly.
pipeline_spark = PipelineModel(stages=[model_fitted])

# NOTE(review): this still needs the JPMML-SparkML jar on the JVM classpath;
# without it PMMLBuilder raises "JPMML-SparkML not found on classpath".
# NOTE(review): `df` already contains the assembled "features" vector while
# the pipeline holds only the classifier; the converter may need the indexing
# and assembling stages in the pipeline as well. TODO confirm.
PMMLBuilder(spark, df, pipeline_spark).buildFile("../assets/models/iris_logistic_regression.pmml")

RuntimeError: JPMML-SparkML not found on classpath

In [24]:
# in another session...
# Attempt to load the exported model on the scikit-learn side. This cell
# currently fails at the import because sklearn2pmml is not installed.
from sklearn2pmml import PMMLPipeline
# NOTE(review): PipelineModel is not imported in any visible cell (only
# Pipeline is), so this line would raise NameError once the import succeeds.
# NOTE(review): the path handed to PipelineModel.load is the .pmml file
# written by buildFile in the export cell, not a model saved with Spark's
# writer. TODO confirm which loader is actually intended here.
# NOTE(review): could not confirm that PMMLPipeline offers a
# from_spark_pipeline constructor. TODO verify against the sklearn2pmml docs.
pipeline_sklearn = PMMLPipeline.from_spark_pipeline(PipelineModel.load("../assets/models/iris_logistic_regression.pmml"))

ModuleNotFoundError: No module named 'sklearn2pmml'