## Imports

In [22]:
# Import Libraries
# findspark.init() puts the pyspark shipped with SPARK_HOME on sys.path, so it
# must run before any `import pyspark` (previously pyspark was imported first,
# which made the findspark call ineffective for those imports).
import findspark
findspark.init()

# Standard library
import os
import pickle
from datetime import datetime, timedelta, date

# Third-party
import dateutil.relativedelta as relativedelta
import IPython
import numpy as np
import pandas as pd

# PySpark (each name imported once). `psf` and `F` are both aliases of
# pyspark.sql.functions; both are kept because later cells use each of them.
import pyspark
from pyspark import SQLContext, SparkContext, StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, lit
from pyspark.sql.window import Window
import pyspark.sql.functions as psf
import pyspark.sql.functions as F
import pyspark.sql.types as pst

In [23]:
# Show which Spark installation this kernel is configured against.
spark_home = os.environ["SPARK_HOME"]
print(spark_home)

/expanse/lustre/scratch/jli21/temp_project/spark-3.2.1


## Spark Configuration

In [24]:
# Current login name; used below to build the per-user scratch path for the
# extra Spark jars. Plain os.environ instead of the `%env` magic so the cell
# also runs outside IPython (e.g. when exported to a script).
user = os.environ["USER"]
user

'jli21'

In [25]:
# Capture this node's fully-qualified hostname via the shell. `hostname` is the
# list of output lines, so hostname[0] is the FQDN (used to build MASTER_URL).
hostname = !hostname --fqdn
hostname[0]

'exp-3-19.expanse.sdsc.edu'

In [26]:
# Commented-out paths from a different (non-Expanse) environment:
# os.environ["PYSPARK_PYTHON"] = "/opt/anaconda3/envs/spark3/bin/python"
# os.environ["SPARK_HOME"] = "/usr/local/spark-3.1.2-bin-hadoop3.2"

# Driver-side environment: Hadoop native library path, the per-user extra jars
# plus the spark-avro package for spark-submit, and SPARK_LIB mirroring SPARK_HOME.
# NOTE(review): presumably these must be set before the Spark session is created.
os.environ.update({
    "HADOOP_OPTS": "-Djava.library.path=/cm/shared/apps/spack/cpu/opt/spack/linux-centos8-zen/gcc-8.3.1/hadoop/3.2.2/lib/native",
    'PYSPARK_SUBMIT_ARGS': fr"--jars /expanse/lustre/scratch/{user}/temp_project/spark-3.2.1/ext_jars/* --packages org.apache.spark:spark-avro_2.12:3.2.1 pyspark-shell",
    "SPARK_LIB": os.environ["SPARK_HOME"],
})

In [27]:
# S3 credentials for the UCSD-NT object store are read from the environment
# instead of being hardcoded, so they never end up in the notebook or its shared
# outputs. NOTE: the keys previously pasted in this cell were committed in plain
# text and should be rotated.
UCSD_NT_S3_ACCESS_KEY = os.environ.get("UCSD_NT_S3_ACCESS_KEY")
UCSD_NT_S3_SECRET_KEY = os.environ.get("UCSD_NT_S3_SECRET_KEY")
if not (UCSD_NT_S3_ACCESS_KEY and UCSD_NT_S3_SECRET_KEY):
    raise RuntimeError(
        "Set UCSD_NT_S3_ACCESS_KEY and UCSD_NT_S3_SECRET_KEY in the environment "
        "before creating the Spark session."
    )

# Spark master on this node; `hostname` is the captured `hostname --fqdn` output.
MASTER_URL = fr"spark://{hostname[0]}:7077"

# Session options, applied to the builder in insertion order.
SPARK_SESSION_OPTIONS = {
    # S3A connector: static credentials against a path-style endpoint.
    "fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    "fs.s3a.access.key": UCSD_NT_S3_ACCESS_KEY,
    "fs.s3a.secret.key": UCSD_NT_S3_SECRET_KEY,
    "fs.s3a.endpoint": "https://hermes.caida.org",
    "fs.s3a.path.style.access": "true",
    "fs.s3a.block.size": "64M",
    "fs.s3a.readahead.range": "128K",
    "fs.s3a.experimental.input.fadvise": "sequential",
    "fs.s3a.connection.maximum": 256,
    # Cluster resources.
    "spark.cores.max": "128",
    "spark.driver.cores": "2",
    "spark.driver.memory": "2G",
    "spark.executor.cores": "125",
    "spark.executor.memory": "125G",
    # "spark.executor.memoryOverhead": "4G",
    # I/O, networking, SQL behavior.
    "io.file.buffer.size": "67108864",
    # Spark config keys are case-sensitive; the previous 'spark.submit.deploymode'
    # did not match the real key 'spark.submit.deployMode'.
    "spark.submit.deployMode": "client",
    "spark.buffer.size": "67108864",
    "spark.network.timeout": "300s",
    "spark.sql.session.timeZone": "UTC",
    # Set this when analyzing periods of high volatility, during which there may be corrupt files
    "spark.sql.files.ignoreCorruptFiles": "true",
}

