# 1. Preprocessing

MAWI traces are packet traces of MAWI traffic, captured as part of research at the Fukuda Lab, NII Tokyo. First, we have to decompress the files and divide them into 1 s time windows.
The window size was chosen as the smallest time unit separating the anomalies identified by the MAWILab project.
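
To make the windowing idea concrete, here is a minimal Python sketch that groups packets into 1 s windows. It is not how editcap is implemented: it works on an in-memory list of hypothetical `(timestamp, frame_len)` tuples, assumes the packets are sorted by time, and assumes windows are counted from the first packet's timestamp.

```python
from collections import defaultdict


def split_into_windows(packets, window_s=1.0):
    """Group (timestamp, frame_len) tuples into consecutive time windows.

    Assumption: packets are sorted by time and window 0 starts at the
    timestamp of the first packet.
    """
    if not packets:
        return {}
    start = packets[0][0]
    windows = defaultdict(list)
    for ts, frame_len in packets:
        index = int((ts - start) // window_s)  # window number of this packet
        windows[index].append((ts, frame_len))
    return dict(windows)


# Hypothetical packets: (seconds, frame length in bytes)
packets = [(100.0, 60), (100.4, 1500), (101.2, 40), (103.9, 576)]
windows = split_into_windows(packets)
for index in sorted(windows):
    print(index, windows[index])
```

Windows without traffic simply do not appear in the result, which mirrors the fact that no file exists for a second in which no packet was captured.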

## 1.1 Dataset splitting into time windows
Make sure that editcap is installed on the local machine; if it is not, install it:

    which editcap
    sudo apt-get install wireshark-common

In [2]:
%%bash
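# SPLIT
root_dir="/home/.../052016/"
mkdir -p "$root_dir/splitted_1s"
for filename in "$root_dir"/20*
do
        # get filename from "fullpath filename" (used only as a progress message):
        #https://stackoverflow.com/questions/22727107/how-to-find-the-last-field-using-cut/22727211
        file=$(echo "$filename" | rev | cut -d'/' -f 1 | rev)
        # -i X ; X - number of seconds
        # the output prefix is left empty, so editcap names each window
        # _<index>_<timestamp>, which is the pattern the extraction step globs for
        editcap -i 1 "$filename" "$root_dir/splitted_1s/"
        echo "$file"
        #rm $filename
done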





## 1.2 Field extraction from the split traces
On the same local node, we run the tshark network protocol analyzer to decode the trace content and extract the following basic features:

* frame.time 
* frame.len 
* ip.proto
* ip.len
* ip.ttl
* ip.version
* ip.flags
* ip.flags.mf
* ip.frag_offset
* ip.src 
* ip.dst
* icmp.type
* icmp.code
* tcp.dstport 
* tcp.srcport 
* tcp.flags
* tcp.flags.ack
* tcp.flags.cwr
* tcp.flags.fin
* tcp.flags.ecn
* tcp.flags.ns
* tcp.flags.push
* tcp.flags.syn
* tcp.flags.urg
* tcp.flags.reset
* tcp.len
* tcp.window_size
* udp.srcport
* udp.dstport
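
The field list above maps one-to-one onto `-e` options of tshark. As a rough sketch, the command line can be assembled from the list in Python; the helper name `build_tshark_command` and the file name `window.pcap` are hypothetical, while the tshark options are the ones used in the extraction cell of this notebook.

```python
FIELDS = [
    "frame.time", "frame.len", "ip.proto", "ip.len", "ip.ttl", "ip.version",
    "ip.flags", "ip.flags.mf", "ip.frag_offset", "ip.src", "ip.dst",
    "icmp.type", "icmp.code", "tcp.dstport", "tcp.srcport", "tcp.flags",
    "tcp.flags.ack", "tcp.flags.cwr", "tcp.flags.fin", "tcp.flags.ecn",
    "tcp.flags.ns", "tcp.flags.push", "tcp.flags.syn", "tcp.flags.urg",
    "tcp.flags.reset", "tcp.len", "tcp.window_size", "udp.srcport",
    "udp.dstport",
]


def build_tshark_command(pcap_path, fields=FIELDS):
    """Return the tshark argument list that prints one line per packet."""
    cmd = ["tshark", "-T", "fields", "-n", "-r", pcap_path, "-E", "aggregator=,"]
    for field in fields:
        cmd += ["-e", field]  # one -e option per extracted field
    return cmd


cmd = build_tshark_command("window.pcap")  # hypothetical file name
print(" ".join(cmd))
```

The resulting list could be passed to `subprocess.run` with `stdout` redirected to a file. Keeping the fields in one list also makes it easier to keep their order in sync with the schema used when the files are read back.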


Note:

Make sure tshark is installed on the system:
   
    which tshark
    sudo apt-get install tshark

Note:
It is a good idea to run the script with "nohup SCRIPT_NAME &", as it might take some time.

In [2]:
%%bash
root_dir="/home/.../052016"
mkdir -p "$root_dir/splitted_1s/extracted_1s/"
for filename in "$root_dir"/splitted_1s/_*
do
#        SUBSTRING=$(echo $filename | rev | cut -d'/' -f 1 | rev )
        # keep only the timestamp part of the name _<index>_<timestamp>
        SUBSTRING=$(echo "$filename" | rev | cut -d'/' -f 1 | rev | cut -d'_' -f 3)
        echo "$SUBSTRING"
        tshark -T fields -n -r "$filename" -E aggregator=, \
            -e frame.time -e frame.len -e ip.proto -e ip.len -e ip.ttl -e ip.version \
            -e ip.flags -e ip.flags.mf -e ip.frag_offset -e ip.src -e ip.dst \
            -e icmp.type -e icmp.code -e tcp.dstport -e tcp.srcport -e tcp.flags \
            -e tcp.flags.ack -e tcp.flags.cwr -e tcp.flags.fin -e tcp.flags.ecn -e tcp.flags.ns \
            -e tcp.flags.push -e tcp.flags.syn -e tcp.flags.urg -e tcp.flags.reset \
            -e tcp.len -e tcp.window_size -e udp.srcport -e udp.dstport \
            > "$root_dir/splitted_1s/extracted_1s/$SUBSTRING"
done

The splits, which contain these features in human-readable format, are then uploaded to HDFS for further feature extraction in the PySpark distributed computing environment. Each file is named after the timestamp that editcap assigns to its window, in the format YYYYMMDDHHmmss.
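
Because the file name carries the window timestamp, the start of each window can be recovered from the path alone. A minimal sketch, assuming the name begins with the 14-digit YYYYMMDDHHmmss timestamp; the helper `window_start` and the example path are hypothetical:

```python
from datetime import datetime


def window_start(path):
    """Parse the YYYYMMDDHHmmss timestamp at the start of the file name."""
    name = path.rsplit("/", 1)[-1]  # drop the directory part
    return datetime.strptime(name[:14], "%Y%m%d%H%M%S")


# Hypothetical path of one extracted window
start = window_start("/user/example/extracted_1s/20160501140003")
print(start.isoformat())
```

Sorting the files by this value gives the chronological order of the windows, which is useful when the per-window features are later assembled into a time series.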

## 1.3 Copy the extracted fields to HDFS for distributed processing

In [None]:
%%bash
# COPY TO HDFS

# every %%bash cell runs in its own shell, so root_dir has to be set again
root_dir="/home/.../052016"
hdfs_dir="/user/.../052016/extracted_1s/"
hdfs dfs -mkdir -p "$hdfs_dir"
for filename in "$root_dir"/splitted_1s/extracted_1s/*
do
        echo "$filename"
        hdfs dfs -copyFromLocal "$filename" "$hdfs_dir"
        #rm $filename

done

# 2. Feature Extraction

## Table of features we are going to extract from splits:

| Field           | Feature                     | Description                 |
|-----------------|-----------------------------|-----------------------------|
| Tot. volume     | # pkts                      | num. packets                |
|    &nbsp;       | # bytes                     | num. bytes                  |
| PKT size        | pkt_h                       | H(PKT)                      |
|    &nbsp;       | pkt_{min,avg,max,std}       | min/avg/max/std PKT         |
|    &nbsp;       | pkt_p{1,2,5,...,95,97,99}   | percentiles                 |
| IP PKT size     | iplen_h                     | H(IPlen)                    |
|    &nbsp;       | iplen_{min,avg,max,std}     | min/avg/max/std IPlen       |
|    &nbsp;       | iplen_p{1,2,5,...,95,97,99} | percentiles                 |
| IP TTL          | ttl_h                       | H(TTL)                      |
|    &nbsp;       | ttl_{min,avg,max,std}       | min/avg/max/std TTL         |
|    &nbsp;       | ttl_p{1,2,5,...,95,97,99}   | percentiles                 |
| IP Proto        | % icmp/tcp/udp/gre          | share of IP protocols       |
| IPv4/IPv6       | % IPv4/IPv6                 | share of IPv4/IPv6 pkts.    |
|    &nbsp;       | h_IP_src/dst_octet          | H(IP_octets)                |
| TCP PKT size    | tcp_h                       | H(TCP)                      |
|    &nbsp;       | tcp_{min,avg,max,std}       | min/avg/max/std TCP         |
|    &nbsp;       | tcp_p{1,2,5,...,95,97,99}   | percentiles                 |
| TCP WIN size    | win_h                       | H(TCPW)                     |
|    &nbsp;       | win_{min,avg,max,std}       | min/avg/max/std TCPW        |
|    &nbsp;       | win_p{1,2,5,...,95,97,99}   | percentiles                 |
| TCP flags (byte)| % SYN/ACK/PSH/...           | share of TCP flags          |
| TCP/UDP ports   | # well-known                | share of well-known         |
| (src/dst)       | # registered                | share of registered         |
|    &nbsp;       | # dynamic                   | share of dynamic            |
|    &nbsp;       | # remote-access             | share of remote-access      |
|    &nbsp;       | # mail                      | share of mail               |
|    &nbsp;       | # networking                | share of networking         |
|    &nbsp;       | # document-retrieval        | share of document retrieval |
|  IP MF flag +   | # of frag. pkts             | share of frag. packets      |
| IP frag offset  |       &nbsp;                |          &nbsp;             |
|   ICMP type     | # of icmp type 0            | share of echo reply packets |
|    &nbsp;       | # of icmp type 3            | share of dest. unr. pkts.   |
|    &nbsp;       | # of icmp type 5            | share of redirect mes. pkts.|
|    &nbsp;       | # of icmp type 8            | share of echo request pkts  |
|    &nbsp;       | h_icmp_type                 | H(icmp_type)                |
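
As an illustration of the `{min,avg,max,std}` and percentile rows of the table, the following sketch summarizes one window of values in plain Python. It is not the Spark code used later in this notebook: the nearest-rank percentile definition is an assumption made for the example, the sample standard deviation is used (as Spark's `stddev` does), and the frame lengths are made up.

```python
import math
import statistics

# Percentile levels matching the p{1,2,5,...,95,97,99} columns
PERCENTILES = [0.01, 0.02, 0.05, 0.10, 0.15, 0.20, 0.25,
               0.50, 0.75, 0.90, 0.95, 0.97, 0.99]


def nearest_rank(sorted_values, p):
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(p * len(sorted_values)))
    return sorted_values[rank - 1]


def summarize(values):
    """Return min/avg/max/std and the percentile list for one window."""
    ordered = sorted(values)
    return {
        "min": ordered[0],
        "max": ordered[-1],
        "avg": statistics.mean(ordered),
        "std": statistics.stdev(ordered) if len(ordered) > 1 else 0.0,
        "percentiles": [nearest_rank(ordered, p) for p in PERCENTILES],
    }


frame_lens = [60, 60, 1500, 40, 576, 1500, 52, 60]  # hypothetical window
summary = summarize(frame_lens)
print(summary)
```

The same summary is computed per window for the frame length, IP length, TTL, TCP length and TCP window size columns.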

## Port number explanation in extracted features

remote_access
* tcp
    * 22		ssh				SSH - Secure SHell
    * 23		telnet			Telnet - Insecure remote login - RFC 854
    * 992		telnets			Telnet over SSL
    * 513		rlogin			remote login - RFC 1282

mail
* tcp
    * 25		smtp			SMTP - Simple Mail Transfer Protocol - RFC 2821 (See also RFC 1869)
    * 110		pop3			POP3 - Post Office Protocol version 3 (popular e-mail protocol) - RFC 1939
    * 143		imap			IMAP - Internet Message Access Protocol (A common e-mail protocol)
    * 993		imaps			IMAP over SSL
    * 995		pop3s			POP-3 over SSL
* udp
    * 512		biff			Biff - new mail notification

networking
* tcp
    * 43		whois			Whois - query/response system, usually used for domain name info - RFC 3912
    * 67		bootps			BOOTP/DHCP server
    * 68		bootpc			BOOTP/DHCP client
    * 137		netbios-ns		NetBIOS - Network Basic Input Output System(Name Service)
    * 138		netbios-dgm		NetBIOS - Network Basic Input Output System(Datagram Service)
    * 139		netbios-ssn		NetBIOS - Network Basic Input Output System(session service)
    * 161		snmp			SNMP - Simple Network Management Protocol - RFC 1157
    * 162		snmp-trap		SNMP - Simple Network Management Protocol(traps) - RFC 1157
    * 179		bgp				BGP - Border Gateway Protocol - RFC 1771
* udp
    * 53		dns				DNS - Domain Name System - RFC 1035
    * 67		dhcp			DHCP - Dynamic Host Configuration Protocol - RFC 1541
    * 68		bootpc			BOOTP/DHCP client
    * 137		netbios-ns		NetBIOS - Network Basic Input Output System(Name Service)
    * 138		netbios-dgm		NetBIOS - Network Basic Input Output System(Datagram Service)
    * 139		netbios-ssn		NetBIOS - Network Basic Input Output System(session service)
    * 161		snmp			SNMP - Simple Network Management Protocol - RFC 1157
    * 162		snmp-trap		SNMP - Simple Network Management Protocol(traps) - RFC 1157

document_retrieval
* tcp
    * 20		ftp-data		FTP - File Transfer Protocol(data) - RFC 959
    * 21		ftp				FTP - File Transfer Protocol(control) - RFC 959
    * 70		gopher			Gopher - A precursor to HTTP - RFC 1436
    * 80		http			HTTP - HyperText Transfer Protocol - RFC 2616
    * 443		https           HTTP over TLS/SSL
    * 445		smb				Samba/SMB - Server Message Block - Microsoft Windows filesharing
    * 540		uucp			UUCP - Unix to Unix Copy
    * 989		ftps-data		FTP over SSL(data)
    * 990		ftps			FTP over SSL(control)
* udp
    * 69		tftp			TFTP - Trivial File Transfer Protocol - used for bootstrapping - RFC 1350
    * 445		smb				Samba/SMB - Server Message Block - Microsoft Windows filesharing
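
The port lists above, together with the three port ranges (well-known 0-1023, registered 1024-49151, dynamic 49152-65535), can be expressed as a small lookup. The sketch below is a plain-Python illustration rather than the pandas UDFs used in this notebook; the names `SERVICE_PORTS`, `port_range` and `port_shares` are hypothetical, and shares are computed over packets that actually carry a port, which is an assumption of the example.

```python
from collections import Counter

SERVICE_PORTS = {
    "tcp": {
        "remote_access": {22, 23, 992, 513},
        "mail": {25, 110, 143, 993, 995},
        "networking": {43, 67, 68, 137, 138, 139, 161, 162, 179},
        "document_retrieval": {20, 21, 70, 80, 443, 445, 540, 989, 990},
    },
    "udp": {
        "mail": {512},
        "networking": {53, 67, 68, 137, 138, 139, 161, 162},
        "document_retrieval": {69, 445},
    },
}


def port_range(port):
    """Classify a port number into one of the three port ranges."""
    if port <= 1023:
        return "well_known"
    if port <= 49151:
        return "registered"
    return "dynamic"


def port_shares(ports, proto):
    """Share of each range and service category among non-missing ports."""
    observed = [p for p in ports if p is not None]
    counts = Counter()
    for port in observed:
        counts[port_range(port)] += 1
        for category, members in SERVICE_PORTS[proto].items():
            if port in members:
                counts[category] += 1
    total = len(observed)
    return {name: n / total for name, n in counts.items()} if total else {}


# Hypothetical TCP destination ports of one window (None = not a TCP packet)
shares = port_shares([80, 443, 22, 51000, 8080, None], "tcp")
print(shares)
```

A port contributes to exactly one range and to at most one service category, so the range shares sum to one while the category shares usually do not.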

## 2.1 Session initialization

In [1]:
import os
import sys

os.environ["PYSPARK_PYTHON"] = "/home/big-dama/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/big-dama/anaconda3/bin/python"

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-oracle/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

os.environ['PYSPARK_SUBMIT_ARGS'] = "pyspark-shell"


import pyspark
from pyspark import SparkConf, SparkContext
from pyspark import sql
from pyspark.sql import Row
from pyspark.sql import SQLContext, HiveContext, DataFrameWriter
from pyspark.sql.types import *
from pyspark.sql.functions import col, count, sum
from pyspark.sql import SparkSession
from datetime import datetime
import subprocess
import math
import time
from pyspark.sql import functions as F
from operator import add
from pyspark.sql.functions import pandas_udf, PandasUDFType
import numpy as np
from scipy.stats import entropy
import pandas as pd
from pyspark.sql.functions import lit

# described here https://medium.com/@achilleus/spark-session-10d0d66d1d24
print("starting")
spark = SparkSession.builder\
                    .appName("darknet_feature_extraction_without_hive")\
                    .master("yarn")\
                    .config("spark.submit.deployMode","client")\
                    .config("spark.executor.memory","20g")\
                    .config("spark.driver.memory","20g")\
                    .config('spark.sql.autoBroadcastJoinThreshold','-1')\
                    .enableHiveSupport()\
                    .getOrCreate()

print("done with startup")

starting
done with startup


In [2]:
hdfs_dir = '/user/big-dama/pavol/mawi_data_052016/extracted_mawi_052016/'
output_loc = '/user/big-dama/pavol/mawi_data_052016/extracted_mawi_052016_features/'

file_dir = subprocess.run(["hdfs", "dfs", "-ls", hdfs_dir], stdout=subprocess.PIPE)
file_dir = file_dir.stdout.decode().splitlines()

# every entry line of the listing ends with the full HDFS path;
# the "Found N items" header contains no "/" and is skipped
file_full_path_list = [line.split()[-1] for line in file_dir if "/" in line]

output_loc_ls = subprocess.run(["hdfs", "dfs", "-ls", output_loc+'/'], stdout=subprocess.PIPE)

if not output_loc_ls.stdout:
    processed_files=list()
else:
    processed_files_df = spark.read.format("csv").option("header", "false").option("delimiter", "\t").load(output_loc+"/*")
    processed_files = [str(row._c0) for row in processed_files_df.collect()]

files_to_process = list(set(file_full_path_list) - set(processed_files))

schema = StructType([StructField('timestamp',StringType(),True)\
                     , StructField('frame_len',DoubleType(),True)\
                     , StructField('ip_proto',DoubleType(),True)\
                     , StructField('ip_len',DoubleType(),True)\
                     , StructField('ip_ttl',DoubleType(),True)\
                     , StructField('ip_version',DoubleType(),True)\
                     , StructField('ip_flags',StringType(),True)\
                     , StructField('ip_flags_mf',IntegerType(),True)\
                     , StructField('ip_frag_offset',IntegerType(),True)\
                     , StructField('ip_src',StringType(),True)\
                     , StructField('ip_dst',StringType(),True)\
                     , StructField('icmp_type',DoubleType(),True)\
                     , StructField('icmp_code',StringType(),True)\
                     , StructField('tcp_dstport',IntegerType(),True)\
                     , StructField('tcp_srcport',IntegerType(),True)\
                     , StructField('tcp_flags',StringType(),True)\
                     , StructField('tcp_flags_ack',DoubleType(),True)\
                     , StructField('tcp_flags_cwr',DoubleType(),True)\
                     , StructField('tcp_flags_fin',DoubleType(),True)\
                     , StructField('tcp_flags_ecn',DoubleType(),True)\
                     , StructField('tcp_flags_ns',DoubleType(),True)\
                     , StructField('tcp_flags_push',DoubleType(),True)\
                     , StructField('tcp_flags_syn',DoubleType(),True)\
                     , StructField('tcp_flags_urg',DoubleType(),True)\
                     , StructField('tcp_flags_reset',DoubleType(),True)\
                     , StructField('tcp_len',DoubleType(),True)\
                     , StructField('tcp_winsize',DoubleType(),True)\
                     , StructField('udp_srcport',IntegerType(),True)\
                     , StructField('udp_dstport',IntegerType(),True)])
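
The entropy features `H(...)` are computed in the next cell by passing the counts of distinct values to `scipy.stats.entropy`, which normalizes the counts to probabilities and uses the natural logarithm by default. A dependency-free sketch of the same quantity is shown below; the function name `shannon_entropy` is hypothetical, and returning 0.0 for an empty column is a choice made for this example only.

```python
import math
from collections import Counter


def shannon_entropy(values):
    """Shannon entropy (in nats) of the non-missing values."""
    counts = Counter(v for v in values if v is not None)
    total = sum(counts.values())
    if total == 0:
        return 0.0  # example-only convention for an empty column
    return -sum((n / total) * math.log(n / total) for n in counts.values())


ttl_values = [64, 64, 128, 128, None]  # hypothetical TTLs of one window
h = shannon_entropy(ttl_values)
print(h)
```

A window in which every packet has the same value yields an entropy of zero, while a uniform spread over k distinct values yields log(k).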

In [5]:
@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def num_tcp_appport_cat(labels):
    well_known = np.linspace(0,1023,1024)
    registered_ports = np.linspace(1024,49151,48128)
    dynamic_ports = np.linspace(49152,65535,16384)

    remote_access_tcp = [22,23,992,513]
    mail_tcp = [25,110,143,993,995]
    networking_tcp = [43,67,68,137,138,139,161,162,179]
    document_retrieval_tcp = [20,21,70,80,443,445,540,989,990]


    # distinct port numbers observed in this group (the index of the value_counts result)
    list_of_ports_in_table = labels.value_counts(normalize=False).index.tolist()
    # port ranges
    if any(i in well_known for i in list_of_ports_in_table) :
        sum_well_known = np.sum(labels.value_counts(normalize=False).loc[list(well_known)].dropna().values)
    else:
        sum_well_known = float(0)

    if any(i in registered_ports for i in list_of_ports_in_table) :
        sum_registered_ports = np.sum(labels.value_counts(normalize=False).loc[list(registered_ports)].dropna().values)
    else:
        sum_registered_ports = float(0)

    if any(i in dynamic_ports for i in list_of_ports_in_table) :
        sum_dynamic_ports = np.sum(labels.value_counts(normalize=False).loc[list(dynamic_ports)].dropna().values)
    else:
        sum_dynamic_ports = float(0)
    # service categories
    if any(i in remote_access_tcp for i in list_of_ports_in_table) :
        sum_remote_access_tcp = np.sum(labels.value_counts(normalize=False).loc[list(remote_access_tcp)].dropna().values)
    else:
        sum_remote_access_tcp = float(0)

    if any(i in mail_tcp for i in list_of_ports_in_table) :
        sum_mail_tcp = np.sum(labels.value_counts(normalize=False).loc[list(mail_tcp)].dropna().values)
    else:
        sum_mail_tcp = float(0)

    if any(i in networking_tcp for i in list_of_ports_in_table) :
        sum_networking_tcp = np.sum(labels.value_counts(normalize=False).loc[list(networking_tcp)].dropna().values)
    else:
        sum_networking_tcp = float(0)

    if any(i in document_retrieval_tcp for i in list_of_ports_in_table) :
        sum_document_retrieval_tcp = np.sum(labels.value_counts(normalize=False).loc[list(document_retrieval_tcp)].dropna().values)
    else:
        sum_document_retrieval_tcp = float(0)

    res = list([sum_well_known, sum_registered_ports, sum_dynamic_ports, sum_remote_access_tcp, sum_mail_tcp, sum_networking_tcp, sum_document_retrieval_tcp])
    return str(res)

@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def num_udp_appport_cat(labels):
    well_known = np.linspace(0,1023,1024)
    registered_ports = np.linspace(1024,49151,48128)
    dynamic_ports = np.linspace(49152,65535,16384)

    mail_udp = [512]
    networking_udp = [53,67,68,137,138,139,161,162]
    document_retrieval_udp = [69,445]

    # distinct port numbers observed in this group (the index of the value_counts result)
    list_of_ports_in_table = labels.value_counts(normalize=False).index.tolist()
    # port ranges
    if any(i in well_known for i in list_of_ports_in_table) :
        sum_well_known = np.sum(labels.value_counts(normalize=False).loc[list(well_known)].dropna().values)
    else:
        sum_well_known = float(0)

    if any(i in registered_ports for i in list_of_ports_in_table) :
        sum_registered_ports = np.sum(labels.value_counts(normalize=False).loc[list(registered_ports)].dropna().values)
    else:
        sum_registered_ports = float(0)

    if any(i in dynamic_ports for i in list_of_ports_in_table) :
        sum_dynamic_ports = np.sum(labels.value_counts(normalize=False).loc[list(dynamic_ports)].dropna().values)
    else:
        sum_dynamic_ports = float(0)
    # service categories

    if any(i in mail_udp for i in list_of_ports_in_table) :
        sum_mail_udp = np.sum(labels.value_counts(normalize=False).loc[list(mail_udp)].dropna().values)
    else:
        sum_mail_udp = float(0)

    if any(i in networking_udp for i in list_of_ports_in_table) :
        sum_networking_udp = np.sum(labels.value_counts(normalize=False).loc[list(networking_udp)].dropna().values)
    else:
        sum_networking_udp = float(0)

    if any(i in document_retrieval_udp for i in list_of_ports_in_table) :
        sum_document_retrieval_udp = np.sum(labels.value_counts(normalize=False).loc[list(document_retrieval_udp)].dropna().values)
    else:
        sum_document_retrieval_udp = float(0)

    res = list([sum_well_known, sum_registered_ports, sum_dynamic_ports, sum_mail_udp, sum_networking_udp, sum_document_retrieval_udp])
    return str(res)

@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def frac_tcp_appport_cat(labels):
    well_known = np.linspace(0,1023,1024)
    registered_ports = np.linspace(1024,49151,48128)
    dynamic_ports = np.linspace(49152,65535,16384)

    remote_access_tcp = [22,23,992,513]
    mail_tcp = [25,110,143,993,995]
    networking_tcp = [43,67,68,137,138,139,161,162,179]
    document_retrieval_tcp = [20,21,70,80,443,445,540,989,990]

    # distinct port numbers observed in this group (the index of the value_counts result)
    list_of_ports_in_table = labels.value_counts(normalize=True).index.tolist()
    # port ranges
    if any(i in well_known for i in list_of_ports_in_table) :
        sum_well_known = np.sum(labels.value_counts(normalize=True).loc[list(well_known)].dropna().values)
    else:
        sum_well_known = float(0)

    if any(i in registered_ports for i in list_of_ports_in_table) :
        sum_registered_ports = np.sum(labels.value_counts(normalize=True).loc[list(registered_ports)].dropna().values)
    else:
        sum_registered_ports = float(0)

    if any(i in dynamic_ports for i in list_of_ports_in_table) :
        sum_dynamic_ports = np.sum(labels.value_counts(normalize=True).loc[list(dynamic_ports)].dropna().values)
    else:
        sum_dynamic_ports = float(0)
    # service categories
    if any(i in remote_access_tcp for i in list_of_ports_in_table) :
        sum_remote_access_tcp = np.sum(labels.value_counts(normalize=True).loc[list(remote_access_tcp)].dropna().values)
    else:
        sum_remote_access_tcp = float(0)

    if any(i in mail_tcp for i in list_of_ports_in_table) :
        sum_mail_tcp = np.sum(labels.value_counts(normalize=True).loc[list(mail_tcp)].dropna().values)
    else:
        sum_mail_tcp = float(0)

    if any(i in networking_tcp for i in list_of_ports_in_table) :
        sum_networking_tcp = np.sum(labels.value_counts(normalize=True).loc[list(networking_tcp)].dropna().values)
    else:
        sum_networking_tcp = float(0)

    if any(i in document_retrieval_tcp for i in list_of_ports_in_table) :
        sum_document_retrieval_tcp = np.sum(labels.value_counts(normalize=True).loc[list(document_retrieval_tcp)].dropna().values)
    else:
        sum_document_retrieval_tcp = float(0)

    res = list([sum_well_known, sum_registered_ports, sum_dynamic_ports, sum_remote_access_tcp, sum_mail_tcp, sum_networking_tcp, sum_document_retrieval_tcp])
    return str(res)

@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def frac_udp_appport_cat(labels):
    well_known = np.linspace(0,1023,1024)
    registered_ports = np.linspace(1024,49151,48128)
    dynamic_ports = np.linspace(49152,65535,16384)

    mail_udp = [512]
    networking_udp = [53,67,68,137,138,139,161,162]
    document_retrieval_udp = [69,445]

    # distinct port numbers observed in this group (the index of the value_counts result)
    list_of_ports_in_table = labels.value_counts(normalize=True).index.tolist()
    # port ranges
    if any(i in well_known for i in list_of_ports_in_table) :
        sum_well_known = np.sum(labels.value_counts(normalize=True).loc[list(well_known)].dropna().values)
    else:
        sum_well_known = float(0)

    if any(i in registered_ports for i in list_of_ports_in_table) :
        sum_registered_ports = np.sum(labels.value_counts(normalize=True).loc[list(registered_ports)].dropna().values)
    else:
        sum_registered_ports = float(0)

    if any(i in dynamic_ports for i in list_of_ports_in_table) :
        sum_dynamic_ports = np.sum(labels.value_counts(normalize=True).loc[list(dynamic_ports)].dropna().values)
    else:
        sum_dynamic_ports = float(0)
    # service categories

    if any(i in mail_udp for i in list_of_ports_in_table) :
        sum_mail_udp = np.sum(labels.value_counts(normalize=True).loc[list(mail_udp)].dropna().values)
    else:
        sum_mail_udp = float(0)

    if any(i in networking_udp for i in list_of_ports_in_table) :
        sum_networking_udp = np.sum(labels.value_counts(normalize=True).loc[list(networking_udp)].dropna().values)
    else:
        sum_networking_udp = float(0)

    if any(i in document_retrieval_udp for i in list_of_ports_in_table) :
        sum_document_retrieval_udp = np.sum(labels.value_counts(normalize=True).loc[list(document_retrieval_udp)].dropna().values)
    else:
        sum_document_retrieval_udp = float(0)

    res = list([sum_well_known, sum_registered_ports, sum_dynamic_ports, sum_mail_udp, sum_networking_udp, sum_document_retrieval_udp])
    return str(res)

# safest solution to compute entropy from https://stackoverflow.com/questions/15450192/fastest-way-to-compute-entropy-in-python
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def entropy_udf(labels):
    value,counts = np.unique(labels.dropna(), return_counts=True)
    return entropy(counts)

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def frac_ipv4(labels):
    res = labels.value_counts(normalize=True)
    if float(4) in res.index:
        return res.loc[float(4)]
    else:
        return float(0)

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def frac_ipv6(labels):
    res = labels.value_counts(normalize=True)
    if float(6) in res.index:
        return res.loc[float(6)]
    else:
        return float(0)

@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def num_icmp_tcp_udp_gre(labels):
    res = labels.value_counts(normalize=False)
    if float(1) in res.index:
        icmp = str(res.loc[float(1)])
    else:
        icmp = str(0)
    if float(6) in res.index:
        tcp = str(res.loc[float(6)])
    else:
        tcp = str(0)
    if float(17) in res.index:
        udp = str(res.loc[float(17)])
    else:
        udp = str(0)
    if float(47) in res.index:
        gre = str(res.loc[float(47)])
    else:
        gre = str(0)
    return "["+icmp+","+tcp+","+udp+","+gre+"]"


@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def frac_icmp_tcp_udp_gre(labels):
    res = labels.value_counts(normalize=True)
    if float(1) in res.index:
        icmp = str(res.loc[float(1)])
    else:
        icmp = str(0)
    if float(6) in res.index:
        tcp = str(res.loc[float(6)])
    else:
        tcp = str(0)
    if float(17) in res.index:
        udp = str(res.loc[float(17)])
    else:
        udp = str(0)
    if float(47) in res.index:
        gre = str(res.loc[float(47)])
    else:
        gre = str(0)
    return "["+icmp+","+tcp+","+udp+","+gre+"]"

@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def num_echo_reply_destination_unreachable_redirect_message_echo_request(labels):
    res = labels.value_counts(normalize=False)
    if float(0) in res.index:
        echo_reply = str(res.loc[float(0)])
    else:
        echo_reply = str(0)
    if float(3) in res.index:
        destination_unreachable = str(res.loc[float(3)])
    else:
        destination_unreachable = str(0)
    if float(5) in res.index:
        redirect_message = str(res.loc[float(5)])
    else:
        redirect_message = str(0)
    if float(8) in res.index:
        echo_request = str(res.loc[float(8)])
    else:
        echo_request = str(0)
    return "["+echo_reply+","+destination_unreachable+","+redirect_message+","+echo_request+"]"

@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def frac_echo_reply_destination_unreachable_redirect_message_echo_request(labels):
    res = labels.value_counts(normalize=True)
    if float(0) in res.index:
        echo_reply = str(res.loc[float(0)])
    else:
        echo_reply = str(0)
    if float(3) in res.index:
        destination_unreachable = str(res.loc[float(3)])
    else:
        destination_unreachable = str(0)
    if float(5) in res.index:
        redirect_message = str(res.loc[float(5)])
    else:
        redirect_message = str(0)
    if float(8) in res.index:
        echo_request = str(res.loc[float(8)])
    else:
        echo_request = str(0)
    return "["+echo_reply+","+destination_unreachable+","+redirect_message+","+echo_request+"]"

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def frac_tcpflag(labels):
    res = labels.value_counts(normalize=True)
    if float(1) in res.index:
        return res.loc[float(1)]
    else:
        return float(0)

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def num_tcpflag(labels):
    res = labels.value_counts(normalize=False)
    if float(1) in res.index:
        return res.loc[float(1)]
    else:
        return float(0)

# list of percentiles we will compute and the corresponding column name prefix
percentiles = [0.01, 0.02, 0.05, 0.10, 0.15, 0.20, 0.25, 0.50, 0.75, 0.90, 0.95, 0.97, 0.99]
percentiles_col_name = "percentiles(1_2_5_10_15_20_25_50_75_90_95_97_99_"


for file_name in files_to_process:
    try:
        datadf = spark.read.format("csv").option("header", "false").schema(schema).option("delimiter", "\t").load(file_name)

        # initial creation of feature_subset row which will be written to hive table after all features are calculated
        features_subset = datadf.select(F.sum("frame_len").alias("vol(frame_len)"), F.count("frame_len").alias("num_pkts(frame_len)"), F.min("frame_len"), F.max("frame_len"), F.avg("frame_len"), F.variance("frame_len").alias("var(frame_len)"), F.stddev("frame_len").alias("stddev(frame_len)") ).withColumn("file_name", lit(file_name))
        # about percentiles:
        # https://support.treasuredata.com/hc/en-us/articles/360001457367-Hive-Built-in-Aggregate-Functions
        # a relative error of 0 makes approxQuantile compute exact quantiles, which can be expensive
        datadf_percentiles = datadf.approxQuantile("frame_len",percentiles, 0)
        features_subset = features_subset.withColumn(percentiles_col_name+"frame_len)", lit(str(datadf_percentiles)))
        entropy_result = datadf.agg(entropy_udf("frame_len").alias("entropy(frame_len)")).withColumn("file_name", lit(file_name))
        features_subset = features_subset.join(entropy_result,"file_name")

        for name in ["ip_len", "ip_ttl", "tcp_len","tcp_winsize"]:
            features_subset = features_subset.join(datadf.select(F.min(name), F.max(name), F.avg(name), F.variance(name).alias("var("+name+")"), F.stddev(name).alias("stddev("+name+")")).withColumn("file_name", lit(file_name)),"file_name")
            datadf_percentiles = datadf.approxQuantile(name,percentiles, 0)
            features_subset = features_subset.withColumn(percentiles_col_name+"("+name+")", lit(str(datadf_percentiles)))
            entropy_result = datadf.agg(entropy_udf(name).alias("entropy("+name+")")).withColumn("file_name", lit(file_name))
            features_subset = features_subset.join(entropy_result,"file_name")

        name = "tcp_flags"
        entropy_result = datadf.agg(entropy_udf(name).alias("entropy("+name+")")).withColumn("file_name", lit(file_name))
        features_subset = features_subset.join(entropy_result,"file_name")

        name = "ip_version"
        ipv4_frac_result = datadf.agg(frac_ipv4(name).alias("frac_ipv4")).withColumn("file_name", lit(file_name))
        ipv6_frac_result = datadf.agg(frac_ipv6(name).alias("frac_ipv6")).withColumn("file_name", lit(file_name))
        features_subset = features_subset.join(ipv4_frac_result,"file_name")
        features_subset = features_subset.join(ipv6_frac_result,"file_name")

        name = "ip_proto"
        frac_icmp_tcp_udp_gre_res = datadf.agg(frac_icmp_tcp_udp_gre(name).alias("frac_icmp_tcp_udp_gre("+name+")")).withColumn("file_name", lit(file_name))
        features_subset = features_subset.join(frac_icmp_tcp_udp_gre_res,"file_name")
        num_icmp_tcp_udp_gre_res = datadf.agg(num_icmp_tcp_udp_gre(name).alias("num_icmp_tcp_udp_gre("+name+")")).withColumn("file_name", lit(file_name))
        features_subset = features_subset.join(num_icmp_tcp_udp_gre_res,"file_name")

        # TCP/UDP port categories

        for name in ["tcp_dstport","tcp_srcport"] :
            num_tcp_appport_categories = datadf.agg(num_tcp_appport_cat(name).alias("num_appport_ranges("+name+")")).withColumn("file_name", lit(file_name))
            frac_tcp_appport_categories = datadf.agg(frac_tcp_appport_cat(name).alias("frac_appport_ranges("+name+")")).withColumn("file_name", lit(file_name))
            features_subset = features_subset.join(num_tcp_appport_categories,"file_name")
            features_subset = features_subset.join(frac_tcp_appport_categories,"file_name")

        for name in ['udp_dstport','udp_srcport']:
            num_udp_appport_categories = datadf.agg(num_udp_appport_cat(name).alias("num_appport_ranges("+name+")")).withColumn("file_name", lit(file_name))
            frac_udp_appport_categories = datadf.agg(frac_udp_appport_cat(name).alias("frac_appport_ranges("+name+")")).withColumn("file_name", lit(file_name))
            features_subset = features_subset.join(num_udp_appport_categories,"file_name")
            features_subset = features_subset.join(frac_udp_appport_categories,"file_name")

        # IP address entropies
        for name in ['ip_dst','ip_src']:
            # raw string, so that the backslash reaches the regex unchanged
            octets = F.split(datadf[name], r'\.')
            datadf = datadf.withColumn(name + '_1st_octet', octets.getItem(0))
            datadf = datadf.withColumn(name + '_2nd_octet', octets.getItem(1))
            datadf = datadf.withColumn(name + '_3rd_octet', octets.getItem(2))
            datadf = datadf.withColumn(name + '_4th_octet', octets.getItem(3))
            entropy_1st_IP_octet = datadf.agg(entropy_udf(col(name + '_1st_octet')).alias("entropy_1st_IP_octet("+name+")")).withColumn("file_name", lit(file_name))
            entropy_2nd_IP_octet = datadf.agg(entropy_udf(col(name + '_2nd_octet')).alias("entropy_2nd_IP_octet("+name+")")).withColumn("file_name", lit(file_name))
            entropy_3rd_IP_octet = datadf.agg(entropy_udf(col(name + '_3rd_octet')).alias("entropy_3rd_IP_octet("+name+")")).withColumn("file_name", lit(file_name))
            entropy_4th_IP_octet = datadf.agg(entropy_udf(col(name + '_4th_octet')).alias("entropy_4th_IP_octet("+name+")")).withColumn("file_name", lit(file_name))
            features_subset = features_subset.join(entropy_1st_IP_octet,"file_name")
            features_subset = features_subset.join(entropy_2nd_IP_octet,"file_name")
            features_subset = features_subset.join(entropy_3rd_IP_octet,"file_name")
            features_subset = features_subset.join(entropy_4th_IP_octet,"file_name")

        for name in ["tcp_flags_ack", "tcp_flags_cwr", "tcp_flags_fin", "tcp_flags_ecn", "tcp_flags_ns", "tcp_flags_push", "tcp_flags_syn", "tcp_flags_urg","tcp_flags_reset"]:
            frac_tcpflag_result = datadf.agg(frac_tcpflag(name).alias("frac_tcpflag("+name+")")).withColumn("file_name", lit(file_name))
            num_tcpflag_result = datadf.agg(num_tcpflag(name).alias("num_tcpflag("+name+")")).withColumn("file_name", lit(file_name))
            features_subset = features_subset.join(frac_tcpflag_result,"file_name")
            features_subset = features_subset.join(num_tcpflag_result,"file_name")

        # Fragmentation has occurred when either the "more fragments" bit is set or the fragment offset is greater than zero.
        # Both fields are non-negative, so their sum (computed below) is positive exactly in that case.
        # The equivalent Wireshark display filter is:
        # ip.flags.mf == 1 or ip.frag_offset gt 0
        # https://osqa-ask.wireshark.org/questions/41152/how-to-check-if-fragmentation-is-happening
        datadf = datadf.withColumn('fragmentation', datadf.ip_flags_mf + datadf.ip_frag_offset)
        datadf = datadf.withColumn('fragmentation(binary)', F.when(F.col('fragmentation') > 0, 1).otherwise(0))
        name = 'fragmentation(binary)'
        frac_frag_result = datadf.agg(frac_tcpflag(name).alias("frac_frag")).withColumn("file_name", lit(file_name))
        num_frag_result = datadf.agg(num_tcpflag(name).alias("num_frag")).withColumn("file_name", lit(file_name))
        features_subset = features_subset.join(frac_frag_result,"file_name")
        features_subset = features_subset.join(num_frag_result,"file_name")

        name = "icmp_type"
        icmp_type_entropy_result = datadf.agg(entropy_udf(name).alias("entropy("+name+")")).withColumn("file_name", lit(file_name))
        frac_echo_reply_destination_unreachable_redirect_message_echo_request_result = datadf.agg(frac_echo_reply_destination_unreachable_redirect_message_echo_request(name).alias("frac_echo_reply_destination_unreachable_redirect_message_echo_request("+name+")")).withColumn("file_name", lit(file_name))
        num_echo_reply_destination_unreachable_redirect_message_echo_request_result = datadf.agg(num_echo_reply_destination_unreachable_redirect_message_echo_request(name).alias("num_echo_reply_destination_unreachable_redirect_message_echo_request("+name+")")).withColumn("file_name", lit(file_name))

        features_subset = features_subset.join(icmp_type_entropy_result,"file_name")
        features_subset = features_subset.join(frac_echo_reply_destination_unreachable_redirect_message_echo_request_result,"file_name")
        features_subset = features_subset.join(num_echo_reply_destination_unreachable_redirect_message_echo_request_result,"file_name")        
        try:
            features_subset.show()
            #features_subset.write.save(path=output_loc, format='csv', mode='append', sep='\t')
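        except Exception as err:
            # Catch Exception rather than everything, so that Ctrl+C can still interrupt the loop
            print(file_name + " not processed: " + str(err))
    except Exception as err:
        print(file_name + " not processed: " + str(err))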

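
To make two of the per-window features above easier to check by hand, here is a minimal plain-Python sketch of the per-octet IP entropy and of the fragmentation fraction. It is not the notebook's Spark code: the helper names `shannon_entropy`, `octet_entropies`, `is_fragment` and `frac_fragmented` are hypothetical, the entropy is assumed to be Shannon entropy in bits (the base used by `entropy_udf` is not shown in this section), and missing values (for example non-IP packets) are not handled.

```python
import math
from collections import Counter


def shannon_entropy(values):
    """Shannon entropy (in bits, an assumption) of the empirical distribution of values."""
    counts = Counter(values)
    total = sum(counts.values())
    if total == 0:
        return 0.0
    # sum of p * log2(1/p) over all distinct values
    return sum((c / total) * math.log2(total / c) for c in counts.values())


def octet_entropies(addresses):
    """Entropy of the 1st, 2nd, 3rd and 4th octet over a list of dotted IPv4 strings."""
    octets = [addr.split(".") for addr in addresses]
    return [shannon_entropy(o[i] for o in octets) for i in range(4)]


def is_fragment(more_fragments, frag_offset):
    """Same condition as the display filter: ip.flags.mf == 1 or ip.frag_offset gt 0."""
    return more_fragments == 1 or frag_offset > 0


def frac_fragmented(packets):
    """Fraction of (more_fragments, frag_offset) pairs that belong to a fragmented datagram."""
    flags = [is_fragment(mf, off) for mf, off in packets]
    return sum(flags) / len(flags) if flags else 0.0


# Toy window with four packets (made-up values, for illustration only)
print(octet_entropies(["10.0.0.1", "10.0.0.2", "192.168.0.1", "192.168.0.2"]))  # [1.0, 1.0, 0.0, 1.0]
print(frac_fragmented([(0, 0), (1, 0), (0, 185), (0, 0)]))  # 0.5
```

In the toy window the third octet is constant, so its entropy is zero, while the other octets take two equally likely values and contribute one bit each.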

# Appendix - Running the feature extraction script - possible Java issues
The servers sometimes run out of Java memory while the tables are being processed. Most probably this is because Spark is designed to process a large number of files in bulk, not one by one in a loop, so the script crashes occasionally.
The workaround is to run the script in a loop, so that it starts again whenever it exits, and additionally to kill it (which triggers a restart) when a Java out-of-memory error shows up in its output.

In [None]:
%%bash
#!/bin/bash
# Script that runs the feature extraction in a loop, saved as "neverending_script.sh".
# Run it as "nohup neverending_script.sh &> neverending_script_nohup.out&" so that it runs in the background and its stdout goes to a file.
while true
do
        nohup ./mawi_feature_extraction.py &
        wait
        sleep 1
done

In [None]:
%%bash
#!/bin/bash
# Script that kills mawi_feature_extraction.py so that neverending_script.sh restarts it, saved as "neverending_script_restarter.sh".
# It watches neverending_script_nohup.out for Java memory issues.
# Run it as "nohup neverending_script_restarter.sh &> neverending_script_restarter_nohup.out&"

# Continuously check each new line of the neverending script's output for the Java out-of-memory pattern.
# If grep finds it ($? is the exit status of the previous command: 0 = match, 1 = no match), kill the process; the loop in "neverending_script.sh" then starts it again.

tail -fn0 neverending_script_nohup.out | \
while read line ; do
        echo "$line" | grep "java.lang.OutOfMemoryError"
        if [ $? = 0 ]
        then
                pkill -f /.../mawi_feature_extraction.py
                echo "mawi_feature_extraction.py killed"
        fi
done
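
The two shell scripts above could also be combined into one supervisor process. The following is only a sketch of that idea in Python, not part of the original setup: the names `is_oom_line`, `run_once` and `supervise` and the `max_runs` and `pause_s` parameters are hypothetical, and unlike the restarter script it reads the child's output directly instead of tailing `neverending_script_nohup.out`.

```python
import subprocess
import time

OOM_PATTERN = "java.lang.OutOfMemoryError"


def is_oom_line(line, pattern=OOM_PATTERN):
    """True if an output line contains the Java out-of-memory pattern."""
    return pattern in line


def run_once(cmd, pattern=OOM_PATTERN):
    """Run cmd once; kill it as soon as its output contains the pattern.

    Returns (killed, returncode).
    """
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # error messages usually arrive on stderr
        text=True,
    )
    killed = False
    for line in proc.stdout:
        if is_oom_line(line, pattern):
            proc.kill()
            killed = True
            break
    proc.stdout.close()
    proc.wait()
    return killed, proc.returncode


def supervise(cmd, max_runs=None, pause_s=1.0):
    """Restart cmd whenever it exits or is killed; max_runs=None loops forever."""
    runs = 0
    while max_runs is None or runs < max_runs:
        killed, code = run_once(cmd)
        runs += 1
        reason = "killed after out-of-memory error" if killed else "exited with code %s" % code
        print("run %d: %s" % (runs, reason))
        time.sleep(pause_s)
    return runs
```

Under these assumptions, `supervise(["./mawi_feature_extraction.py"])` would play the role of both scripts at once; the output of the extraction script is consumed by the supervisor, so it would have to be logged explicitly if it is still needed.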