In [1]:
from pyspark.sql.types import *
from pyspark.ml import PipelineModel
from pyspark.sql import functions as fn

In [2]:
# Load the persisted pipeline model that scores the incoming flows.
# NOTE(review): the name `cvModel` suggests a CrossValidatorModel, but this loads
# a PipelineModel — confirm what was actually saved at this path.
MODEL_PATH = '/TFG/result'
cvModel = PipelineModel.load(MODEL_PATH)

In [3]:
# Explicit schema for the CSV files under /TFG/perf, in file column order.
# Every field is declared non-nullable; each entry is (column name, Spark type class).
rf_TFG_schema = StructType([
    StructField(column_name, type_cls(), False)
    for column_name, type_cls in [
        # Raw per-flow counters.
        ("totalDestinationBytes", IntegerType),
        ("totalDestinationPackets", IntegerType),
        ("totalSourceBytes", IntegerType),
        ("totalSourcePackets", IntegerType),
        ("timeLength", IntegerType),
        # Derived rates and average sizes.
        ("sourceByteRate", DoubleType),
        ("destinationByteRate", DoubleType),
        ("sourcePacketRate", DoubleType),
        ("destinationPacketRate", DoubleType),
        ("avgSourcePacketSize", DoubleType),
        ("avgDestinationPacketSize", DoubleType),
        # "*DiffMedianScal" features (presumably scaled differences from a median — TODO confirm).
        ("totalDestinationBytesDiffMedianScal", DoubleType),
        ("totalDestinationPacketsDiffMedianScal", DoubleType),
        ("totalSourceBytesDiffMedianScal", DoubleType),
        ("totalSourcePacketsDiffMedianScal", DoubleType),
        ("timeLengthDiffMedianScal", DoubleType),
        ("avgDestinationPacketSizeDiffMedianScal", DoubleType),
        ("avgSourcePacketSizeDiffMedianScal", DoubleType),
        ("destinationByteRateDiffMedianScal", DoubleType),
        ("destinationPacketRateDiffMedianScal", DoubleType),
        ("sourceByteRateDiffMedianScal", DoubleType),
        ("sourcePacketRateDiffMedianScal", DoubleType),
        # Integer indicator-style columns (presumably one-hot encoded categories).
        ("protocolName_tcp_ip", IntegerType),
        ("protocolName_udp_ip", IntegerType),
        ("sourceTCPFlag_N/A", IntegerType),
        ("sourceTCPFlag_S", IntegerType),
        ("destinationResume_external", IntegerType),
        ("destinationResume_mainServer", IntegerType),
        # Class tag string.
        ("Tag", StringType),
    ]
])

In [4]:
# Batch read of the CSV files with the explicit schema above (first line is a header).
dataset = spark.read \
    .schema(rf_TFG_schema) \
    .option("header", "true") \
    .csv('/TFG/perf')

In [5]:
# Row count per Tag value in the batch data.
tag_counts = dataset.groupBy('Tag').count()
display(tag_counts)

In [6]:
# Use a single shuffle partition for the aggregations below instead of Spark's
# default partition count (presumably because the data volume is small).
SHUFFLE_PARTITIONS = "1"
spark.conf.set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)

In [7]:
# Streaming source over the same directory; at most one new file is picked up
# per trigger, so the files are processed as successive micro-batches.
# NOTE(review): this re-binds `dataset`, which previously held the batch DataFrame.
dataset = (
    spark.readStream
    .schema(rf_TFG_schema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .csv('/TFG/perf')
)

In [8]:
# Streaming per-label accuracy of the model's predictions. For each `label` group:
#   'true prediction' = rows whose `prediction` equals `label` / rows with a non-null label
#   'count'           = rows with a non-null label
#   'timestamp'       = fn.current_timestamp() at evaluation time
# `.otherwise(0)` makes non-matching rows contribute 0. Without it, a label with no
# correct predictions summed only nulls and reported null instead of 0.0.
# current_timestamp() already returns a Column, so the former fn.lit(...) wrapper is dropped.
is_correct = fn.when(fn.col('prediction') == fn.col('label'), 1).otherwise(0)
predictions = (
    cvModel.transform(dataset)
    .groupBy('label')
    .agg(
        (fn.sum(is_correct) / fn.count('label')).alias('true prediction'),
        fn.count('label').alias('count'),
    )
    .withColumn('timestamp', fn.current_timestamp())
)
display(predictions)

In [9]:
# Copy every raw streamed row into the in-memory table 'dataset'.
# A new micro-batch is triggered every 5 seconds.
q1 = (
    dataset.writeStream
    .queryName('dataset')
    .format('memory')
    .outputMode('append')
    .trigger(processingTime='5 seconds')
    .start()
)

In [10]:
# Score the stream with the loaded model and append the scored rows to the
# in-memory table 'predictions'. A new micro-batch is triggered every 5 seconds.
scored_stream = cvModel.transform(dataset)
q2 = (
    scored_stream.writeStream
    .queryName('predictions')
    .format('memory')
    .outputMode('append')
    .trigger(processingTime='5 seconds')
    .start()
)

In [11]:
# Stop both memory-sink streaming queries started above. Previously only q1 was
# stopped, which left q2 (the 'predictions' query) running. Its memory sink kept
# accumulating output in the driver.
for query in (q1, q2):
    query.stop()