session_builder = SparkSession.builder.master(MASTER_URL).appName('spark-cluster')
for option_key, option_value in SPARK_SESSION_OPTIONS.items():
    session_builder = session_builder.config(option_key, option_value)
spark = session_builder.getOrCreate()

print(spark)

<pyspark.sql.session.SparkSession object at 0x1555311e6970>


In [28]:
# SQLContext's first parameter is a SparkContext, not a SparkSession: pass the
# session's context together with the session itself. SQLContext is superseded
# by SparkSession, but `sqlcontext` is kept because load_ft_pyspark reads via it.
sqlcontext = SQLContext(spark.sparkContext, sparkSession=spark)



In [29]:
# Display the active SparkSession object as this cell's output.
spark

## Functions

In [30]:
import socket, struct
import ipaddress

def _long2network(long, mask_suffix):
    """Wrap integer IPv4 address `long` in the network given by `mask_suffix` ('/<dotted netmask>')."""
    dotted_quad = socket.inet_ntoa(struct.pack('!L', long))
    return ipaddress.ip_network(dotted_quad + mask_suffix, strict=False)

def long2ip(long):
    """Integer IPv4 address -> single-address network (/32) containing it."""
    return _long2network(long, '/255.255.255.255')

def long2ip24subnet(long):
    """Integer IPv4 address -> enclosing /24 network (host bits cleared)."""
    return _long2network(long, '/255.255.255.0')

def long2ip16subnet(long):
    """Integer IPv4 address -> enclosing /16 network (host bits cleared)."""
    return _long2network(long, '/255.255.0.0')

def ip2long(ip):
    """Convert an IPv4 address string (e.g. '10.0.0.1') to its unsigned 32-bit integer value."""
    (as_int,) = struct.unpack("!L", socket.inet_aton(ip))
    return as_int

In [31]:
def explode_prefix(prefix_list):
    """Expand IPv4 prefixes into the set of every address they contain, as integers.

    Args:
        prefix_list: Iterable of IPv4 networks in CIDR notation, e.g.
            ['157.42.200.0/24']. Each must be a network address (no host bits
            set); ipaddress.IPv4Network raises ValueError otherwise.

    Returns:
        set[int]: every address in the given prefixes, including the network
        and broadcast addresses. An empty iterable yields an empty set.
    """
    # int(IPv4Address) equals ip2long(str(addr)) but skips the per-address
    # format-then-parse round trip (65,536 of them for a single /16).
    return {
        int(addr)
        for pfx in prefix_list
        for addr in ipaddress.IPv4Network(pfx)
    }

In [32]:
import pytz

def load_ft_pyspark(avro_files):
    """Read the given .avro files into one Spark DataFrame with `time` as a timestamp.

    Args:
        avro_files: path or list of paths/URIs accepted by the avro reader.

    Returns:
        Spark DataFrame whose `time` column is cast to TimestampType and then
        passed through to_utc_timestamp(..., 'UTC').
    """
    raw_df = sqlcontext.read.format('avro').load(avro_files)
    # NOTE(review): `time` is presumably integer epoch seconds (the earlier
    # alternative used from_unixtime on it) — confirm against the schema.
    time_as_ts = raw_df.time.cast(dataType=pst.TimestampType())
    return raw_df.withColumn('time', psf.to_utc_timestamp(time_as_ts, 'UTC'))

## Analysis

### Optional Filters

#### Source Filters

In [41]:
# /24 under analysis: used as the source-subnet filter and in the output filename.
VICTIM_24 = '157.42.200.0/24'
# Inclusive analysis window as 'YYYY-MM-DD HH:MM' strings, parsed with pd.to_datetime.
# NOTE(review): presumably UTC (the Spark session sets spark.sql.session.timeZone=UTC) — confirm.
START = "2023-06-15 00:00"
END = "2023-06-17 23:59"

In [None]:
# Append default times only when START/END are bare dates ('YYYY-MM-DD').
# Appending unconditionally turns values that already carry a time into a
# malformed string such as '2023-06-15 00:00 00:00'. The guard also makes this
# cell safe to run more than once.
if ' ' not in START.strip():
    START += " 00:00"
