In [1]:
import sys
import math
import datetime
import ipaddress
import dateutil.relativedelta
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from datetime import datetime as dt

import multiprocessing as mp
import pandas as pd
import numpy as np
from scipy import stats
from scipy.sparse import csr_matrix
import matplotlib.pyplot as plt
import glob

# from ipwhois import IPWhois
# import seaborn as sns
# sys.path.insert(1, '../')


In [2]:
# Public variables
# Columns that make up a packet's five-tuple.
FIVE_TUPLE = [
    "src_ip",
    "dst_ip",
    "src_port",
    "dst_port",
    "protocol",
]
# Named colours (presumably for plots; not used in the visible cells).
COLOR_LIST = [
    "red", "blue", "orange", "green",
    "purple", "yellow", "pink", "black",
]

In [3]:
# Output directory: also used as the Spark SQL warehouse dir and as the
# prefix of the input CSV paths passed to preprocess_df.
WH_DIRECTORY = "/media/disk1/mengying/"

# Window size (packets are bucketed by mac_ts into windows of this width)
# and the look-back horizon used for the ground truth, both in nanoseconds.
WINDOW_SIZE_IN_NANOSEC = 10**9          # 1 s
TARGET_TIME_IN_NANOSEC = 60 * 10**9     # 60 s

# Home netmask: CIDR blocks whose addresses count as internal.
HOME_NET = [
    "128.112.0.0/16",
    "140.180.0.0/16",
    "204.153.48.0/23",
    "66.180.176.0/24",
    "66.180.177.0/24",
    "66.180.180.0/22",
]

In [4]:
# Spark configuration: (key, value) pairs applied to the session below.
spark_settings = [
    ('spark.cores.max', '100'),
    ('spark.driver.memory', '150g'),
    ("spark.default.parallelism", '100'),
    ("spark.speculation", "false"),
    ("spark.sql.warehouse.dir", WH_DIRECTORY),
    ("spark.driver.maxResultSize", "80g"),
]
conf = pyspark.SparkConf().setAll(spark_settings)

spark = (
    SparkSession.builder
    .appName('nap_analysis')
    .config(conf=conf)
    .getOrCreate()
)
sc = spark.sparkContext

23/09/17 05:20:13 WARN Utils: Your hostname, jrex-dell-r6525-002e3u19 resolves to a loopback address: 127.0.1.1; using 172.17.0.83 instead (on interface eno1)
23/09/17 05:20:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/17 05:20:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/17 05:20:14 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
23/09/17 05:20:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/09/17 05:20:14 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
# Print functions: each prints its argument wrapped in an ANSI colour escape
# sequence followed by a reset.
def prRed(skk):
    """Print ``skk`` in bright red (ANSI code 91)."""
    message = "\033[91m {}\033[00m".format(skk)
    print(message)


def prGreen(skk):
    """Print ``skk`` in bright green (ANSI code 92)."""
    message = "\033[92m {}\033[00m".format(skk)
    print(message)


def prYellow(skk):
    """Print ``skk`` in bright yellow (ANSI code 93)."""
    message = "\033[93m {}\033[00m".format(skk)
    print(message)

In [6]:
# Preprocess HOME_NET into (network_address_int, netmask_int) pairs so that an
# address `a` (as an int) is internal iff `netmask & a == network_address`
# for some pair.
internal_ips = []
for cidr in HOME_NET:
    address, prefix_len = cidr.split("/")
    host_bits = 32 - int(prefix_len)
    # All-ones 32-bit value with the lowest `host_bits` bits cleared.
    netmask = (2**32 - 1) >> host_bits << host_bits
    internal_ips.append((int(ipaddress.IPv4Address(address)), netmask))

In [7]:
import re

def is_internal_func(ip, broadcast_dict):
    """Return True if ``ip`` falls inside any of the broadcast networks.

    Parameters
    ----------
    ip : int or int-convertible
        IPv4 address in integer form (converted with ``int`` per comparison).
    broadcast_dict : object with ``.value``
        ``.value`` is a sequence of ``(network_address_int, netmask_int)``
        pairs (e.g. a Spark broadcast of ``internal_ips``).
    """
    networks = broadcast_dict.value
    return any(pair[1] & int(ip) == pair[0] for pair in networks)

