# Darknet pattern detection

## 1. Pattern detection
### 1.1 Define session and libraries

In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-oracle-cloudera/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/CDH-6.1.1-1.cdh6.1.1.p0.875250/lib/spark/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")
os.environ['PYSPARK_SUBMIT_ARGS'] = "pyspark-shell"

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark import sql
from pyspark.sql import Row
from pyspark.sql import SQLContext, HiveContext, DataFrameWriter
from pyspark.sql.types import *
from pyspark.sql.functions import col, count, sum
from pyspark.sql import SparkSession
from datetime import datetime
import subprocess
import math
import time
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, col, udf
from operator import add
from functools import reduce
from pyspark.sql.functions import pandas_udf, PandasUDFType, split
import numpy as np
from scipy.stats import entropy
import pandas as pd
from pyspark.sql.functions import lit
import matplotlib.pyplot as plt

# described here https://medium.com/@achilleus/spark-session-10d0d66d1d24
print("starting")
spark = SparkSession.builder\
                    .appName("darknet_feature_extraction_without_hive")\
                    .master("yarn")\
                    .config("spark.submit.deployMode","client")\
                    .config("spark.executor.memory","20g")\
                    .config("spark.driver.memory","20g")\
                    .config('spark.sql.autoBroadcastJoinThreshold','-1')\
                    .enableHiveSupport()\
                    .getOrCreate()

print("done with startup")

starting
done with startup


### 1.2 Define the location and structure of files

We previosly splitted the traces to 1 hour windows, extracted fields and uploaded them to hdfs.
I tried to use window function to split the data in hdfs but it was taking more time than doing it locally on a server.

In [2]:
darknet_pattern_schema = StructType([StructField('frame_time',StringType(),True)\
                                     , StructField('frame_len',StringType(),True)\
                                     , StructField('ip_proto',StringType(),True)\
                                     , StructField('ip_len',StringType(),True)\
                                     , StructField('ip_ttl',StringType(),True)\
                                     , StructField('ip_version',StringType(),True)\
                                     , StructField('ip_flags',StringType(),True)\
                                     , StructField('ip_flags_mf',StringType(),True)\
                                     , StructField('ip_frag_offset',IntegerType(),True)\
                                     , StructField('ip_src',StringType(),True)\
                                     , StructField('ip_dst',StringType(),True)\
                                     , StructField('icmp_type',StringType(),True)\
                                     , StructField('icmp_code',StringType(),True)\
                                     , StructField('tcp_dstport',StringType(),True)\
                                     , StructField('tcp_srcport',StringType(),True)\
                                     , StructField('tcp_flags',StringType(),True)\
                                     , StructField('tcp_flags_ack',StringType(),True)\
                                     , StructField('tcp_flags_cwr',StringType(),True)\
                                     , StructField('tcp_flags_fin',StringType(),True)\
                                     , StructField('tcp_flags_ecn',StringType(),True)\
                                     , StructField('tcp_flags_ns',StringType(),True)\
                                     , StructField('tcp_flags_push',StringType(),True)\
                                     , StructField('tcp_flags_syn',StringType(),True)\
                                     , StructField('tcp_flags_urg',StringType(),True)\
                                     , StructField('tcp_flags_reset',StringType(),True)\
                                     , StructField('tcp_len',StringType(),True)\
                                     , StructField('tcp_window_size',StringType(),True)\
                                     , StructField('udp_srcport',StringType(),True)\
                                     , StructField('udp_dstport',StringType(),True)])

hdfs_results_dir = '/user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/'
# this is a hdfs directory where we stored 1 hours splits
onehour_path = '/user/mulinpav/NII_internship/datasets/darknet/0*2019/extracted_1hour'
cmd = 'hdfs dfs -ls ' + onehour_path + " | sed '1d;s/  */ /g'| cut -d\  -f8"

files = subprocess.check_output(cmd, shell=True).decode().strip().split('\n')
# this filters out empty strings
files = list(filter(None, files))

## 1.3 Detect predefined patterns and write results in predefined hdfs directory structure

Script in notebook is run only for 1 file and the output is not saved to hdfs. Run the script on background and uncomment lines that include ".write" to get the reuslts.

directory structure:

    (base) mulinpav@deep0:~/NII_internship/scripts/big-dama$ hdfs dfs -ls /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour
    Found 16 items
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/events
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/net_scan_icmp_heavy_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/net_scan_icmp_light_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/net_scan_tcp_heavy_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/net_scan_tcp_light_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/net_scan_udp_heavy_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/net_scan_udp_light_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/one_flow_tcp_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-06-16 13:47 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/one_flow_udp_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/port_scan_tcp_heavy_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/port_scan_tcp_light_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/port_scan_udp_heavy_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/port_scan_udp_light_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/small_ping_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/small_syn_times
    drwxr-xr-x   - mulinpav mulinpav          0 2019-07-03 23:13 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/small_udp_times
    