if ' ' not in END.strip():
    END += " 23:59"

In [42]:
# Source-side filters. An empty collection disables the corresponding filter
# (the filter-application cell skips empty ones).
# Individual source IPs as dotted-quad strings; converted to integers in the next cell.
src_ip_filter = set([
    # '137.110.41.27',
    # '137.110.41.248',
    # '137.110.33.50',
    # '137.110.60.41',
])

# Source subnets in CIDR notation; expanded to every contained address in the next cell.
src_subnet_filter = set([
    # '137.110.0.0/16',
    # '1.116.129.0/24',
    VICTIM_24,
])

# Values matched against the `prefix2asn` column.
# NOTE(review): presumably ASN numbers — confirm the column's type.
src_asn_filter = [
]

In [43]:
# Convert the string filters to integer addresses and union them into ip_filter.
# New names keep the string sets above intact, so this cell can be re-run:
# ip2long accepts only strings, and re-applying it to already-converted ints
# used to raise.
src_ip_ints = set(map(ip2long, src_ip_filter))
src_subnet_ints = explode_prefix(src_subnet_filter)
ip_filter = src_ip_ints | src_subnet_ints

print(f'IP filter cardinality: {len(ip_filter)}')
print(f'ASN filter cardinality: {len(src_asn_filter)}')

IP filter cardinality: 256
ASN filter cardinality: 0


#### Dest. Filters

In [44]:
# Destination-side filters. An empty collection disables the filter.
# NOTE(review): dst_ip_filter is not referenced by the filter-application cell
# below, so setting it currently has no effect on the data.
dst_ip_filter = set([
])

# Keep only flows whose dst_port is in this set (inner join).
dst_port_filter = set([
    # 22,
    # 1883
])

# Drop flows whose dst_port is in this set (left-anti join).
dst_port_neg_filter = set([
    # 22,
    # 23,
    # 80,
    # 81,
    # 445,
    # 1433,
    # 2323,
    # 3389,
    # 5555,
    # 8080,
    # 52869
])

#### Temporal Filter

In [45]:
# Parse the inclusive window bounds into pandas Timestamps.
start, end = pd.to_datetime(START), pd.to_datetime(END)

### Load .avro files

In [46]:
def extract_timestamp(filename):
    """Return the second '.'-separated field of `filename` as an int.

    For the FlowTuple file list this field is converted with unit='s', i.e.
    treated as a UNIX timestamp, e.g. 'x.1686787200.avro' -> 1686787200.
    """
    fields = filename.split('.')
    return int(fields[1])

def build_uri(row):
    """Build the s3a:// URI of one file-list row.

    `row` must expose container, datasource, year, month, day, hour and
    filename attributes; the date parts become Hive-style key=value path segments.
    """
    uri = f's3a://{row.container}/{row.datasource}/year={row.year}/month={row.month}/day={row.day}/hour={row.hour}/{row.filename}'
    return uri

In [47]:
avro_df = pd.read_parquet('ft4_file_lists/telescope-ucsdnt-avro-flowtuple-v4-2023.parquet.gzip')

# Index the file list by the UNIX timestamp embedded in each filename.
avro_df['datetime2'] = pd.to_datetime(avro_df['filename'].map(extract_timestamp), unit='s')
avro_df = avro_df.set_index('datetime2')

# Select the .avro files inside the inclusive [start, end] window.
selected_avros = avro_df[(avro_df.index >= start) & (avro_df.index <= end)]
print(f'Avro filecount: {len(selected_avros)}')
# Fail here with a clear message rather than handing Spark an empty path list.
if selected_avros.empty:
    raise ValueError(f'No .avro files found between {start} and {end}; check START/END and the file list.')

# Build one s3a:// URI per selected file.
selected_avro_uris = selected_avros.apply(build_uri, axis=1).values
print(f'Avro uri count: {len(selected_avro_uris)}')

Avro filecount: 852
Avro uri count: 852


In [48]:
%%time

# Load the selected .avro files into one Spark DataFrame; the URI array is
# converted to a plain list for the reader.
spark_df = load_ft_pyspark(list(selected_avro_uris))

24/06/24 21:14:33 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

CPU times: user 38.8 ms, sys: 6.01 ms, total: 44.8 ms
Wall time: 15.9 s


### Apply Filters

In [49]:
def create_filter_df(items, col_name):
    """Build a one-column Spark DataFrame named `col_name` with one row per element of `items`."""
    rows = [(value,) for value in items]
    schema = [col_name]
    return spark.createDataFrame(rows, schema)

In [50]:
import warnings

traff_df = spark_df

