In [1]:
!pip install neo4j

Collecting neo4j
  Downloading neo4j-4.4.1.tar.gz (89 kB)
[K     |████████████████████████████████| 89 kB 3.1 MB/s eta 0:00:011
Building wheels for collected packages: neo4j
  Building wheel for neo4j (setup.py) ... [?25ldone
[?25h  Created wheel for neo4j: filename=neo4j-4.4.1-py3-none-any.whl size=114759 sha256=3481cf45e797f450dd94034b25ce4fcae5b92a00bf983e94cc28078dfd56357a
  Stored in directory: /home/jovyan/.cache/pip/wheels/1a/38/4b/0876d24f853fdfe40b2440c8c03332ec2d7f1f88b2446dc694
Successfully built neo4j
Installing collected packages: neo4j
Successfully installed neo4j-4.4.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import from_json
from pyspark.sql.functions import from_csv
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from neo4j import GraphDatabase

In [3]:
# Spark session & context
def init():
    """Build (or reuse) the SparkSession used by this notebook.

    The session is configured with master ``local``, the application name
    ``AndMalware-consumer`` and the Kafka SQL connector requested through
    ``spark.jars.packages``.

    Returns:
        The ``SparkSession`` returned by ``getOrCreate()``.
    """
    # Maven coordinates of the Kafka source for Structured Streaming.
    # NOTE(review): the Neo4j Spark connector used by the batch writer is not
    # requested here -- presumably it is provided another way; confirm.
    kafka_package = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1"

    builder = SparkSession.builder.master('local').appName('AndMalware-consumer')
    builder = builder.config("spark.jars.packages", kafka_package)
    return builder.getOrCreate()

In [4]:
# NOTE: despite its name, `sc` holds the SparkSession returned by init(),
# not a SparkContext.
sc = init()
# Create stream dataframe setting kafka server, topic and offset option
def getReadStream(spark):
    """Open the Kafka topic as a streaming DataFrame with textual key/value.

    Args:
        spark: the SparkSession used to create the stream reader.

    Returns:
        A streaming DataFrame subscribed to topic ``AndMalwer`` on
        ``kafka-server:9092``, starting from the earliest offsets, whose
        ``key`` and ``value`` columns are cast to string. Every other column
        delivered by the Kafka source is passed through unchanged.
    """
    kafka_options = {
        "kafka.bootstrap.servers": "kafka-server:9092",
        "startingOffsets": "earliest",
        "subscribe": "AndMalwer",
    }

    reader = spark.readStream.format("kafka")
    for option_name, option_value in kafka_options.items():
        reader = reader.option(option_name, option_value)
    raw = reader.load()

    # Cast the key and value columns to strings so the payload can be parsed as text.
    text_type = StringType()
    decoded = raw.withColumn("key", raw["key"].cast(text_type))
    decoded = decoded.withColumn("value", raw["value"].cast(text_type))
    return decoded


In [5]:
def _malware_create_query():
    """Return the Cypher statement that creates one ``Malware`` node per row."""
    metrics = ("FD", "TFWD", "TBWD", "FB", "FP")
    stats = ("max", "min", "sum", "mean", "stddev")

    # Node property `<stat><metric>` is fed from row field `<Stat><metric>`,
    # e.g. `maxFP: event.MaxFP`. Generating the pairs keeps every property
    # bound to its own aggregate (the hand-written list had maxFP reading MaxFB).
    properties = ["DIP: event.Destination_IP"]
    for metric in metrics:
        for stat in stats:
            properties.append(stat + metric + ": event." + stat.capitalize() + metric)
    return "CREATE (n:Malware {" + ",".join(properties) + "})"


def foreach_batch_function(df, epoch_id):
    """Write one micro-batch of aggregated rows to Neo4j as ``Malware`` nodes.

    Intended to be passed to ``writeStream.foreachBatch``.

    Args:
        df: the micro-batch DataFrame; the Cypher query reads the fields
            ``Destination_IP`` and ``<Stat><metric>`` (``MaxFD``, ``SumFP``, ...)
            from each row through the ``event`` variable.
        epoch_id: batch identifier supplied by Spark; not used.

    Connection settings default to the values previously hard-coded here and
    can be overridden with the environment variables ``NEO4J_URL``,
    ``NEO4J_USERNAME`` and ``NEO4J_PASSWORD`` so credentials need not live in
    the notebook.
    """
    import os

    url = os.environ.get("NEO4J_URL", "bolt://neo4j:7687")
    username = os.environ.get("NEO4J_USERNAME", "neo4j")
    password = os.environ.get("NEO4J_PASSWORD", "neo")

    (df.write
        .format("org.neo4j.spark.DataSource")
        .mode("Append")
        .option("url", url)
        .option("authentication.type", "basic")
        .option("authentication.basic.username", username)
        .option("authentication.basic.password", password)
        .option("query", _malware_create_query())
        .save())

In [None]:
# Get the Kafka read stream.
df1 = getReadStream(sc)

# Parse the comma-separated `value` string into named string columns.
# The position of a name in this list is the index of the field it is read from.
field_names = [
    "Source_IP",
    "Source_Port",
    "Destination_IP",
    "Destination_Port",
    "Timestamp",
    "Flow_Duration",
    "Total_Fwd_Packets",
    "Total_Bwd_Packets",
    "Total_Length_of_Fwd_Packets",
    "Total_Length_of_Bwd_Packets",
    "Flow_Bytess",
    "Flow_Packetss",
]

df2 = df1.selectExpr(*[
    "split(value,',')[{}] as {}".format(position, field_name)
    for position, field_name in enumerate(field_names)
])


# Format the data: cast the parsed string fields to their working types.
# Source_IP and Destination_IP stay as the strings produced by split(); the
# former `Destination_IP ... .alias("DIP")` step is omitted because withColumn
# names the result "Destination_IP" regardless of the alias, so it changed nothing.
# NOTE(review): Flow_Bytess is cast to integer while Flow_Packetss is cast to
# double -- confirm the integer cast is intended.
column_types = {
    "Source_Port": IntegerType(),
    "Destination_Port": IntegerType(),
    "Timestamp": TimestampType(),
    "Flow_Duration": IntegerType(),
    "Total_Fwd_Packets": IntegerType(),
    "Total_Bwd_Packets": IntegerType(),
    "Total_Length_of_Fwd_Packets": IntegerType(),
    "Total_Length_of_Bwd_Packets": IntegerType(),
    "Flow_Bytess": IntegerType(),
    "Flow_Packetss": DoubleType(),
}

df3 = df2
for column_name, column_type in column_types.items():
    df3 = df3.withColumn(column_name, df2[column_name].cast(column_type))

# Group rows into 10-minute windows on the event Timestamp.
# NOTE(review): no watermark is set on Timestamp -- presumably aggregation
# state is kept for every window indefinitely; confirm whether that is acceptable.
wind = window(df3.Timestamp, "10 minutes")

# Source column -> suffix used in the aggregate aliases (SumFD, MaxFD, ...).
metric_suffixes = {
    "Flow_Duration": "FD",
    "Total_Fwd_Packets": "TFWD",
    "Total_Bwd_Packets": "TBWD",
    "Flow_Bytess": "FB",
    "Flow_Packetss": "FP",
}

# Alias prefix -> aggregate function. `sum`, `max` and `min` here are the
# pyspark.sql.functions versions brought in by the star import, not the builtins.
statistics = (
    ("Sum", sum),
    ("Max", max),
    ("Min", min),
    ("Mean", mean),
    ("Stddev", stddev),
)

# One aggregate per (column, statistic) pair, in the same order as before:
# all five statistics for a column, then the next column.
aggregations = [
    stat_fn(col(source_column)).alias(prefix + suffix)
    for source_column, suffix in metric_suffixes.items()
    for prefix, stat_fn in statistics
]

wdf = (
    df3.groupBy(wind, col("Destination_IP"))
    .agg(*aggregations)
    # The previous literal was " 10.42.0.151" (leading space), which never
    # matches a cleanly parsed address. Comparing the trimmed value matches
    # the address whether or not the field carries surrounding whitespace.
    .where(trim(col("Destination_IP")) == "10.42.0.151")
)

# Start the streaming write: each micro-batch of `wdf` is handed to
# foreach_batch_function, in 'update' output mode, triggered every 3 seconds.
# NOTE(review): no checkpointLocation is configured, and the batch writer
# issues CREATE for every row it receives -- presumably a window that is
# updated in several batches yields several nodes; confirm this is intended.
stream_writer = wdf.writeStream.foreachBatch(foreach_batch_function)
stream_writer = stream_writer.outputMode('update')
stream_writer = stream_writer.trigger(processingTime='3 seconds')
query = stream_writer.start()

# Block this cell until the streaming query stops or fails.
query.awaitTermination()
