In [2]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, StringIndexerModel
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Transformer

from pyspark.sql.types import StringType

import mlflow
from mlflow.tracking import MlflowClient

In [3]:
spark = SparkSession.builder.appName("PySparkTitanikJob")\
    .getOrCreate()

24/05/13 18:22:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/13 18:22:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
spark

In [5]:
mlflow.start_run()

<ActiveRun: >

In [19]:
from pyspark.sql.functions import split, col
df = spark.read.parquet('../part.0.parquet')
tag_index = StringIndexer(inputCol='image_tag', outputCol="image_tag_index", handleInvalid="skip")
user_index = StringIndexer(inputCol='user', outputCol="user_index", handleInvalid="skip")
image_index = StringIndexer(inputCol='image', outputCol="image_index", handleInvalid="skip")

df = tag_index.fit(df).transform(df)
df = user_index.fit(df).transform(df)
df = image_index.fit(df).transform(df)


In [20]:
df.show()

+---------------------+------------------+----------------+-----------+-----------------+-------------+---------+--------------------+---------------------------+------------------------+---------+------+---------------+----------+-----------+
|has_install_or_update|has_clean_commands|has_exposed_port|port_number|has_setuid_setgid|    image_tag|     user|               image|has_package_update_commands|dangerous_commands_count|safe_copy|result|image_tag_index|user_index|image_index|
+---------------------+------------------+----------------+-----------+-----------------+-------------+---------+--------------------+---------------------------+------------------------+---------+------+---------------+----------+-----------+
|                 true|             false|           false|          0|            false|       latest|     root|              alpine|                      false|                       2|    false|     1|            0.0|       0.0|        1.0|
|                false| 

24/05/13 19:39:31 WARN DAGScheduler: Broadcasting large task binary with size 1077.9 KiB


In [22]:
feature = VectorAssembler(
    inputCols=["has_install_or_update", 
                             "has_clean_commands", 
                             "has_exposed_port", 
                             "port_number", 
                             "has_setuid_setgid", 
                             "image_tag_index", 
                             "user_index",
                             "image_index",
                             "has_package_update_commands",
                             "dangerous_commands_count",
                             "safe_copy"],
    outputCol="features")
feature_vector= feature.transform(df)
feature_vector.show()

+---------------------+------------------+----------------+-----------+-----------------+-------------+---------+--------------------+---------------------------+------------------------+---------+------+---------------+----------+-----------+--------------------+
|has_install_or_update|has_clean_commands|has_exposed_port|port_number|has_setuid_setgid|    image_tag|     user|               image|has_package_update_commands|dangerous_commands_count|safe_copy|result|image_tag_index|user_index|image_index|            features|
+---------------------+------------------+----------------+-----------+-----------------+-------------+---------+--------------------+---------------------------+------------------------+---------+------+---------------+----------+-----------+--------------------+
|                 true|             false|           false|          0|            false|       latest|     root|              alpine|                      false|                       2|    false|     1| 

24/05/13 19:40:46 WARN DAGScheduler: Broadcasting large task binary with size 1091.6 KiB


In [23]:
(training_data, test_data) = feature_vector.randomSplit([0.8, 0.2],seed = 42)
training_data.show()

24/05/13 19:40:50 WARN DAGScheduler: Broadcasting large task binary with size 1113.8 KiB


+---------------------+------------------+----------------+-----------+-----------------+--------------------+---------+--------------------+---------------------------+------------------------+---------+------+---------------+----------+-----------+--------------------+
|has_install_or_update|has_clean_commands|has_exposed_port|port_number|has_setuid_setgid|           image_tag|     user|               image|has_package_update_commands|dangerous_commands_count|safe_copy|result|image_tag_index|user_index|image_index|            features|
+---------------------+------------------+----------------+-----------+-----------------+--------------------+---------+--------------------+---------------------------+------------------------+---------+------+---------------+----------+-----------+--------------------+
|                false|              true|           false|          0|            false|            "${tag}"|     root|   cognexa/archlinux|                      false|               

In [24]:
#LogisticRegression

evaluator = MulticlassClassificationEvaluator(
    labelCol="result", predictionCol="prediction", metricName="accuracy"
)
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="result", featuresCol="features")

lrModel = lr.fit(training_data)     

lr_prediction = lrModel.transform(test_data)
lr_prediction.select("prediction", "result", "features").show(5)

lr_accuracy = evaluator.evaluate(lr_prediction)
print("LogisticRegression [Accuracy] = %g"% (lr_accuracy))
print("LogisticRegression [Error] = %g " % (1.0 - lr_accuracy))