# dst_ip_filter is configured in an earlier cell but no join below uses it;
# warn instead of silently returning unfiltered traffic.
if len(dst_ip_filter) > 0:
    warnings.warn('dst_ip_filter is set but is not applied by this cell.')

# (log message, filter values, join column, join type), applied in this order.
# 'inner' keeps rows whose column value is in the filter; 'leftanti' drops them.
# Empty filters are skipped.
FILTER_SPECS = [
    ('Applying Source IP Filter', ip_filter, 'src_ip', 'inner'),
    ('Applying Source ASN Filter', src_asn_filter, 'prefix2asn', 'inner'),
    ('Applying Dest. Port Filter', dst_port_filter, 'dst_port', 'inner'),
    ('Applying Dest. Port Negative Filter', dst_port_neg_filter, 'dst_port', 'leftanti'),
]

for message, values, join_col, join_how in FILTER_SPECS:
    if len(values) == 0:
        continue
    print(message)
    # Filter tables are built from small in-memory collections, hence the broadcast hint.
    filter_df = create_filter_df(values, join_col)
    traff_df = traff_df.join(F.broadcast(filter_df), on=join_col, how=join_how)

Applying Source IP Filter


### Diagnostic Analysis

In [51]:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

def lineplot(series, label, title, ylabel, xlabel):
    """Draw `series` as a wide, short line chart with a date-formatted x-axis and show it.

    Major x ticks fall at 00:00, 06:00, 12:00 and 18:00 and are labelled
    'MM-DD' over 'HH:MM' (so the series index is presumably datetime-like —
    TODO confirm at call sites). `label` names the legend entry; `title`,
    `ylabel` and `xlabel` are drawn at font size 14.
    """
    fig, ax = plt.subplots(figsize=(10, 2))
    ax.plot(series, label=f'{label}', marker='.')
    ax.legend()

    # Finer alternative for short windows:
    # ax.xaxis.set_major_locator(mdates.MinuteLocator(byminute=[0, 30]))
    ax.xaxis.set_major_locator(mdates.HourLocator(byhour=[0, 6, 12, 18]))
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d \n %H:%M"))
    fig.autofmt_xdate()

    ax.set_title(f'{title}', fontsize=14)
    ax.set_ylabel(f'{ylabel}', fontsize=14)
    ax.set_xlabel(f'{xlabel}', fontsize=14)
    ax.xaxis.set_tick_params(labelsize=8)
    ax.grid()

    plt.show()

In [52]:
import seaborn as sns

def plot_ecdf(df, x, title, xlabel, ylabel):
    """Show the empirical CDF of column `x` of `df` on a log-scaled x-axis clipped to [0.1, 1e6]."""
    _, ax = plt.subplots(figsize=(4, 3))
    sns.ecdfplot(data=df, x=x, log_scale=(True, False), ax=ax)

    ax.set_ylabel(f'{ylabel}', fontsize=14)
    ax.set_xlabel(f'{xlabel}', fontsize=14)
    ax.set_title(f'{title}', fontsize=14)
    ax.set_xlim([0.1, 1e6])

    ax.grid()
    plt.show()

In [53]:
%%time
# NOTE(review): this cell is a no-op — the cache() call below is commented out,
# so traff_df is not cached. Uncomment to cache it before the aggregations.
# traff_df = traff_df.cache() #

CPU times: user 0 ns, sys: 2 µs, total: 2 µs
Wall time: 7.39 µs


In [54]:
# Total packet_cnt per (time, protocol). Uses the module alias F.sum: the former
# `from pyspark.sql.functions import sum` shadowed Python's builtin sum for
# every later cell.
grouped_df = traff_df.groupBy(["time", "protocol"]).agg(F.sum("packet_cnt").alias("total_packet_count"))

# Fix the output column order explicitly.
result_df = grouped_df.select("time", "protocol", "total_packet_count")


In [55]:
# Collect the aggregated counts to the driver as a pandas DataFrame.
# NOTE(review): the recorded output shows a pandas `astype` warning from this
# conversion — worth checking the resulting dtypes (e.g. of `time`).
df = result_df.toPandas()

  series = series.astype(t, copy=False)


In [62]:
# Persist the per-(time, protocol) packet counts for the victim subnet.
# The network address is taken by splitting off the prefix length (same result
# as the former VICTIM_24[:-3] for '/24', but independent of the suffix length),
# and the output directory is created if missing so to_parquet does not fail.
# NOTE(review): str(start) contains a space and colons, which end up in the filename.
os.makedirs('./bgp_parquet', exist_ok=True)
df.to_parquet(f"./bgp_parquet/[{start}-{VICTIM_24.split('/')[0]}].parquet.gzip", compression='gzip')