def mac_to_int_func(mac):
    """Convert a MAC-formatted hex string to an int.

    Separators ``:``, ``.``, ``-`` and whitespace are stripped and the remaining
    hex digits are parsed as one base-16 number, e.g. ``"00:00:00:00:00:01"`` -> 1.

    Returns None when ``mac`` is None or contains no hex digits after stripping,
    so the Spark UDF wrapping it yields null instead of failing the job.
    Raises ValueError for non-hex characters.
    """
    if mac is None:
        return None
    # Raw string: the previous non-raw literal relied on invalid escape
    # sequences (\. \- \s), which newer Pythons warn about.
    hex_digits = re.sub(r'[:\.\-\s]', '', mac)
    if not hex_digits:
        return None
    return int(hex_digits, 16)
    
    
# Broadcast the (network, netmask) pairs; executors read them via
# broadcast_dict.value inside is_internal_func.
broadcast_dict = sc.broadcast(internal_ips)

# ip_to_int converts an IPv4 address from string form to int form.
# A null address yields null instead of raising and failing the whole job.
ip_to_int = F.udf(lambda ip: None if ip is None else int(ipaddress.IPv4Address(ip)), LongType())

# is_internal returns True if ip (int form) is inside HOME_NET, False otherwise,
# and null for a null ip.
is_internal = F.udf(lambda ip: None if ip is None else is_internal_func(ip, broadcast_dict), BooleanType())

# mac_to_int converts a MAC-formatted string to int form (null in -> null out).
mac_to_int = F.udf(lambda mac: None if mac is None else mac_to_int_func(mac), LongType())


In [8]:
# Make ../scripts importable (inserted after sys.path[0]) and load the column
# names that preprocess_df assigns, in order, to the header-less tshark CSV columns.
sys.path.insert(1, '../scripts')
from global_variables import all_header_titles


def preprocess_df(filename):
    """Load tab-separated tshark CSV files and prepare them for the analysis.

    The files at ``WH_DIRECTORY + filename`` (a glob is allowed) have no header
    row. Their leading columns are renamed, in order, to ``all_header_titles``.
    ``src_mac`` and ``dst_mac`` are parsed with ``mac_to_int`` into the integer
    columns ``report`` and ``mac_ts``, and the MAC columns are dropped. Rows
    whose ``mac_ts`` is null are removed, and the frame is cached.

    Parameters
    ----------
    filename : str
        Path (or glob) relative to ``WH_DIRECTORY``.

    Returns
    -------
    tuple
        ``(df, min_ts, max_ts)``, where ``min_ts`` and ``max_ts`` are the
        smallest and largest ``mac_ts``. ``df`` also gets an integer ``window``
        column: ``(mac_ts - min_ts) / WINDOW_SIZE_IN_NANOSEC`` cast to int.

    Raises
    ------
    ValueError
        If no row has a non-null ``mac_ts``.
    """
    in_path = WH_DIRECTORY + filename
    # sep="\\t" is backslash + t; Spark's CSV reader unescapes it to a tab.
    raw_df = spark.read.csv(in_path, inferSchema=True, header=False, sep="\\t")

    # Kept so the raw capture stays queryable as the `tshark` SQL view.
    raw_df.createOrReplaceTempView("tshark")
    # Positional rename via the DataFrame API instead of a hand-built SQL
    # string. Indexing raw_df.columns still raises IndexError if there are
    # more titles than columns, as before.
    df = raw_df.select([F.col(raw_df.columns[i]).alias(title)
                        for i, title in enumerate(all_header_titles)])

    df = (df.withColumn("report", mac_to_int(F.col("src_mac")))
            .withColumn("mac_ts", mac_to_int(F.col("dst_mac")))
            .drop("src_mac", "dst_mac")
            .where(F.col("mac_ts").isNotNull()))
    df.cache()

    # Both bounds in a single aggregation job instead of two separate scans.
    bounds = df.agg(F.min(F.col("mac_ts")).alias("min_ts"),
                    F.max(F.col("mac_ts")).alias("max_ts")).collect()[0]
    min_ts, max_ts = bounds["min_ts"], bounds["max_ts"]
    if min_ts is None:
        raise ValueError("no packets with a non-null mac_ts under {}".format(in_path))

    df = df.withColumn('window', ((F.col("mac_ts") - min_ts) / WINDOW_SIZE_IN_NANOSEC).cast(IntegerType()))

    return df, min_ts, max_ts