each directory contains subdirectories with the split name:

    (base) mulinpav@deep0:~/NII_internship/scripts/big-dama$ hdfs dfs -ls /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/* | head -4
    Found 1022 items
    drwxr-xr-x   - mulinpav mulinpav          0 2019-06-30 20:21 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/events/20190106000001_csv
    drwxr-xr-x   - mulinpav mulinpav          0 2019-06-30 20:26 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/events/20190106010001_csv
    drwxr-xr-x   - mulinpav mulinpav          0 2019-06-30 20:31 /user/mulinpav/NII_internship/datasets/darknet/time_interval_pattern_analysis/1hour/events/20190106020001_csv

Spark works best when it writes output in smaller files/chunks, otherwise it has to gather results to 1 node which slowers it done. Consider results in derecotry '..../events/20190106000001_csv' as '..../events/20190106000001.csv'

In [3]:
#  udfs to help out with identification of some of the attacks
def port_scan_tcp_flags_func(x):
    # SYN, FIN, FINACK, NULL
    flag_list = ['0x00000002','0x00000001','0x00000011','']
    if x in flag_list:
        return 1
    else:
        return 0

def backscatter_tcp_flags_func(x):
    #SYNACK, ACK, RST, RSTACK    
    flag_list = ['0x00000012','0x00000010','0x00000004','0x00000014']
    if x in flag_list:
        return 1
    else:
        return 0
    
def backscatter_udp_ports_func(x):
    #DNS, NTP, NetBIOS, SNMP
    port_list = ['53','123','137','161']
    if x in port_list:
        return 1
    else:
        return 0
    
#max_udf = udf(lambda x: np.isin(x, ['0','2','16','18']), StringType())
port_scan_tcp_flags_udf = udf(port_scan_tcp_flags_func,IntegerType())
backscatter_tcp_flags_udf = udf(backscatter_tcp_flags_func,IntegerType())
backscatter_udp_ports_udf = udf(backscatter_udp_ports_func,IntegerType())

# threshold values from Darknet paper
N1 = 5
N2 = 5
N3 = 15
M = 3
R = 0.5

for hdfs_file in files[:1]:
    file_name = hdfs_file.split('/')[-1]
    datadf = spark.read.option("sep", "\t").schema(darknet_pattern_schema).csv(hdfs_file)
    # 1. frame_time_ux - timestamps are in format "Feb 28, 2019 00:00:02.023742000 JST" - I will convert them to UX timestamp + add ms substring, I ignore the JST as I will be calculating time intervals
    # - unix time stamps are used to easily compute time intervals of the attacks
    datadf = datadf.withColumn('frame_time_ux', F.unix_timestamp("frame_time", "MMM dd,yyyy HH:mm:ss") + F.substring("frame_time", -13, 9).cast('float')/1000000000)\
                   .drop("frame_time")\
                   .fillna(0)
    
    # portscan
    # I added "(F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range')"
    # and port_scan_tcp_heavy_times
    port_scan_tcp_heavy = datadf.filter(F.col('ip_proto')=='6')\
                          .withColumn('port_scan_tcp_flags',port_scan_tcp_flags_udf('tcp_flags'))\
                          .groupby(['ip_src','ip_dst'])\
                          .agg(F.sum('port_scan_tcp_flags'), F.countDistinct('tcp_dstport').alias('distinct_tcp_dstport'), F.count(F.lit(1)).alias('#pkts_ipdst'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                          .withColumn('port_scan_tcp_flags_frac', F.col('#pkts_ipdst')/F.col('sum(port_scan_tcp_flags)'))\
                          .withColumn('avg_#pkts_tcpdst', F.col('#pkts_ipdst')/F.col('distinct_tcp_dstport'))\
                          .filter((F.col('distinct_tcp_dstport')>=N2) & (F.col('port_scan_tcp_flags_frac')>=R) & (F.col('avg_#pkts_tcpdst')>M))

    port_scan_tcp_heavy_times = port_scan_tcp_heavy.select(['event_time_range'])\
                                                   .withColumn('file_name', F.lit(file_name))
    #port_scan_tcp_heavy_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'port_scan_tcp_heavy_times/'+ file_name +'_csv', mode='append', sep=',')

    port_scan_tcp_heavy = port_scan_tcp_heavy.withColumn('port_scan_tcp', lit(1))\
                                             .groupby(['ip_src']).agg(F.sum('port_scan_tcp').alias('port_scan_tcp_heavy'))
    

    port_scan_tcp_light = datadf.filter(F.col('ip_proto')=='6')\
                          .withColumn('port_scan_tcp_flags',port_scan_tcp_flags_udf('tcp_flags'))\
                          .groupby(['ip_src','ip_dst'])\
                          .agg(F.sum('port_scan_tcp_flags'), F.countDistinct('tcp_dstport').alias('distinct_tcp_dstport'), F.count(F.lit(1)).alias('#pkts_ipdst'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                          .withColumn('port_scan_tcp_flags_frac', F.col('#pkts_ipdst')/F.col('sum(port_scan_tcp_flags)'))\
                          .withColumn('avg_#pkts_tcpdst', F.col('#pkts_ipdst')/F.col('distinct_tcp_dstport'))\
                          .filter((F.col('distinct_tcp_dstport')>=N2) & (F.col('port_scan_tcp_flags_frac')>=R) & (F.col('avg_#pkts_tcpdst')<=M))

    port_scan_tcp_light_times = port_scan_tcp_light.select(['event_time_range'])\
                                                   .withColumn('file_name', F.lit(file_name))
    #port_scan_tcp_light_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'port_scan_tcp_light_times/'+ file_name +'_csv', mode='append', sep=',')

    port_scan_tcp_light = port_scan_tcp_light.withColumn('port_scan_tcp', lit(1))\
                                             .groupby(['ip_src']).agg(F.sum('port_scan_tcp').alias('port_scan_tcp_light'))


    port_scan_udp_heavy = datadf.filter(F.col('ip_proto')=='17')\
                          .groupby(['ip_src','ip_dst'])\
                          .agg(F.countDistinct('udp_dstport').alias('distinct_udp_dstport'), F.count(F.lit(1)).alias('#pkts_ipdst'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                          .withColumn('avg_#pkts_udpdst', F.col('#pkts_ipdst')/F.col('distinct_udp_dstport'))\
                          .filter((F.col('distinct_udp_dstport')>=N2) & (F.col('avg_#pkts_udpdst')>M))

    port_scan_udp_heavy_times = port_scan_udp_heavy.select(['event_time_range'])\
                                                   .withColumn('file_name', F.lit(file_name))
    #port_scan_udp_heavy_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'port_scan_udp_heavy_times/'+ file_name +'_csv', mode='append', sep=',')

    port_scan_udp_heavy = port_scan_udp_heavy.withColumn('port_scan_udp', lit(1))\
                                             .groupby(['ip_src']).agg(F.sum('port_scan_udp').alias('port_scan_udp_heavy'))


    port_scan_udp_light = datadf.filter(F.col('ip_proto')=='17')\
                          .groupby(['ip_src','ip_dst'])\
                          .agg(F.countDistinct('udp_dstport').alias('distinct_udp_dstport'), F.count(F.lit(1)).alias('#pkts_ipdst'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                          .withColumn('avg_#pkts_udpdst', F.col('#pkts_ipdst')/F.col('distinct_udp_dstport'))\
                          .filter((F.col('distinct_udp_dstport')>=N2) & (F.col('avg_#pkts_udpdst')<=M))

    port_scan_udp_light_times = port_scan_udp_light.select(['event_time_range'])\
                                                   .withColumn('file_name', F.lit(file_name))
    #port_scan_udp_light_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'port_scan_udp_light_times/'+ file_name +'_csv', mode='append', sep=',')

    port_scan_udp_light = port_scan_udp_light.withColumn('port_scan_udp', lit(1))\
                                             .groupby(['ip_src']).agg(F.sum('port_scan_udp').alias('port_scan_udp_light'))


    # netscan
    net_scan_tcp_heavy = datadf.filter(F.col('ip_proto')=='6')\
                         .withColumn('port_scan_tcp_flags',port_scan_tcp_flags_udf('tcp_flags'))\
                         .groupby(['ip_src','tcp_dstport'])\
                         .agg(F.sum('port_scan_tcp_flags'), F.countDistinct('ip_dst').alias('distinct_ip_dst'), F.count(F.lit(1)).alias('#pkts_tcpdst'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                         .withColumn('port_scan_tcp_flags_frac', F.col('#pkts_tcpdst')/F.col('sum(port_scan_tcp_flags)'))\
                         .withColumn('avg_#pkts_ipdst', F.col('#pkts_tcpdst')/F.col('distinct_ip_dst'))\
                         .filter((F.col('distinct_ip_dst')>=N1) & (F.col('port_scan_tcp_flags_frac')>=R) & (F.col('avg_#pkts_ipdst')>M))

    net_scan_tcp_heavy_times = net_scan_tcp_heavy.select(['event_time_range'])\
                                                 .withColumn('file_name', F.lit(file_name))
    #net_scan_tcp_heavy_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'net_scan_tcp_heavy_times/'+ file_name +'_csv', mode='append', sep=',')

    net_scan_tcp_heavy = net_scan_tcp_heavy.withColumn('net_scan_tcp', lit(1))\
                                           .groupby(['ip_src']).agg(F.sum('net_scan_tcp').alias('net_scan_tcp_heavy'))

    net_scan_tcp_light = datadf.filter(F.col('ip_proto')=='6')\
                         .withColumn('port_scan_tcp_flags',port_scan_tcp_flags_udf('tcp_flags'))\
                         .groupby(['ip_src','tcp_dstport'])\
                         .agg(F.sum('port_scan_tcp_flags'), F.countDistinct('ip_dst').alias('distinct_ip_dst'), F.count(F.lit(1)).alias('#pkts_tcpdst'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                         .withColumn('port_scan_tcp_flags_frac', F.col('#pkts_tcpdst')/F.col('sum(port_scan_tcp_flags)'))\
                         .withColumn('avg_#pkts_ipdst', F.col('#pkts_tcpdst')/F.col('distinct_ip_dst'))\
                         .filter((F.col('distinct_ip_dst')>=N1) & (F.col('port_scan_tcp_flags_frac')>=R) & (F.col('avg_#pkts_ipdst')<=M))\

    net_scan_tcp_light_times = net_scan_tcp_light.select(['event_time_range'])\
                                                 .withColumn('file_name', F.lit(file_name))
    #net_scan_tcp_light_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'net_scan_tcp_light_times/'+ file_name +'_csv', mode='append', sep=',')

    net_scan_tcp_light = net_scan_tcp_light.withColumn('net_scan_tcp', lit(1))\
                                           .groupby(['ip_src']).agg(F.sum('net_scan_tcp').alias('net_scan_tcp_light'))

    net_scan_udp_heavy = datadf.filter(F.col('ip_proto')=='17')\
                         .groupby(['ip_src','udp_dstport'])\
                         .agg( F.countDistinct('ip_dst').alias('distinct_ip_dst'), F.count(F.lit(1)).alias('#pkts_udpdst'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                         .withColumn('avg_#pkts_ipdst', F.col('#pkts_udpdst')/F.col('distinct_ip_dst'))\
                         .filter((F.col('distinct_ip_dst')>=N1) & (F.col('avg_#pkts_ipdst')>M))

    net_scan_udp_heavy_times = net_scan_udp_heavy.select(['event_time_range'])\
                                                 .withColumn('file_name', F.lit(file_name))
    #net_scan_udp_heavy_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'net_scan_udp_heavy_times/'+ file_name +'_csv', mode='append', sep=',')

    net_scan_udp_heavy = net_scan_udp_heavy.withColumn('net_scan_udp', lit(1))\
                                           .groupby(['ip_src']).agg(F.sum('net_scan_udp').alias('net_scan_udp_heavy'))

    net_scan_udp_light = datadf.filter(F.col('ip_proto')=='17')\
                         .groupby(['ip_src','udp_dstport'])\
                         .agg( F.countDistinct('ip_dst').alias('distinct_ip_dst'), F.count(F.lit(1)).alias('#pkts_udpdst'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                         .withColumn('avg_#pkts_ipdst', F.col('#pkts_udpdst')/F.col('distinct_ip_dst'))\
                         .filter((F.col('distinct_ip_dst')>=N1) & (F.col('avg_#pkts_ipdst')<=M))

    net_scan_udp_light_times = net_scan_udp_light.select(['event_time_range'])\
                                                 .withColumn('file_name', F.lit(file_name))
    #net_scan_udp_light_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'net_scan_udp_light_times/'+ file_name +'_csv', mode='append', sep=',')

    net_scan_udp_light = net_scan_udp_light.withColumn('net_scan_udp', lit(1))\
                                           .groupby(['ip_src']).agg(F.sum('net_scan_udp').alias('net_scan_udp_light'))


    net_scan_icmp_heavy = datadf.filter((F.col('ip_proto')=='1') & (F.col('icmp_type')=='8') & (F.col('icmp_code')=='0'))\
                         .groupby(['ip_src','ip_proto'])\
                         .agg( F.countDistinct('ip_dst').alias('distinct_ip_dst'), F.count(F.lit(1)).alias('#pkts_icmpdst'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                         .withColumn('avg_#pkts_icmpdst', F.col('#pkts_icmpdst')/F.col('distinct_ip_dst'))\
                         .filter((F.col('distinct_ip_dst')>=N1) & (F.col('avg_#pkts_icmpdst')>M))

    net_scan_icmp_heavy_times = net_scan_icmp_heavy.select(['event_time_range'])\
                                                   .withColumn('file_name', F.lit(file_name))
    #net_scan_icmp_heavy_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'net_scan_icmp_heavy_times/'+ file_name +'_csv', mode='append', sep=',')

    net_scan_icmp_heavy = net_scan_icmp_heavy.withColumn('net_scan_icmp_heavy', lit(1))\
                                           .select(['ip_src','net_scan_icmp_heavy'])

    net_scan_icmp_light = datadf.filter((F.col('ip_proto')=='1') & (F.col('icmp_type')=='8') & (F.col('icmp_code')=='0'))\
                         .groupby(['ip_src','ip_proto'])\
                         .agg( F.countDistinct('ip_dst').alias('distinct_ip_dst'), F.count(F.lit(1)).alias('#pkts_icmpdst'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                         .withColumn('avg_#pkts_icmpdst', F.col('#pkts_icmpdst')/F.col('distinct_ip_dst'))\
                         .filter((F.col('distinct_ip_dst')>=N1) & (F.col('avg_#pkts_icmpdst')<=M))

    net_scan_icmp_light_times = net_scan_icmp_light.select(['event_time_range'])\
                                                   .withColumn('file_name', F.lit(file_name))    
    #net_scan_icmp_light_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'net_scan_icmp_light_times/'+ file_name +'_csv', mode='append', sep=',')

    net_scan_icmp_light = net_scan_icmp_light.withColumn('net_scan_icmp_light', lit(1))\
                                           .select(['ip_src','net_scan_icmp_light'])

    # oneflow
    one_flow_tcp = datadf.filter(F.col('ip_proto')=='6')\
                         .groupby(['ip_src','ip_dst','tcp_dstport'])\
                         .agg(F.count(F.lit(1)).alias('#pkts'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                         .filter(F.col('#pkts')>N3)\
                         .withColumn('one_flow_tcp', lit(1))
    one_flow_tcp_times = one_flow_tcp.select(['event_time_range'])\
                                     .withColumn('file_name', F.lit(file_name))

    #one_flow_tcp_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'one_flow_tcp_times/'+ file_name +'_csv', mode='append', sep=',')
    one_flow_tcp = one_flow_tcp.select(['ip_src','one_flow_tcp'])\
                               .groupby(['ip_src']).agg(F.sum('one_flow_tcp').alias('one_flow_tcp'))

    one_flow_udp = datadf.filter(F.col('ip_proto')=='17')\
                         .groupby(['ip_src','ip_dst','udp_dstport'])\
                         .agg(F.count(F.lit(1)).alias('#pkts'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                         .filter(F.col('#pkts')>N3)\
                         .withColumn('one_flow_udp', lit(1))
    one_flow_udp_times = one_flow_udp.select(['event_time_range'])\
                                     .withColumn('file_name', F.lit(file_name))
    #one_flow_udp_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'one_flow_udp_times'+ file_name +'_csv', mode='append', sep=',')
    one_flow_udp = one_flow_udp.select(['ip_src','one_flow_udp'])\
                               .groupby(['ip_src']).agg(F.sum('one_flow_udp').alias('one_flow_udp'))

    # backscatter
    # I did not include time range checking here as its at least 1 packet
    # check if tcp_flags in hexadecimal corresponds to separately extracted flags
    backscatter_tcp = datadf.filter(F.col('ip_proto')=='6')\
                            .withColumn('backscatter_tcp_flags',backscatter_tcp_flags_udf('tcp_flags'))\
                            .filter(F.col('backscatter_tcp_flags')==1)\
                            .groupby(['ip_src'])\
                            .agg(F.count(F.lit(1)).alias('#pkts'))\
                            .filter(F.col('#pkts')>1)\
                            .withColumn('backscatter_tcp', lit(1))\
                            .select(['ip_src','backscatter_tcp'])

    backscatter_udp = datadf.filter(F.col('ip_proto')=='17')\
                            .withColumn('backscatter_udp_ports',backscatter_udp_ports_udf('udp_srcport'))\
                            .filter(F.col('backscatter_udp_ports')==1)\
                            .groupby(['ip_src'])\
                            .agg(F.count(F.lit(1)).alias('#pkts'))\
                            .filter(F.col('#pkts')>1)\
                            .withColumn('backscatter_udp', lit(1))\
                            .select(['ip_src','backscatter_udp'])                        

    backscatter_icmp = datadf.filter((F.col('ip_proto')=='1') & (((F.col('icmp_type')=='0') & (F.col('icmp_code')=='0')) | (F.col('icmp_type')=='3') | ((F.col('icmp_type')=='11') & (F.col('icmp_code')=='0'))))\
                            .groupby(['ip_src'])\
                            .agg(F.count(F.lit(1)).alias('#pkts'))\
                            .filter(F.col('#pkts')>1)\
                            .withColumn('backscatter_icmp', lit(1))\
                            .select(['ip_src','backscatter_icmp'])                                                

    # fragmentation ip.flags == 0x01 or ip.frag_offset > 0
    # https:/osqa-ask.wireshark.org/questions/41152/how-to-check-if-fragmentation-is-happening
    # cast(ip_flags as string) as ip_flags,\
    # cast(ip_frag_offset as integer) as ip_frag_offset,\
    # I did not include time range checking here as its at least 1 packet
    ip_fragment = datadf.filter((F.col('ip_flags_mf')=='1') | (F.col('ip_frag_offset')>0))\
                        .groupby(['ip_src'])\
                        .agg(F.count(F.lit(1)).alias('#pkts'))\
                        .filter(F.col('#pkts')>0)\
                        .withColumn('ip_fragment', lit(1))\
                        .select(['ip_src','ip_fragment'])

    # smallSYN, smallUDP, smallPING
    small_syn = datadf.filter(F.col('tcp_flags')=='0x00000002')\
                      .groupby(['ip_src'])\
                      .agg(F.countDistinct('ip_dst').alias('distinct_ip_dst'), F.countDistinct('tcp_dstport').alias('distinct_tcp_dstports'), F.count(F.lit(1)).alias('#pkts'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                      .filter((F.col('distinct_ip_dst')<N1) & (F.col('distinct_tcp_dstports')<N2) & (F.col('#pkts')<=N3))\
                      .withColumn('small_syn', lit(1))

    small_syn_times = small_syn.select(['event_time_range'])\
                               .withColumn('file_name', F.lit(file_name))
    #small_syn_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'small_syn_times/'+ file_name +'_csv', mode='append', sep=',')
    small_syn = small_syn.select(['ip_src','small_syn'])

    small_udp = datadf.filter(F.col('ip_proto')=='17')\
                      .groupby(['ip_src'])\
                      .agg(F.countDistinct('ip_dst').alias('distinct_ip_dst'), F.countDistinct('udp_dstport').alias('distinct_udp_dstports'), F.count(F.lit(1)).alias('#pkts'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                      .filter((F.col('distinct_ip_dst')<N1) & (F.col('distinct_udp_dstports')<N2) & (F.col('#pkts')<=N3))\
                      .withColumn('small_udp', lit(1))
    small_udp_times = small_udp.select(['event_time_range'])\
                               .withColumn('file_name', F.lit(file_name))
    #small_udp_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'small_udp_times/'+ file_name +'_csv', mode='append', sep=',')
    small_udp = small_udp.select(['ip_src','small_udp'])

    small_ping = datadf.filter((F.col('ip_proto')=='1') & (F.col('icmp_type')=='8') & (F.col('icmp_code')=='0'))\
                       .groupby(['ip_src'])\
                       .agg(F.countDistinct('ip_dst').alias('distinct_ip_dst'), F.count(F.lit(1)).alias('#pkts'), (F.max('frame_time_ux')-F.min('frame_time_ux')).alias('event_time_range'))\
                       .filter((F.col('distinct_ip_dst')<N1) & (F.col('#pkts')<=N3))\
                       .withColumn('small_ping', lit(1))
    small_ping_times = small_ping.select(['event_time_range'])\
                                 .withColumn('file_name', F.lit(file_name))
    #small_ping_times.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'small_ping_times/'+ file_name +'_csv', mode='append', sep=',')
    small_ping = small_ping.select(['ip_src','small_ping']) 

    # according to paper "An Evaluation of Darknet traffic Taxonomy" following traffic may overlap with "other traffic":
    # ['backscatter_tcp', 'backscatter_udp', 'backscatter_icmp', 'ip_fragment']
    # so we summ all ip_src,traffic pattern tuples except these 4 and mark them with ['other_tcp', 'other_udp', 'other_icmp','other']
    non_other_list = ['port_scan_tcp_heavy', 'port_scan_tcp_light', 'port_scan_udp_heavy', 'port_scan_udp_light', 'net_scan_tcp_heavy', 'net_scan_tcp_light', 'net_scan_udp_heavy', 'net_scan_udp_light', 'net_scan_icmp_heavy', 'net_scan_icmp_light', 'one_flow_tcp', 'one_flow_udp', 'small_syn', 'small_udp', 'small_ping']

    result = datadf.select(['ip_src']).distinct()\
                        .join(port_scan_tcp_heavy, 'ip_src',how='left')\
                        .join(port_scan_tcp_light, 'ip_src',how='left')\
                        .join(port_scan_udp_heavy, 'ip_src',how='left')\
                        .join(port_scan_udp_light, 'ip_src',how='left')\
                        .join(net_scan_tcp_heavy, 'ip_src',how='left')\
                        .join(net_scan_tcp_light, 'ip_src',how='left')\
                        .join(net_scan_udp_heavy, 'ip_src',how='left')\
                        .join(net_scan_udp_light, 'ip_src',how='left')\
                        .join(net_scan_icmp_heavy, 'ip_src',how='left')\
                        .join(net_scan_icmp_light, 'ip_src',how='left')\
                        .join(one_flow_tcp, 'ip_src',how='left')\
                        .join(one_flow_udp, 'ip_src',how='left')\
                        .join(backscatter_tcp, 'ip_src',how='left')\
                        .join(backscatter_udp, 'ip_src',how='left')\
                        .join(backscatter_icmp, 'ip_src',how='left')\
                        .join(ip_fragment, 'ip_src',how='left')\
                        .join(small_syn, 'ip_src',how='left')\
                        .join(small_udp, 'ip_src',how='left')\
                        .join(small_ping, 'ip_src',how='left')\
                        .fillna(0)\
                        .withColumn('other_temp' ,reduce(add, [F.col(x) for x in non_other_list]))
    #other = result_temp.filter
    # takes the remaining 'other ip_src' joins them with ip_src of tcp,udp,icmp, sums their value and if the sum is still 0 then it will put 1 into 'other' column
    other_tcp = datadf.filter(F.col('ip_proto')=='6').select('ip_src').distinct().withColumn('other_tcp', lit(1))
    other_udp = datadf.filter(F.col('ip_proto')=='17').select('ip_src').distinct().withColumn('other_udp', lit(1))
    other_icmp = datadf.filter(F.col('ip_proto')=='1').select('ip_src').distinct().withColumn('other_icmp', lit(1))
    other_temp = result.filter(F.col('other_temp')==0).select(['ip_src'])\
                       .join(other_tcp,'ip_src',how='left')\
                       .join(other_udp,'ip_src',how='left')\
                       .join(other_icmp,'ip_src',how='left')\
                       .fillna(0)\
                       .withColumn('other' ,reduce(add, [F.col(x) for x in ['other_tcp', 'other_udp', 'other_icmp']]))\
                       .withColumn('other', F.when(F.col('other') > 0, 0).otherwise(1))

    result = result.join(other_temp,'ip_src',how='left').drop('other_temp').fillna(0)\
                   .withColumn('file_name', F.lit(file_name))
    #result.write.format("com.databricks.spark.csv").option('header', 'true').save(path=hdfs_results_dir+ 'events/'+ file_name +'_csv', mode='append', sep=',')

## 1.4 Example outputs
### 1.4.1 Matrix of detected anolies/patterns in 1 split

In [4]:
result.show()

+--------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+------------+------------+---------------+---------------+----------------+-----------+---------+---------+----------+---------+---------+----------+-----+--------------+
|              ip_src|port_scan_tcp_heavy|port_scan_tcp_light|port_scan_udp_heavy|port_scan_udp_light|net_scan_tcp_heavy|net_scan_tcp_light|net_scan_udp_heavy|net_scan_udp_light|net_scan_icmp_heavy|net_scan_icmp_light|one_flow_tcp|one_flow_udp|backscatter_tcp|backscatter_udp|backscatter_icmp|ip_fragment|small_syn|small_udp|small_ping|other_tcp|other_udp|other_icmp|other|     file_name|
+--------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+----

### 1.4.2 Time intervals of all pattern types - for further time analysis of the patterns (backscatter tcp/udp/icmp and IP fragments are not analysed as they are "1 packet patterns")

In [5]:
port_scan_tcp_heavy_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
|1330.7212989330292|20190106000001|
+------------------+--------------+



In [6]:
port_scan_tcp_light_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
| 2281.438353061676|20190106000001|
|3254.2540678977966|20190106000001|
| 3431.640604019165|20190106000001|
|2712.7043359279633|20190106000001|
| 3168.871365070343|20190106000001|
|1906.3141779899597|20190106000001|
|2741.9848840236664|20190106000001|
|1785.6969730854034|20190106000001|
|2625.4189579486847|20190106000001|
| 2639.019593000412|20190106000001|
| 1950.283979177475|20190106000001|
|1536.5958559513092|20190106000001|
|2808.8331730365753|20190106000001|
| 3216.664701938629|20190106000001|
|1639.5598258972168|20190106000001|
|3391.4076359272003|20190106000001|
|3406.5028550624847|20190106000001|
|2859.0861370563507|20190106000001|
|1537.1859271526337|20190106000001|
|2920.9726588726044|20190106000001|
+------------------+--------------+
only showing top 20 rows



In [7]:
port_scan_udp_heavy_times.show()

+----------------+---------+
|event_time_range|file_name|
+----------------+---------+
+----------------+---------+



In [8]:
port_scan_udp_light_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
| 2889.072543859482|20190106000001|
| 3136.098750114441|20190106000001|
|2188.1759221553802|20190106000001|
|1874.8825759887695|20190106000001|
+------------------+--------------+



In [9]:
net_scan_tcp_heavy_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
|  71.2768280506134|20190106000001|
| 3589.927623987198|20190106000001|
| 3592.637727022171|20190106000001|
|2538.5068020820618|20190106000001|
|3512.1067910194397|20190106000001|
| 3589.951479911804|20190106000001|
| 3068.463495016098|20190106000001|
|3585.8166489601135|20190106000001|
|3589.6720900535583|20190106000001|
| 2983.540219068527|20190106000001|
| 3583.706946849823|20190106000001|
| 3361.326663017273|20190106000001|
|2728.0358130931854|20190106000001|
|3094.8821699619293|20190106000001|
|1987.9059798717499|20190106000001|
|187.95389890670776|20190106000001|
| 37.19043493270874|20190106000001|
|3547.5861341953278|20190106000001|
|3221.2486419677734|20190106000001|
|2624.8135719299316|20190106000001|
+------------------+--------------+
only showing top 20 rows



In [10]:
net_scan_tcp_light_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
| 3500.263459920883|20190106000001|
| 3596.805120944977|20190106000001|
|2112.5043399333954|20190106000001|
| 3596.809244155884|20190106000001|
| 469.7812268733978|20190106000001|
|2885.4059369564056|20190106000001|
| 3536.493569135666|20190106000001|
|3519.8709490299225|20190106000001|
|3578.2405819892883|20190106000001|
|229.62878108024597|20190106000001|
|3020.6466159820557|20190106000001|
| 410.7451660633087|20190106000001|
|  3384.04843378067|20190106000001|
|   721.03187084198|20190106000001|
| 3498.526953935623|20190106000001|
| 3325.023730993271|20190106000001|
| 3474.014904022217|20190106000001|
|2988.2877311706543|20190106000001|
|3552.5137569904327|20190106000001|
| 3262.887512922287|20190106000001|
+------------------+--------------+
only showing top 20 rows



In [11]:
net_scan_udp_heavy_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
| 66.67015600204468|20190106000001|
|3526.7240540981293|20190106000001|
|3044.3941020965576|20190106000001|
|3173.6300139427185|20190106000001|
+------------------+--------------+



In [12]:
net_scan_udp_light_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
| 2993.124549150467|20190106000001|
| 3392.343747854233|20190106000001|
|2606.8581869602203|20190106000001|
|2274.2927129268646|20190106000001|
| 2554.562464952469|20190106000001|
| 3294.096009016037|20190106000001|
|3172.6657869815826|20190106000001|
|  2244.51220202446|20190106000001|
|2908.9770181179047|20190106000001|
| 2688.988273859024|20190106000001|
| 1251.173772096634|20190106000001|
| 2704.412784099579|20190106000001|
|1792.4399709701538|20190106000001|
| 2874.548471927643|20190106000001|
|  757.408280134201|20190106000001|
| 2217.127156972885|20190106000001|
|3556.0651988983154|20190106000001|
|418.13468074798584|20190106000001|
| 2476.482820034027|20190106000001|
|1965.3743669986725|20190106000001|
+------------------+--------------+
only showing top 20 rows



In [13]:
net_scan_icmp_heavy_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
| 3167.946217060089|20190106000001|
|179.09645199775696|20190106000001|
+------------------+--------------+



In [14]:
net_scan_icmp_light_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
|2815.7282400131226|20190106000001|
|  2110.17214679718|20190106000001|
| 3276.595118999481|20190106000001|
| 960.4653151035309|20190106000001|
| 2779.487086057663|20190106000001|
| 3308.157867193222|20190106000001|
|3011.1383361816406|20190106000001|
|  3429.30957698822|20190106000001|
|2409.0330572128296|20190106000001|
| 2463.563570022583|20190106000001|
| 2292.224575996399|20190106000001|
| 3106.133035182953|20190106000001|
|3160.9027009010315|20190106000001|
|1712.1145000457764|20190106000001|
| 2653.809695005417|20190106000001|
| 2896.881795167923|20190106000001|
|3091.2760059833527|20190106000001|
|11.611090183258057|20190106000001|
| 814.7871007919312|20190106000001|
|2375.9059801101685|20190106000001|
+------------------+--------------+
only showing top 20 rows



In [15]:
one_flow_tcp_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
| 3516.288110971451|20190106000001|
| 3511.203238964081|20190106000001|
| 3457.195995092392|20190106000001|
| 3039.945447921753|20190106000001|
| 3599.205692052841|20190106000001|
|3457.2062969207764|20190106000001|
|3214.2094078063965|20190106000001|
| 3457.195519924164|20190106000001|
|3376.2172000408173|20190106000001|
| 3457.225975036621|20190106000001|
| 3403.204456090927|20190106000001|
|3457.1991159915924|20190106000001|
|3528.0031819343567|20190106000001|
|3511.2062838077545|20190106000001|
|3497.1824049949646|20190106000001|
|3573.3697719573975|20190106000001|
|  3061.19438290596|20190106000001|
| 3457.198278903961|20190106000001|
| 3189.848515033722|20190106000001|
|3457.2681579589844|20190106000001|
+------------------+--------------+
only showing top 20 rows



In [16]:
one_flow_udp_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
|1065.6567652225494|20190106000001|
| 1395.960561990738|20190106000001|
|3505.6968047618866|20190106000001|
|481.37702679634094|20190106000001|
| 3511.761064052582|20190106000001|
|112.91864204406738|20190106000001|
| 188.5094771385193|20190106000001|
|   625.98566198349|20190106000001|
| 985.4595642089844|20190106000001|
|2511.5937390327454|20190106000001|
|1347.6298308372498|20190106000001|
|1157.3182830810547|20190106000001|
|2474.8983612060547|20190106000001|
| 2866.831038951874|20190106000001|
|3388.0810618400574|20190106000001|
|477.09853196144104|20190106000001|
|1840.6707060337067|20190106000001|
|411.54975986480713|20190106000001|
|3224.9102380275726|20190106000001|
|1446.8420329093933|20190106000001|
+------------------+--------------+
only showing top 20 rows



In [17]:
small_syn_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
|               0.0|20190106000001|
| 2808.662714958191|20190106000001|
|1417.2683730125427|20190106000001|
|1049.9989929199219|20190106000001|
|               0.0|20190106000001|
| 992.8346269130707|20190106000001|
|               0.0|20190106000001|
|               0.0|20190106000001|
|               0.0|20190106000001|
|               0.0|20190106000001|
|386.59739804267883|20190106000001|
| 990.1242680549622|20190106000001|
|               0.0|20190106000001|
|               0.0|20190106000001|
|               0.0|20190106000001|
| 959.0696110725403|20190106000001|
|               0.0|20190106000001|
|               0.0|20190106000001|
|               0.0|20190106000001|
|               0.0|20190106000001|
+------------------+--------------+
only showing top 20 rows



In [18]:
small_udp_times.show()

+--------------------+--------------+
|    event_time_range|     file_name|
+--------------------+--------------+
|  1323.5709879398346|20190106000001|
|  0.5906028747558594|20190106000001|
|0.002569913864135742|20190106000001|
|                 0.0|20190106000001|
|                 0.0|20190106000001|
|                 0.0|20190106000001|
|   47.38193893432617|20190106000001|
|   9.932496070861816|20190106000001|
|  18.002815008163452|20190106000001|
|0.001015186309814...|20190106000001|
|                 0.0|20190106000001|
|                 0.0|20190106000001|
|                 0.0|20190106000001|
|                 0.0|20190106000001|
|                 0.0|20190106000001|
| 0.34627509117126465|20190106000001|
|                 0.0|20190106000001|
|   74.38310813903809|20190106000001|
|  3.4490578174591064|20190106000001|
|                 0.0|20190106000001|
+--------------------+--------------+
only showing top 20 rows



In [19]:
small_ping_times.show()

+------------------+--------------+
|  event_time_range|     file_name|
+------------------+--------------+
|1057.4914350509644|20190106000001|
| 600.0400061607361|20190106000001|
| 1224.664668083191|20190106000001|
|               0.0|20190106000001|
| 182.6839461326599|20190106000001|
| 10.50426697731018|20190106000001|
|               0.0|20190106000001|
|               0.0|20190106000001|
|               0.0|20190106000001|
| 1618.070030927658|20190106000001|
|               0.0|20190106000001|
| 1850.462163925171|20190106000001|
|               0.0|20190106000001|
| 2464.486990213394|20190106000001|
| 4.996448040008545|20190106000001|
|               0.0|20190106000001|
|               0.0|20190106000001|
| 1107.020674943924|20190106000001|
|               0.0|20190106000001|
|1991.4086718559265|20190106000001|
+------------------+--------------+
only showing top 20 rows

