In [8]:
sc.stop()

In [9]:
from pyspark.sql import SparkSession

import socket
import pyspark

SPARK_NAMESPACE="default"
SA="spark-driver"
K8S_CACERT="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
K8S_TOKEN="/var/run/secrets/kubernetes.io/serviceaccount/token"
DOCKER_IMAGE="hishailesh77/spark2.4.6-deb:latest"
SPARK_DRIVER_HOST=socket.getfqdn()
SPARK_DRIVER_PORT=20020


spark = SparkSession.builder \
    .appName("Jupyter Notebook") \
    .master("k8s://https://kubernetes.default:443") \
    .config("spark.kubernetes.authenticate.driver.serviceAccountName",SA) \
    .config("spark.kubernetes.namespace",SPARK_NAMESPACE) \
    .config("spark.kubernetes.authenticate.subdmission.caCertFile",K8S_CACERT) \
    .config("spark.kubernetes.authenticate.submission.oauthTokenFile",K8S_TOKEN) \
    .config("spark.kubernetes.container.image", DOCKER_IMAGE) \
    .config("spark.kubernetes.container.image.pullPolicy","Always") \
    .config("spark.driver.port",SPARK_DRIVER_PORT) \
    .config("spark.driver.host",SPARK_DRIVER_HOST) \
    .config("spark.executor.instances", "12") \
    .config("spark.driver.memory","16g") \
    .config("spark.executor.memory","16g") \
    .config("spark.driver.cores","8") \
    .config("spark.executor.cores","4") \
    .config("spark.memory.offHeap.enabled","true") \
    .config("spark.memory.offHeap.size","8g") \
    .config("spark.hadoop.fs.s3a.access.key", "AKIAX3D75DYHLQPYD4IT") \
    .config("spark.hadoop.fs.s3a.secret.key", "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") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.multiobjectdelete.enable","false") \
    .config("spark.hadoop.fs.s3a.fast.upload","true") \
    .getOrCreate()

sc= spark.sparkContext.getOrCreate()


In [10]:
df=spark.read.format("parquet").option("inferSchema", "true").option("header", "false").load("s3a://ml-workflow-data/security/Malware_Dataset/Output/malware_tmp_parquet_2")

In [11]:
df.show()

+--------------------+--------------------+-----+--------------------+--------------------+
|           file_name|                File|label|              countv|            features|
+--------------------+--------------------+-----+--------------------+--------------------+
|13YpdP5vTLOazSQFRgJn|s3a://ml-workflow...|    3|(20,[0,1,3,4,5,6,...|[1094.0,564.0,0.0...|
|12tjh4qCkcHpObVBEeMr|s3a://ml-workflow...|    3|(20,[0,1,2,3,4,5,...|[2843.0,2300.0,86...|
|15loeAHtkJa8BuFi6Zry|s3a://ml-workflow...|    1|(20,[0,1,2,3,4,5,...|[126025.0,107847....|
|4JGbOVQnEt3ZP5acW7Yz|s3a://ml-workflow...|    6|(20,[0,1,2,3,4,5,...|[146066.0,130765....|
|5YMeDkHjclrCPd8OuymR|s3a://ml-workflow...|    4|(20,[0,1,2,3,4,5,...|[76734.0,68712.0,...|
|12Jd4qpOzTtQC3E6PXDb|s3a://ml-workflow...|    3|(20,[0,1,3,4,5,6,...|[1095.0,561.0,0.0...|
|7KqfVlEBOmr6tTQewNG8|s3a://ml-workflow...|    7|(20,[0,1,2,3,4,5,...|[2387.0,1167.0,48...|
|5PBvwNE8sCzm1bjUlf6A|s3a://ml-workflow...|    7|(20,[0,1,2,3,4,5,...|[2384.0,11

In [103]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lr = LogisticRegression(maxIter=100)

(trainingData, testData) = df.randomSplit([0.7, 0.3])

pipeline = Pipeline(stages=[lr])

model = pipeline.fit(trainingData)

predictions = model.transform(testData)

#predictions.select("prediction", "label", "features").show(5)

#Fit and Transform the data and calculate accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
error=(1.0 -accuracy)


print(f"Accuracy = {accuracy} , Test Error =  {error}")

#Save the Model

model.write().overwrite().save("s3a://ml-workflow-data/security/Malware_Dataset/Output/Model_LR_MaxITR_100")

Accuracy = 0.7137254901960784 , Test Error =  0.28627450980392155


In [104]:
#Load the Model from S3

savedModel = PipelineModel.load("s3a://ml-workflow-data/security/Malware_Dataset/Output/Model_LR_MaxITR_100")
new_df=spark.read.format("parquet").option("inferSchema", "true").option("header", "false").load("s3a://ml-workflow-data/security/Malware_Dataset/Output/malware_tmp_parquet_2")

#Predict the new values
predictions = savedModel.transform(new_df)

predictions.select("prediction", "label", "features").show(20)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       7.0|    3|[1094.0,564.0,0.0...|
|       3.0|    3|[2843.0,2300.0,86...|
|       8.0|    1|[126025.0,107847....|
|       6.0|    6|[146066.0,130765....|
|       4.0|    4|[76734.0,68712.0,...|
|       7.0|    3|[1095.0,561.0,0.0...|
|       7.0|    7|[2387.0,1167.0,48...|
|       7.0|    7|[2384.0,1169.0,49...|
|       7.0|    7|[2386.0,1174.0,52...|
|       4.0|    4|[94992.0,81582.0,...|
|       7.0|    3|[1066.0,428.0,0.0...|
|       4.0|    4|[97684.0,84637.0,...|
|       4.0|    4|[39494.0,34922.0,...|
|       4.0|    4|[76719.0,68146.0,...|
|       4.0|    4|[104409.0,86829.0...|
|       7.0|    7|[2370.0,1185.0,51...|
|       4.0|    4|[87480.0,75488.0,...|
|       8.0|    8|[16926.0,4007.0,2...|
|       4.0|    4|[76241.0,68343.0,...|
|       7.0|    7|[2383.0,1157.0,50...|
+----------+-----+--------------------+
only showing top 20 rows