In [9]:
# Load every received CSV of the experiment (path relative to WH_DIRECTORY).
capture_glob = "mnt/anonflow/catql/experiment_europ4_2023/received_csvs_3/*.csv"
df, min_ts, max_ts = preprocess_df(capture_glob)
df.printSchema()


23/09/17 05:23:44 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

root
 |-- version: integer (nullable = true)
 |-- ip_hdr_len: integer (nullable = true)
 |-- ip_dsfield: string (nullable = true)
 |-- tos_precedence: string (nullable = true)
 |-- tos_delay: string (nullable = true)
 |-- tos_throughput: string (nullable = true)
 |-- tos_reliability: string (nullable = true)
 |-- tos_cost: string (nullable = true)
 |-- ip_dsfield_dscp: integer (nullable = true)
 |-- ip_dsfield_ecn: integer (nullable = true)
 |-- ip_len: integer (nullable = true)
 |-- ip_id: string (nullable = true)
 |-- flags_rb: integer (nullable = true)
 |-- flags_sf: string (nullable = true)
 |-- flags_df: integer (nullable = true)
 |-- flags_mf: integer (nullable = true)
 |-- frag_offset: integer (nullable = true)
 |-- ip_fragment: string (nullable = true)
 |-- fragment_count: string (nullable = true)
 |-- ttl: integer (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- src_ip: string (nullable = true)
 |-- dst_ip: string (nullable = true)
 |-- ip_src_rt: string (nullab

In [10]:

# Duration of the capture, from the first to the last mac_ts (nanoseconds).
# It is computed as a timedelta on the raw difference rather than with
# relativedelta for two reasons:
# - relativedelta folds spans of a month or more into a `months` field that
#   this message never prints.
# - fromtimestamp() yields local wall-clock times, so a DST change during the
#   capture would shift the result.
min_ts_dt = datetime.datetime.fromtimestamp(min_ts/(10**9))
max_ts_dt = datetime.datetime.fromtimestamp(max_ts/(10**9))
capture_duration = datetime.timedelta(microseconds=(max_ts - min_ts) // 1000)
duration_hours, duration_rem = divmod(capture_duration.seconds, 3600)
duration_minutes, duration_seconds = divmod(duration_rem, 60)
prGreen("The duration of the packet capture: %d days, %d hours, %d minutes and %d seconds" 
        %(capture_duration.days, duration_hours, duration_minutes, duration_seconds))

[92m The duration of the packet capture: 0 days, 0 hours, 9 minutes and 35 seconds[00m


In [11]:
# Keep the five-tuple plus report/mac_ts/window, flag packets whose source is
# inside HOME_NET, and name both endpoints by role regardless of direction.
# The int form of the source IP is only needed for the HOME_NET test.
sent_from_home = F.col("from_internal")
df = (
    df.select(FIVE_TUPLE + ["report", "mac_ts", "window"])
      .withColumn("source_ip_int", ip_to_int(F.col("src_ip")))
      .withColumn("from_internal", is_internal(F.col("source_ip_int")))
      .withColumn("external_ip", F.when(sent_from_home, F.col("dst_ip")).otherwise(F.col("src_ip")))
      .withColumn("internal_ip", F.when(sent_from_home, F.col("src_ip")).otherwise(F.col("dst_ip")))
      .drop("source_ip_int")
)

In [13]:
# Distinct (external_ip, internal_ip) pairs in outgoing packets per bucket of
# TARGET_TIME_IN_NANOSEC. The bucket width in windows used to be a hard-coded
# 60, which is only right while windows are 1 s and the target time is 60 s.
windows_per_bucket = TARGET_TIME_IN_NANOSEC // WINDOW_SIZE_IN_NANOSEC
(df.where(F.col("from_internal"))
   .withColumn("win", F.floor(F.col("window") / windows_per_bucket))
   .groupBy("win")
   .agg(F.countDistinct("external_ip", "internal_ip").alias("num_insertions"))
   .orderBy("win")
   .show(100))




+---+--------------+
|win|num_insertions|
+---+--------------+
|  0|         57060|
|  1|         39782|
|  2|         32585|
|  3|         38988|
|  4|         34108|
|  5|         38759|
|  6|         31759|
|  7|         36907|
|  8|         32178|
|  9|         30309|
+---+--------------+



                                                                                

In [12]:
# Range of window indices, then the number of packets per report value.
df.agg(F.max("window"), F.min("window")).show()
df.groupBy("report").count().show()

                                                                                

+-----------+-----------+
|max(window)|min(window)|
+-----------+-----------+
|        575|          0|
+-----------+-----------+





+------+---------+
|report|    count|
+------+---------+
|     0|103411724|
|     1|  2548722|
+------+---------+



                                                                                

In [13]:
# Split by direction. Packets with from_internal == True go to df_insert
# (only the columns needed to build the insert table); packets with
# from_internal == False go to df_query. Both are cached for reuse.
outgoing = F.col("from_internal") == True
incoming = F.col("from_internal") == False
df_insert = df.where(outgoing).select("window", "internal_ip", "external_ip", "mac_ts")
df_query = df.where(incoming)
df_insert.cache()
df_query.cache()

DataFrame[src_ip: string, dst_ip: string, src_port: int, dst_port: int, protocol: int, report: bigint, mac_ts: bigint, window: int, from_internal: boolean, external_ip: string, internal_ip: string]

In [14]:
# Row counts of df_insert and df_query, in that order.
# Counts from an earlier run, kept for reference: 39061955 and 66851275.
for split_frame in (df_insert, df_query):
    print(split_frame.count())

                                                                                

39075755




66884691


                                                                                

In [16]:
import shutil  # kept for other cells in this notebook

# Checkpoint both splits to single-file CSV directories. mode("overwrite")
# replaces the previous shutil.rmtree calls, which raised FileNotFoundError on
# a fresh run when the directories did not exist yet.
df_insert.repartition(1).write.mode("overwrite").csv("/media/disk1/mengying/csv/win_60/df_insert.csv", header=True)
df_query.repartition(1).write.mode("overwrite").csv("/media/disk1/mengying/csv/win_60/df_query.csv", header=True)

                                                                                

In [None]:
# Reload the checkpointed splits instead of recomputing them from the raw capture
# (this cell was not executed in the saved run).
df_insert, df_query = (
    spark.read.csv(split_path, inferSchema=True, header=True)
    for split_path in ("/media/disk1/mengying/csv/win_60/df_insert.csv",
                       "/media/disk1/mengying/csv/win_60/df_query.csv")
)

In [17]:
# Release df's cached blocks (non-blocking); later cells that use df recompute it from the source CSVs.
df.unpersist(blocking=False)

DataFrame[src_ip: string, dst_ip: string, src_port: int, dst_port: int, protocol: int, report: bigint, mac_ts: bigint, window: int, from_internal: boolean, external_ip: string, internal_ip: string]

In [18]:
# Use 60 seconds to calculate the ground truth, with 1-second windows
# (same values as the configuration cell; restated here for this section).
WINDOW_SIZE_IN_NANOSEC = 10**9          # 1 s
TARGET_TIME_IN_NANOSEC = 60 * 10**9     # 60 s

In [19]:
# Number of windows covered by the TARGET_TIME look-back horizon.
num_wins = math.ceil(TARGET_TIME_IN_NANOSEC / WINDOW_SIZE_IN_NANOSEC )

# Stage 1 of the iterative anti-join starts from all query packets.
df_query_drop = df_query

# One row per (window, internal_ip, external_ip) with the first and last
# outgoing mac_ts seen in that window.
df_insert_grouped = df_insert.groupBy("window", "internal_ip", "external_ip").\
                              agg(F.min("mac_ts").alias("min_insert_mac_ts"),
                                  F.max("mac_ts").alias("max_insert_mac_ts"))

# Checkpoint stage 1. mode("overwrite") replaces the previous rmtree call, which
# targeted df_query_drop.csv (not df_query_drop_1.csv). That call raised
# FileNotFoundError on a fresh run, and on a re-run this write failed because
# df_query_drop_1.csv already existed.
df_query_drop.repartition(1).write.mode("overwrite").csv("/media/disk1/mengying/csv/win_60/df_query_drop_1.csv", header=True)

                                                                                

In [20]:
# Drop query packets whose (internal_ip, external_ip) pair had an outgoing packet
# exactly `win` windows earlier, for win = 1 .. num_wins-1. Stage `win` reads
# df_query_drop_<win>.csv and writes df_query_drop_<win+1>.csv (presumably to
# keep the query plan from growing across iterations).
# - mode("overwrite") makes the cell re-runnable; before, a second run failed
#   because the output directories already existed.
# - The CSV schema is inferred once, at the first stage, and reused. Every
#   later file is a subset of the first, so the types still fit, and the extra
#   inference pass over each file is skipped.
query_drop_schema = None
for win in range(1, num_wins):
    in_path = "/media/disk1/mengying/csv/win_60/df_query_drop_" + str(win) +".csv"
    if query_drop_schema is None:
        df_query_drop = spark.read.csv(in_path, inferSchema=True, header=True)
        query_drop_schema = df_query_drop.schema
    else:
        df_query_drop = spark.read.csv(in_path, schema=query_drop_schema, header=True)
    df_insert_shifted = df_insert_grouped.withColumn("window", (F.col("window") + win))
    df_query_drop = df_query_drop.join(df_insert_shifted.select("window", "internal_ip", "external_ip"), 
                                       on = ["window", "internal_ip", "external_ip"], 
                                       how="left_anti")
    out_path = "/media/disk1/mengying/csv/win_60/df_query_drop_" + str(win+1) +".csv"
    df_query_drop.repartition(1).write.mode("overwrite").csv(out_path, header=True)

                                                                                

In [23]:
# Load the last stage of the anti-join (queries with no matching insert in
# windows 1 .. num_wins-1 back), cache it and report its size.
final_stage_path = "/media/disk1/mengying/csv/win_60/df_query_drop_" + str(num_wins) + ".csv"
df_query_drop = spark.read.csv(final_stage_path, inferSchema=True, header=True)
df_query_drop.cache()
print(df_query_drop.count())



5398200


                                                                                

In [24]:
# Ground truth for the remaining queries. A query gets groud_truth = 1 (column
# name spelled this way throughout the notebook) only if it also passes the
# two boundary windows, where timing inside the window matters:
#   * same window: only inserts at or after the query's mac_ts may exist
#     (missing -> sentinel max_ts + 1, which always passes);
#   * num_wins windows back: the last insert must be more than
#     TARGET_TIME_IN_NANOSEC before the query
#     (missing -> sentinel min_ts - TARGET_TIME_IN_NANOSEC - 1, which always passes).
# Each fillna is restricted to its joined timestamp column. An unrestricted
# fillna also replaced nulls in every other numeric column (e.g. src_port,
# dst_port, protocol), casting the ~1e18 nanosecond sentinel into int columns.
# The previous chain of withColumnRenamed(x, x) calls was a no-op and is gone.
join_keys = ["window", "internal_ip", "external_ip"]
df_query_drop_2 = (
    df_query_drop
    .join(df_insert_grouped.select("window", "internal_ip", "external_ip", "min_insert_mac_ts"),
          on=join_keys, how="left")
    .fillna(max_ts + 1, subset=["min_insert_mac_ts"])
    .where(F.col("min_insert_mac_ts") - F.col("mac_ts") >= 0)
    .drop("min_insert_mac_ts")
    .join(df_insert_grouped.withColumn("window", (F.col("window") + num_wins))
                           .select("window", "internal_ip", "external_ip", "max_insert_mac_ts"),
          on=join_keys, how="left")
    .fillna(min_ts - TARGET_TIME_IN_NANOSEC - 1, subset=["max_insert_mac_ts"])
    .where(F.col("mac_ts") - F.col("max_insert_mac_ts") > TARGET_TIME_IN_NANOSEC)
    .drop("max_insert_mac_ts")
    .withColumn("groud_truth", F.lit(1))
)
df_query_drop_2.cache()
print(df_query_drop_2.count())


[Stage 936:===>                                                  (7 + 93) / 100]

2561773


                                                                                

In [26]:
# Attach the ground-truth label to every packet; unmatched packets get 0.
# - The join is null-safe (eqNullSafe). A plain equi-join on all columns
#   never matches null = null, so a packet with a null key column (e.g. a
#   port) could never receive its label.
# - fillna is limited to the label, so nulls in other numeric columns (ports,
#   protocol) are no longer rewritten to 0.
# - mode("overwrite") lets the cell be re-run.
comp_join_cond = [df[c].eqNullSafe(df_query_drop_2[c]) for c in df.columns]
df_comp = (
    df.join(df_query_drop_2, on=comp_join_cond, how="left")
      .select(*[df[c] for c in df.columns], df_query_drop_2["groud_truth"])
      .fillna(0, subset=["groud_truth"])
)
df_comp.cache()
df_comp.repartition(1).write.mode("overwrite").csv("/media/disk1/mengying/csv/win_60/df_comp.csv", header=True)

23/09/14 22:58:46 WARN CacheManager: Asked to cache already cached data.
                                                                                

In [27]:
# Check no false negatives: count every (report, groud_truth) combination that
# disagrees. In the per-window metrics, report=1 with groud_truth=0 counts as a
# false negative, so that combination should not appear here.
df_fp = df_comp.filter(F.col("groud_truth") != F.col("report"))
df_fp.groupBy("report", "groud_truth").count().show()




+------+-----------+-----+
|report|groud_truth|count|
+------+-----------+-----+
|     0|          1|13051|
+------+-----------+-----+



                                                                                

In [28]:
# Check outgoing are all allowed: report values of every packet whose source
# is inside HOME_NET, with their counts.
outgoing_packets = df.where(F.col("from_internal"))
outgoing_packets.groupBy("report").count().show()




+------+--------+
|report|   count|
+------+--------+
|     0|39075755|
+------+--------+



                                                                                

In [31]:
# Inspect the labelled frame's columns and types (groud_truth is the added label).
df_comp.printSchema()

root
 |-- src_ip: string (nullable = true)
 |-- dst_ip: string (nullable = true)
 |-- src_port: integer (nullable = true)
 |-- dst_port: integer (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- report: long (nullable = true)
 |-- mac_ts: long (nullable = true)
 |-- window: integer (nullable = true)
 |-- from_internal: boolean (nullable = true)
 |-- external_ip: string (nullable = true)
 |-- internal_ip: string (nullable = true)
 |-- groud_truth: integer (nullable = true)



In [32]:

def _confusion_flag(report_value, truth_value):
    """Return a 0/1 column: 1 where (report, groud_truth) equals the given pair."""
    is_cell = (F.col("report") == report_value) & (F.col("groud_truth") == truth_value)
    return F.when(is_cell, F.lit(1)).otherwise(F.lit(0))


# Per-window confusion counts. This notebook treats groud_truth == 1 as a
# negative, so FPR = num_falsepos / num_negatives (null when a window has no
# negatives).
df_comp_win = (
    df_comp
    .withColumn("num_falsepos", _confusion_flag(0, 1))
    .withColumn("num_falseneg", _confusion_flag(1, 0))
    .withColumn("num_truepos", _confusion_flag(0, 0))
    .withColumn("num_trueneg", _confusion_flag(1, 1))
    .groupBy("window")
    .agg(F.sum("num_falsepos").alias("num_falsepos"),
         F.sum("num_falseneg").alias("num_falseneg"),
         F.sum("num_truepos").alias("num_truepos"),
         F.sum("num_trueneg").alias("num_trueneg"),
         F.sum("groud_truth").alias("num_negatives"),
         F.sum(F.lit(1)).alias("num_packets"))
    .withColumn("FPR", F.col("num_falsepos") / F.col("num_negatives"))
)


In [33]:
df_comp_win.sort("window").show(n=100)



+------+------------+------------+-----------+-----------+-------------+-----------+--------------------+
|window|num_falsepos|num_falseneg|num_truepos|num_trueneg|num_negatives|num_packets|                 FPR|
+------+------------+------------+-----------+-----------+-------------+-----------+--------------------+
|     0|           0|           0|     125179|       5403|         5403|     130582|                 0.0|
|     1|           0|           0|     183839|       4395|         4395|     188234|                 0.0|
|     2|           0|           0|     186836|       3934|         3934|     190770|                 0.0|
|     3|           0|           0|     194334|       3801|         3801|     198135|                 0.0|
|     4|           0|           0|     184680|       3929|         3929|     188609|                 0.0|
|     5|           0|           0|     171284|       3565|         3565|     174849|                 0.0|
|     6|           0|           0|     190986|

                                                                                

In [51]:
# Overall false-positive rate across all windows, rounded to 10 decimal places.
overall_fpr = (
    df_comp_win
    .agg(F.sum("num_falsepos").alias("num_falsepos"),
         F.sum("num_negatives").alias("num_negatives"))
    .withColumn("FPR", F.round(F.col("num_falsepos") / F.col("num_negatives"), 10))
)
overall_fpr.show()



+------------+-------------+------------+
|num_falsepos|num_negatives|         FPR|
+------------+-------------+------------+
|       13051|      2561773|0.0050945185|
+------------+-------------+------------+



                                                                                