24/05/13 19:40:57 WARN DAGScheduler: Broadcasting large task binary with size 1117.3 KiB
24/05/13 19:40:58 WARN DAGScheduler: Broadcasting large task binary with size 1118.9 KiB
24/05/13 19:40:58 WARN DAGScheduler: Broadcasting large task binary with size 1118.9 KiB
24/05/13 19:40:58 WARN DAGScheduler: Broadcasting large task binary with size 1118.9 KiB
24/05/13 19:40:58 WARN DAGScheduler: Broadcasting large task binary with size 1118.9 KiB
24/05/13 19:40:58 WARN DAGScheduler: Broadcasting large task binary with size 1118.9 KiB
24/05/13 19:40:58 WARN DAGScheduler: Broadcasting large task binary with size 1118.9 KiB
24/05/13 19:40:58 WARN DAGScheduler: Broadcasting large task binary with size 1118.9 KiB
24/05/13 19:40:58 WARN DAGScheduler: Broadcasting large task binary with size 1118.9 KiB
24/05/13 19:40:58 WARN DAGScheduler: Broadcasting large task binary with size 1118.9 KiB
24/05/13 19:40:58 WARN DAGScheduler: Broadcasting large task binary with size 1118.9 KiB
24/05/13 19:40:58 WAR

+----------+------+--------------------+
|prediction|result|            features|
+----------+------+--------------------+
|       0.0|     0|(11,[1,5,7,9],[1....|
|       0.0|     0|(11,[1,5,6,7],[1....|
|       0.0|     0|(11,[1,5,6,7],[1....|
|       0.0|     0|(11,[1,5,7],[1.0,...|
|       0.0|     0|(11,[1,5,6],[1.0,...|
+----------+------+--------------------+
only showing top 5 rows



24/05/13 19:40:59 WARN DAGScheduler: Broadcasting large task binary with size 1139.4 KiB


LogisticRegression [Accuracy] = 0.885815
LogisticRegression [Error] = 0.114185 


In [27]:
#DecisionTreeClassifier

from pyspark.ml.classification import DecisionTreeClassifier
dt_classifier = DecisionTreeClassifier(labelCol="result", featuresCol="features", maxBins=16000)
dt_model = dt.fit(training_data)
dt_prediction = dt_model.transform(training_data)

dt_prediction.select("prediction", "result", "features").show(5)


# Create a MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="result", predictionCol="prediction")

# Calculate the accuracy of the model
accuracy = evaluator.evaluate(dt_prediction)

# Calculate the error of the model
error = 1.0 - accuracy

print("DecisionTreeClassifier [Accuracy] = %g" % (accuracy))
print("DecisionTreeClassifier [Error] = %g" % (error))

24/05/13 19:42:20 WARN DAGScheduler: Broadcasting large task binary with size 1112.1 KiB
24/05/13 19:42:20 WARN DAGScheduler: Broadcasting large task binary with size 1116.8 KiB
24/05/13 19:42:21 WARN DAGScheduler: Broadcasting large task binary with size 1116.8 KiB
24/05/13 19:42:21 WARN DAGScheduler: Broadcasting large task binary with size 1122.9 KiB
24/05/13 19:42:21 WARN DAGScheduler: Broadcasting large task binary with size 1137.0 KiB
24/05/13 19:42:23 WARN DAGScheduler: Broadcasting large task binary with size 1137.8 KiB
24/05/13 19:42:24 WARN DAGScheduler: Broadcasting large task binary with size 1138.4 KiB
24/05/13 19:42:27 WARN DAGScheduler: Broadcasting large task binary with size 1297.8 KiB
24/05/13 19:42:32 WARN DAGScheduler: Broadcasting large task binary with size 1490.6 KiB
24/05/13 19:42:41 WARN DAGScheduler: Broadcasting large task binary with size 1370.9 KiB


+----------+------+--------------------+
|prediction|result|            features|
+----------+------+--------------------+
|       0.0|     0|(11,[1,5,7],[1.0,...|
|       0.0|     0|(11,[1,5,7],[1.0,...|
|       0.0|     0|(11,[1,5,7],[1.0,...|
|       0.0|     0|(11,[1,5,7],[1.0,...|
|       0.0|     0|(11,[1,5,7],[1.0,...|
+----------+------+--------------------+
only showing top 5 rows



24/05/13 19:42:42 WARN DAGScheduler: Broadcasting large task binary with size 1382.7 KiB
[Stage 2105:>                                                       (0 + 1) / 1]

DecisionTreeClassifier [Accuracy] = 0.943448
DecisionTreeClassifier [Error] = 0.0565515


                                                                                

In [112]:
#RandomForestClassifier

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="result", featuresCol="features", maxBins=16000)
rf_model = rf.fit(training_data)
rf_prediction = rf_model.transform(test_data)
rf_prediction.select("prediction", "result", "features").show(5)


rf_accuracy = evaluator.evaluate(rf_prediction)
print("RandomForestClassifier [Accuracy] = %g"% (rf_accuracy))
print("RandomForestClassifier [Error] = %g" % (1.0 - rf_accuracy))


24/05/11 10:17:38 WARN DAGScheduler: Broadcasting large task binary with size 1099.9 KiB
24/05/11 10:17:38 WARN DAGScheduler: Broadcasting large task binary with size 1104.6 KiB
24/05/11 10:17:38 WARN DAGScheduler: Broadcasting large task binary with size 1104.7 KiB
24/05/11 10:17:39 WARN DAGScheduler: Broadcasting large task binary with size 1110.7 KiB
24/05/11 10:17:39 WARN DAGScheduler: Broadcasting large task binary with size 1126.7 KiB
24/05/11 10:17:45 WARN DAGScheduler: Broadcasting large task binary with size 1469.6 KiB
24/05/11 10:17:54 WARN DAGScheduler: Broadcasting large task binary with size 1819.9 KiB
24/05/11 10:18:10 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/05/11 10:18:46 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
24/05/11 10:19:46 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB


+----------+------+--------------------+
|prediction|result|            features|
+----------+------+--------------------+
|       1.0|     1|(11,[1,5,7,9],[1....|
|       1.0|     1|(11,[1,5,6,7],[1....|
|       1.0|     1|(11,[1,5,6,7],[1....|
|       1.0|     1|(11,[1,5,7],[1.0,...|
|       1.0|     1|(11,[1,5,6],[1.0,...|
+----------+------+--------------------+
only showing top 5 rows



24/05/11 10:19:46 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB


RandomForestClassifier [Accuracy] = 0.929402
RandomForestClassifier [Error] = 0.0705982


In [115]:
#Gradient-boosted tree classifier

from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="result", featuresCol="features",maxIter=10,maxBins=16000)
gbt_model = gbt.fit(training_data)
gbt_prediction = gbt_model.transform(test_data)
gbt_prediction.select("prediction", "result", "features").show(5)

gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Gradient-boosted [Accuracy] = %g"% (gbt_accuracy))
print("Gradient-boosted [Error] = %g"% (1.0 - gbt_accuracy))


24/05/11 10:22:11 WARN DAGScheduler: Broadcasting large task binary with size 1105.8 KiB
24/05/11 10:22:11 WARN DAGScheduler: Broadcasting large task binary with size 1105.8 KiB
24/05/11 10:22:12 WARN DAGScheduler: Broadcasting large task binary with size 1111.9 KiB
24/05/11 10:22:12 WARN DAGScheduler: Broadcasting large task binary with size 1127.1 KiB
24/05/11 10:22:13 WARN DAGScheduler: Broadcasting large task binary with size 1207.4 KiB
24/05/11 10:22:14 WARN DAGScheduler: Broadcasting large task binary with size 1208.2 KiB
24/05/11 10:22:17 WARN DAGScheduler: Broadcasting large task binary with size 1239.8 KiB
24/05/11 10:22:22 WARN DAGScheduler: Broadcasting large task binary with size 1265.7 KiB
24/05/11 10:22:33 WARN DAGScheduler: Broadcasting large task binary with size 1365.6 KiB
24/05/11 10:22:34 WARN DAGScheduler: Broadcasting large task binary with size 1366.0 KiB
24/05/11 10:22:35 WARN DAGScheduler: Broadcasting large task binary with size 1387.8 KiB
24/05/11 10:22:38 WAR

+----------+------+--------------------+
|prediction|result|            features|
+----------+------+--------------------+
|       1.0|     1|(11,[1,5,7,9],[1....|
|       1.0|     1|(11,[1,5,6,7],[1....|
|       1.0|     1|(11,[1,5,6,7],[1....|
|       1.0|     1|(11,[1,5,7],[1.0,...|
|       1.0|     1|(11,[1,5,6],[1.0,...|
+----------+------+--------------------+
only showing top 5 rows



24/05/11 10:25:41 WARN DAGScheduler: Broadcasting large task binary with size 4.6 MiB


Gradient-boosted [Accuracy] = 0.931711
Gradient-boosted [Error] = 0.0682893


In [117]:
#Save model

dt_model.write().overwrite().save('dt_model')


24/05/11 10:27:48 WARN MemoryManager: Total allocation exceeds 95,00% (1 020 054 720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers


In [26]:
#Pipeline

from pyspark.ml.pipeline import PipelineModel

df_pipe = spark.read.parquet('../part.0.parquet')

train, test = df_pipe.randomSplit([0.8, 0.2])


tag_index = StringIndexer(inputCol='image_tag', outputCol="image_tag_index", handleInvalid="skip")
user_index = StringIndexer(inputCol='user', outputCol="user_index", handleInvalid="skip")
image_index = StringIndexer(inputCol='image', outputCol="image_index", handleInvalid="skip")

feature = VectorAssembler(
    inputCols=["has_install_or_update", 
                             "has_clean_commands", 
                             "has_exposed_port", 
                             "port_number", 
                             "has_setuid_setgid", 
                             "image_tag_index", 
                             "user_index",
                             "image_index",
                             "has_package_update_commands",
                             "dangerous_commands_count",
                             "safe_copy"],
    outputCol="features")


dt_classifier = DecisionTreeClassifier(labelCol="result", featuresCol="features", maxBins=16000)

pipeline = Pipeline(stages=[tag_index, user_index, image_index, feature, dt_classifier])

p_model = pipeline.fit(train)

p_model.write().overwrite().save('p_model')
model = PipelineModel.load('p_model')


evaluator = MulticlassClassificationEvaluator(
    labelCol="result", predictionCol="prediction", metricName="accuracy"
)

prediction = p_model.transform(test)
# test.show(10)
p_accuracy = evaluator.evaluate(prediction)
print("Pipeline model [Accuracy] = %g"% (p_accuracy))
print("Pipeline model [Error] = %g " % (1.0 - p_accuracy))


24/05/13 19:41:58 WARN DAGScheduler: Broadcasting large task binary with size 1034.3 KiB
24/05/13 19:41:58 WARN DAGScheduler: Broadcasting large task binary with size 1034.4 KiB
24/05/13 19:41:58 WARN DAGScheduler: Broadcasting large task binary with size 1040.4 KiB
24/05/13 19:41:59 WARN DAGScheduler: Broadcasting large task binary with size 1054.9 KiB
24/05/13 19:42:00 WARN DAGScheduler: Broadcasting large task binary with size 1055.7 KiB
24/05/13 19:42:01 WARN DAGScheduler: Broadcasting large task binary with size 1056.3 KiB
24/05/13 19:42:03 WARN DAGScheduler: Broadcasting large task binary with size 1216.3 KiB
24/05/13 19:42:06 WARN DAGScheduler: Broadcasting large task binary with size 1430.6 KiB
24/05/13 19:42:14 WARN MemoryManager: Total allocation exceeds 95,00% (1 020 054 720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/05/13 19:42:15 WARN DAGScheduler: Broadcasting large task binary with size 1344.6 KiB


Pipeline model [Accuracy] = 0.929526
Pipeline model [Error] = 0.0704736 


In [31]:
#Hyperparams

from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

paramGrid = ParamGridBuilder() \
   .addGrid(dt_classifier.maxDepth, [2, 3, 4]) \
   .addGrid(dt_classifier.minInfoGain, [0.05, 0.1, 0.15]) \
   .build()
print(dt_classifier.maxDepth)
tvs = TrainValidationSplit(estimator=pipeline,
                            estimatorParamMaps=paramGrid,
                            evaluator=evaluator,
                            trainRatio=0.8)
model = tvs.fit(train)

best_model = model.bestModel
print("Best model hyperparameters:")
print("maxDepth:", best_model.stages[-1].getOrDefault("maxDepth"))
print("maxBins:", best_model.stages[-1].getOrDefault("maxBins"))
print("minInfoGain:", best_model.stages[-1].getOrDefault("minInfoGain"))

prediction = best_model.transform(test)
accuracy = evaluator.evaluate(prediction)
error = 1.0 - accuracy
print("Best model accuracy:", accuracy)
print("Best model error:", error)

24/05/13 20:11:47 WARN DAGScheduler: Broadcasting large task binary with size 1098.9 KiB
24/05/13 20:11:52 WARN DAGScheduler: Broadcasting large task binary with size 1029.3 KiB
24/05/13 20:11:59 WARN DAGScheduler: Broadcasting large task binary with size 1098.9 KiB
24/05/13 20:12:04 WARN DAGScheduler: Broadcasting large task binary with size 1029.3 KiB
[Stage 3686:>                                                       (0 + 1) / 1]

KeyboardInterrupt: 

24/05/13 20:12:10 WARN DAGScheduler: Broadcasting large task binary with size 1098.9 KiB
[Stage 3688:>                                                       (0 + 1) / 1]