In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import from_json
from pyspark.sql.functions import from_csv
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

In [2]:
# Spark session & context
def init():
    spark = (SparkSession
         .builder
         .master('local')
         .appName('AndMalwareMLTest')
         # Add kafka package  
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")\
         .getOrCreate())
    return spark

In [3]:
sc = init()
# Create stream dataframe setting kafka server, topic and offset option
def getReadStream(spark):
    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "kafka-server:9092") \
      .option("startingOffsets", "earliest") \
      .option("subscribe", "MalModel") \
      .load()
    
    #df.selectExpr("CAST(value AS STRING)", "timestamp")
    dff = (df
    .withColumn("key", df["key"].cast(StringType()))
    .withColumn("value", df["value"].cast(StringType())))
    return dff


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a154027b-c241-4fe8-b261-fb0008609a5a;1.0
	confs: [default]


:: loading settings :: url = jar:file:/usr/local/spark-3.2.0-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.2/spark-sql-kafka-0-10_2.12-3.1.2.jar ...
	[SUCCESSFUL ] org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2!spark-sql-kafka-0-10_2.12.jar (105ms)
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.1.2/spark-token-provider-kafka-0-10_2.12-3.1.2.jar ...
	[SUCCESSFUL ] org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2!spark-token-provider-kafka-0-10

In [20]:
from pyspark.ml.classification import DecisionTreeClassificationModel

Model = DecisionTreeClassificationModel.load("hdfs://namenode:9000/user/jovyan/model")
print(Model)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_af2427526490, depth=5, numNodes=43, numClasses=2, numFeatures=5


In [11]:
def foreach_batch_function(df, epoch_id):

    labelIndexer = StringIndexer(inputCol="Label", outputCol="indexedLabel").fit(df)
    print(labelIndexer)
    col = df.columns
    del col[-1]
    vecta = VectorAssembler(inputCols=col,outputCol="features")
    print(vecta)
    dataTemp = vecta.transform(df)
    #dataTemp = dataTemp.dropna()
    featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(dataTemp)
    dataTemp2 = featureIndexer.transform(dataTemp)
    #print(featureIndexer)
    predictions = Model.transform(dataTemp2)
    predictions.show()
    pass

In [12]:
df1 = getReadStream(sc)

df2 = df1.selectExpr("split(value,',')[0] as Flow_Duration" \
                 ,"split(value,',')[1] as Flow_IAT_Mean" \
                 ,"split(value,',')[2] as FP" \
                ,"split(value,',')[3] as FB" \
                ,"split(value,',')[4] as Total_Length_of_Fwd_Packets" \
                ,"split(value,',')[5] as Label" \
                    )



df3 = df2.withColumn("Flow_Duration", df2["Flow_Duration"].cast(DoubleType()))\
        .withColumn("Flow_IAT_Mean", df2["Flow_IAT_Mean"].cast(DoubleType()))\
        .withColumn("FP", df2["FP"].cast(DoubleType()))\
        .withColumn("FB", df2["FB"].cast(DoubleType()))\
        .withColumn("Total_Length_of_Fwd_Packets", df2["Total_Length_of_Fwd_Packets"].cast(DoubleType()))\
        .withColumn("Label", df2["Label"].cast(StringType()))\



query = (df3.writeStream\
        .foreachBatch(foreach_batch_function)\
        .outputMode('update')\
        .trigger(processingTime='3 seconds')\
        .start())

query.awaitTermination()


22/03/05 06:42:27 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f3b07873-6e17-4c9e-911c-c3cd2acbf019. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/03/05 06:42:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


StringIndexerModel: uid=StringIndexer_bda6ba4029d6, handleInvalid=error
VectorAssembler_a03af1231b54


22/03/05 06:42:28 ERROR MicroBatchExecution: Query [id = 790a6314-2eea-40f2-8cb7-e6a650839820, runId = f0f7c691-b836-4f69-b3f0-9160aa730106] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.2-src.zip/py4j/clientserver.py", line 581, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 196, in call
    raise e
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 193, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/tmp/ipykernel_51/511656058.py", line 9, in foreach_batch_function
    dataTemp = vecta.transform(df)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 217, in transform
    return self._transform(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 350, in _transform
    return DataFrame(self._java_obj.tr

StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.2-src.zip/py4j/clientserver.py", line 581, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 196, in call
    raise e
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 193, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/tmp/ipykernel_51/511656058.py", line 9, in foreach_batch_function
    dataTemp = vecta.transform(df)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 217, in transform
    return self._transform(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 350, in _transform
    return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
  File "/usr/local/spark/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py", line 1309, in __call__
    return_value = get_return_value(
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: Data type string of column FP is not supported.
Data type string of column FB is not supported.

=== Streaming Query ===
Identifier: [id = 790a6314-2eea-40f2-8cb7-e6a650839820, runId = f0f7c691-b836-4f69-b3f0-9160aa730106]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[MalModel]]: {"MalModel":{"0":137}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [Flow_Duration#632, Flow_IAT_Mean#639, FP#622, FB#623, Total_Length_of_Fwd_Packets#646, cast(Label#625 as string) AS Label#653]
+- Project [Flow_Duration#632, Flow_IAT_Mean#639, FP#622, FB#623, cast(Total_Length_of_Fwd_Packets#624 as double) AS Total_Length_of_Fwd_Packets#646, Label#625]
   +- Project [Flow_Duration#632, cast(Flow_IAT_Mean#621 as double) AS Flow_IAT_Mean#639, FP#622, FB#623, Total_Length_of_Fwd_Packets#624, Label#625]
      +- Project [cast(Flow_Duration#620 as double) AS Flow_Duration#632, Flow_IAT_Mean#621, FP#622, FB#623, Total_Length_of_Fwd_Packets#624, Label#625]
         +- Project [split(value#612, ,, -1)[0] AS Flow_Duration#620, split(value#612, ,, -1)[1] AS Flow_IAT_Mean#621, split(value#612, ,, -1)[2] AS FP#622, split(value#612, ,, -1)[3] AS FB#623, split(value#612, ,, -1)[4] AS Total_Length_of_Fwd_Packets#624, split(value#612, ,, -1)[5] AS Label#625]
            +- Project [key#604, cast(value#591 as string) AS value#612, topic#592, partition#593, offset#594L, timestamp#595, timestampType#596]
               +- Project [cast(key#590 as string) AS key#604, value#591, topic#592, partition#593, offset#594L, timestamp#595, timestampType#596]
                  +- StreamingDataSourceV2Relation [key#590, value#591, topic#592, partition#593, offset#594L, timestamp#595, timestampType#596], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@b1a4883, KafkaV2[Subscribe[MalModel]]
