## FAIR USE POLICY
**Please do not leave your Jupyter Lab server idle for extended periods of time.** The Jupyter process, active Python kernels, and especially running Spark contexts each claim a minimum amount of cluster resources. These add up and will eventually starve other users of resources. Leaving your environment idle for a few hours (e.g., over lunch) is fine, but letting it idle overnight or for multiple days while you are not actively using the cluster is not. You can stop the server from your SSH session by pressing Ctrl+C repeatedly, or by selecting *File->Shutdown* from the menu.

### Imports

In [1]:
import os
import time
from datetime import datetime, timedelta, date
import pprint

import collections
import numpy as np
import pandas as pd
import subprocess

import pyspark
# Find Spark
import findspark
findspark.init()


from datetime import datetime, timedelta, date
import datetime as dt
from datetime import timedelta as td
import os
import pandas as pd
import subprocess
import re
import pyarrow as pa
import copy
from pyspark.sql.types import DateType
from string import digits
from dateutil import parser

import nltk
import pyspark
from pyspark.sql import *  
from pyspark.sql.functions import *
import pyspark.sql.functions as F 
from pyspark.sql import functions as F, Window
from pyspark.sql.functions import from_unixtime
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from pyspark.sql.functions import mean as _mean
from pyspark.sql.window import Window as W
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, LongType
from scipy import stats
from scipy.stats import mannwhitneyu


from functools import reduce
from pyspark.ml.feature import Tokenizer, StopWordsRemover


import json
import glob
#json.loads('{"":"\\ud800"}')



#spark.conf.set("spark.sql.broadcastTimeout", 3000)
#spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

#### Create Spark Configuration

In [2]:
# Create a SparkConf
APP_NAME = "apwg-median-app"

# YARN client-mode configuration. SparkConf.setAll applies the pairs in order:
#  - parquet.binaryAsString: interpret Parquet BINARY columns as strings
#    (NOTE(review): this notebook reads JSON, so this likely has no effect here)
#  - dynamicAllocation.maxExecutors: cap dynamic allocation at 16 executors
#  - jars.packages: fetch the Spark NLP artifact at startup (name suggests Spark 2.4 /
#    Scala 2.11). NOTE(review): nothing visible here imports sparknlp; confirm it is needed.
#  - debug.maxToStringFields: allow up to 1000 fields in debug string output
#  - executor.memory: 7G of memory per executor process
spark_conf = (
    pyspark.SparkConf()
    .setAppName(APP_NAME)
    .setMaster("yarn")
    .setAll([
        ("spark.submit.deployMode", "client"),
        ("spark.sql.parquet.binaryAsString", "true"),
        ("spark.dynamicAllocation.maxExecutors", "16"),
        ("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark24_2.11:3.0.0"),
        ("spark.sql.debug.maxToStringFields", "1000"),
        ("spark.executor.memory", "7G"),
    ])
)

#### Start SparkContext
1. This may take a minute to complete
2. You should not (and cannot) start two Spark contexts. If you accidentally run this cell twice or get stuck somehow, restart your Python kernel from the menu above.
3. Please **stop your Spark context** when idling for extended periods of time (see code at bottom of notebook)

In [3]:
print(f"[{datetime.now().replace(microsecond=0)}] Starting Spark context.")

# Only one SparkContext can be active per kernel; re-running this cell fails.
sc = pyspark.SparkContext(conf=spark_conf)

# Legacy SQL entry point; later cells load data through `sqlc.read`.
sqlc = pyspark.SQLContext(sc)

[2021-05-05 21:12:02] Starting Spark context.


### Your code below

In [4]:

APWG_CLEAN_DATA_CONVERTED_BASE = "PATH TO DATA-RECORDS"

INTEREST_DATE_START = datetime(2017, 12, 18)
INTEREST_DATE_END   = datetime(2019, 8, 16)

# Load the (multi-line) JSON records and derive a calendar date from `modified`.
# from_unixtime treats its input as seconds since the epoch and renders it in the
# session time zone. Then keep only rows inside the inclusive window of interest.
clean_mails_df = (
    sqlc.read.json(APWG_CLEAN_DATA_CONVERTED_BASE, multiLine=True)
    .withColumn("parsed_date", F.from_unixtime(F.col("modified")).cast("date"))
    .filter(
        F.col("parsed_date").between(
            INTEREST_DATE_START.date().isoformat(),
            INTEREST_DATE_END.date().isoformat(),
        )
    )
)

In [5]:
# Start from the date-filtered frame. No filtering happens in this cell; the
# English-only language filter is applied in the next cell.
original_df = clean_mails_df

In [6]:
# Keep only emails whose `language` field equals "english".
original_df = original_df.filter(F.col("language") == "english")

In [7]:
# Hypothesis 1 input: every English email, reduced to its date and id.
h1_df = original_df.select('parsed_date', 'id')

In [8]:
# Hypothesis 2 input: English emails whose stop-word-filtered body tokens contain
# at least one of the keywords below, reduced to date and id.
word_list = [
    'unsubscribed', 'hack', 'takedown', 'password', 'transparent',
    'attempt', 'redirect', 'impersonate', 'network', 'obsolete', 'illegal', 'damage', 'edit',
    'unauthenticated', 'initial', 'survey', 'collect', 'victim', 'detect', 'recharge', 'test',
    'attachment', 'claim', 'profitable', 'virus', 'fraudulent', 'revalidation', 'link', 'description',
]

remover = StopWordsRemover(inputCol='body_words', outputCol='words_clean')

# array_intersect needs two array columns, so lift the Python list into a literal array.
# NOTE(review): matching is exact-string; StopWordsRemover does not lowercase tokens, so
# capitalised keywords are missed unless `body_words` is already lowercase. Confirm upstream.
list_col = F.array(*map(F.lit, word_list))

h2_df = (
    remover.transform(original_df)
    .drop("body_words")
    .filter(F.size(F.array_intersect(F.col("words_clean"), list_col)) > 0)
    .select('parsed_date', 'id')
)

In [9]:
# Hypothesis 3 input: English emails whose `email_has_attachments` flag is "1".
h3_df = original_df.filter(F.col("email_has_attachments") == "1").select('parsed_date', 'id')

In [10]:
# Announcement dates of DDoS attacks ('YYYY-MM-DD'); each is a reference day for
# the before/after comparisons below.
# NOTE(review): the loaded data ends at INTEREST_DATE_END (2019-08-16), so the 20-day
# "after" window of 2019-08-08 is truncated. Several dates also lie within 20 days of
# each other, so their windows overlap.
ddos_list = [
    # 2018
    '2018-01-17', '2018-01-27', '2018-02-08', '2018-03-01', '2018-03-06',
    '2018-05-14', '2018-05-24', '2018-07-30',
    # 2019
    '2019-01-16', '2019-01-31', '2019-02-23', '2019-03-22', '2019-04-16',
    '2019-06-12', '2019-08-08',
]


In [11]:
# Each side ranks distinct dates over the whole filtered frame (no partition) to pick
# the nearest ones; Window.partitionBy('parsed_date') then counts rows per date.
def hypo_1(df, day, n_days=7, window_days=20):
    """
    Build before/after per-day email counts around one DDoS announcement date.

    Example usage: df_before, df_after = hypo_1(h1_df, '2018-01-17')

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input frame with `parsed_date` and `id` columns.
    day : str
        Reference date as 'YYYY-MM-DD'. Rows dated exactly `day` are excluded
        from both sides.
    n_days : int, default 7
        Keep at most this many distinct dates per side, nearest to `day` first.
    window_days : int, default 20
        Only dates strictly within this many days of `day` are candidates.

    Returns
    -------
    list [before_df, after_df]
        Each keeps every input row from the selected dates, plus these columns:
        `count_before` / `count_after` (the number of non-null `id`s on that row's
        date) and `ddos_date` (the literal `day`). Rows are ordered by `parsed_date`.
    """
    def _side(condition, date_order, count_col):
        # dense_rank gives one rank per distinct date, so `rn <= n_days` keeps whole days.
        return (
            df.filter(condition)
            .withColumn('rn', F.dense_rank().over(Window.orderBy(date_order)))
            .filter(F.col('rn') <= n_days)
            .drop('rn')
            .withColumn(count_col, F.count('id').over(Window.partitionBy('parsed_date')))
            .orderBy('parsed_date')
            .withColumn('ddos_date', F.lit(day))
        )

    # Before: most recent dates first.
    before_df = _side(
        f"parsed_date < '{day}' and parsed_date > '{day}' - interval {window_days} days",
        F.desc('parsed_date'),
        'count_before',
    )
    # After: earliest dates first.
    after_df = _side(
        f"parsed_date < '{day}' + interval {window_days} days and parsed_date > '{day}'",
        F.asc('parsed_date'),
        'count_after',
    )
    return [before_df, after_df]



In [12]:
def hypo_2(df, day, n_days=15, window_days=20):
    """
    Build before/after per-day counts of keyword-matching emails around one DDoS date.

    Example usage: df_before, df_after = hypo_2(h2_df, '2018-01-17')

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input frame with `parsed_date` and `id` columns.
    day : str
        Reference date as 'YYYY-MM-DD'. Rows dated exactly `day` are excluded
        from both sides.
    n_days : int, default 15
        Keep at most this many distinct dates per side, nearest to `day` first.
    window_days : int, default 20
        Only dates strictly within this many days of `day` are candidates.

    Returns
    -------
    list [before_df, after_df]
        Each keeps every input row from the selected dates, plus these columns:
        `count_before` / `count_after` (the number of non-null `id`s on that row's
        date) and `ddos_date` (the literal `day`). Rows are ordered by `parsed_date`.
    """
    def _side(condition, date_order, count_col):
        # Global dense_rank over distinct dates picks the nearest `n_days` whole days;
        # the per-date window then counts rows on each kept day.
        return (
            df.filter(condition)
            .withColumn('rn', F.dense_rank().over(Window.orderBy(date_order)))
            .filter(F.col('rn') <= n_days)
            .drop('rn')
            .withColumn(count_col, F.count('id').over(Window.partitionBy('parsed_date')))
            .orderBy('parsed_date')
            .withColumn('ddos_date', F.lit(day))
        )

    # Before: most recent dates first.
    before_df = _side(
        f"parsed_date < '{day}' and parsed_date > '{day}' - interval {window_days} days",
        F.desc('parsed_date'),
        'count_before',
    )
    # After: earliest dates first.
    after_df = _side(
        f"parsed_date < '{day}' + interval {window_days} days and parsed_date > '{day}'",
        F.asc('parsed_date'),
        'count_after',
    )
    return [before_df, after_df]

In [13]:
def hypo_3(df, day, n_days=15, window_days=20):
    """
    Build before/after per-day counts of emails with attachments around one DDoS date.

    Example usage: df_before, df_after = hypo_3(h3_df, '2018-01-17')

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input frame with `parsed_date` and `id` columns.
    day : str
        Reference date as 'YYYY-MM-DD'. Rows dated exactly `day` are excluded
        from both sides.
    n_days : int, default 15
        Keep at most this many distinct dates per side, nearest to `day` first.
    window_days : int, default 20
        Only dates strictly within this many days of `day` are candidates.

    Returns
    -------
    list [before_df, after_df]
        Each keeps every input row from the selected dates, plus these columns:
        `count_before` / `count_after` (the number of non-null `id`s on that row's
        date) and `ddos_date` (the literal `day`). Rows are ordered by `parsed_date`.
    """
    def _side(condition, date_order, count_col):
        # Global dense_rank over distinct dates picks the nearest `n_days` whole days;
        # the per-date window then counts rows on each kept day.
        return (
            df.filter(condition)
            .withColumn('rn', F.dense_rank().over(Window.orderBy(date_order)))
            .filter(F.col('rn') <= n_days)
            .drop('rn')
            .withColumn(count_col, F.count('id').over(Window.partitionBy('parsed_date')))
            .orderBy('parsed_date')
            .withColumn('ddos_date', F.lit(day))
        )

    # Before: most recent dates first.
    before_df = _side(
        f"parsed_date < '{day}' and parsed_date > '{day}' - interval {window_days} days",
        F.desc('parsed_date'),
        'count_before',
    )
    # After: earliest dates first.
    after_df = _side(
        f"parsed_date < '{day}' + interval {window_days} days and parsed_date > '{day}'",
        F.asc('parsed_date'),
        'count_after',
    )
    return [before_df, after_df]

In [14]:
# For every DDoS date: one row per day on each side, indexed by distance from the
# DDoS date (ColumnIndex 1 = nearest day). Then stack all DDoS dates together.
h1_df1_list = []
h1_df2_list = []

for ddos_day in ddos_list:
    before_df, after_df = hypo_1(h1_df, ddos_day)

    # All rows of one date share the same count, so this leaves one row per date.
    before_df = before_df.dropDuplicates(['parsed_date', 'count_before'])
    after_df = after_df.dropDuplicates(['parsed_date', 'count_after'])

    before_order = Window.partitionBy(before_df['ddos_date']).orderBy(F.desc('parsed_date'))
    after_order = Window.partitionBy(after_df['ddos_date']).orderBy(after_df['parsed_date'])

    h1_df1_list.append(before_df.select('*', F.rank().over(before_order).alias('ColumnIndex')))
    h1_df2_list.append(after_df.select('*', F.rank().over(after_order).alias('ColumnIndex')))

hypo1_df1 = reduce(DataFrame.unionAll, h1_df1_list)
hypo1_df2 = reduce(DataFrame.unionAll, h1_df2_list)


In [15]:
# For every DDoS date: one row per day on each side, indexed by distance from the
# DDoS date (ColumnIndex 1 = nearest day). Then stack all DDoS dates together.
h2_df1_list = []
h2_df2_list = []

for ddos_day in ddos_list:
    before_df, after_df = hypo_2(h2_df, ddos_day)

    # All rows of one date share the same count, so this leaves one row per date.
    before_df = before_df.dropDuplicates(['parsed_date', 'count_before'])
    after_df = after_df.dropDuplicates(['parsed_date', 'count_after'])

    before_order = Window.partitionBy(before_df['ddos_date']).orderBy(F.desc('parsed_date'))
    after_order = Window.partitionBy(after_df['ddos_date']).orderBy(after_df['parsed_date'])

    h2_df1_list.append(before_df.select('*', F.rank().over(before_order).alias('ColumnIndex')))
    h2_df2_list.append(after_df.select('*', F.rank().over(after_order).alias('ColumnIndex')))

hypo2_df1 = reduce(DataFrame.unionAll, h2_df1_list)
hypo2_df2 = reduce(DataFrame.unionAll, h2_df2_list)


In [16]:
# For every DDoS date: one row per day on each side, indexed by distance from the
# DDoS date (ColumnIndex 1 = nearest day). Then stack all DDoS dates together.
h3_df1_list = []
h3_df2_list = []

for ddos_day in ddos_list:
    before_df, after_df = hypo_3(h3_df, ddos_day)

    # All rows of one date share the same count, so this leaves one row per date.
    before_df = before_df.dropDuplicates(['parsed_date', 'count_before'])
    after_df = after_df.dropDuplicates(['parsed_date', 'count_after'])

    before_order = Window.partitionBy(before_df['ddos_date']).orderBy(F.desc('parsed_date'))
    after_order = Window.partitionBy(after_df['ddos_date']).orderBy(after_df['parsed_date'])

    h3_df1_list.append(before_df.select('*', F.rank().over(before_order).alias('ColumnIndex')))
    h3_df2_list.append(after_df.select('*', F.rank().over(after_order).alias('ColumnIndex')))

hypo3_df1 = reduce(DataFrame.unionAll, h3_df1_list)
hypo3_df2 = reduce(DataFrame.unionAll, h3_df2_list)


In [17]:
def _mwu_two_sided(pairs):
    """Two-sided Mann-Whitney U of column 0 vs column 1 of `pairs`; returns [U, p] as floats."""
    counts_before = [pair[0] for pair in pairs]
    counts_after = [pair[1] for pair in pairs]
    return [float(x) for x in mannwhitneyu(counts_before, counts_after, alternative='two-sided')]


# Pair the k-th day before each DDoS date with the k-th day after it, gather the
# [before, after] count pairs per DDoS date, and test the two samples.
# NOTE(review): 'array<float>' is 32-bit, so U and p keep only ~7 significant digits.
# mannwhitneyu also treats the samples as independent even though they are paired
# by offset; confirm this is the intended test.
result_1 = (
    hypo1_df1
    .join(hypo1_df2, ['ddos_date', 'ColumnIndex'])
    .drop('ColumnIndex')
    .groupBy('ddos_date')
    .agg(F.collect_list(F.array('count_before', 'count_after')).alias('arr_h1'))
    .withColumn('u_value & p_value', F.udf(_mwu_two_sided, 'array<float>')('arr_h1'))
)

result_1.orderBy("ddos_date").show(30, truncate=False)

+----------+------------------------------------------------------------------------------------------------+-------------------+
|ddos_date |arr_h1                                                                                          |u_value & p_value  |
+----------+------------------------------------------------------------------------------------------------+-------------------+
|2018-01-17|[[1151, 1041], [1074, 1065], [1121, 1306], [767, 1016], [1054, 741], [1277, 1134], [1328, 1194]]|[30.0, 0.52290326] |
|2018-01-27|[[1105, 1309], [1194, 1270], [1041, 1066], [1016, 880], [741, 1111], [1100, 795], [1306, 1323]] |[19.0, 0.52290326] |
|2018-02-08|[[1143, 1063], [880, 1323], [1231, 910], [1111, 957], [806, 1168], [1052, 833], [1066, 1171]]   |[22.0, 0.7982978]  |
|2018-03-01|[[899, 1264], [1155, 925], [1110, 1258], [1023, 1160], [1244, 1426], [1168, 723], [1236, 1280]] |[16.0, 0.30668506] |
|2018-03-06|[[1264, 1160], [723, 1280]]                                                   

In [18]:
def _mwu_greater(pairs):
    """One-sided ('greater') Mann-Whitney U of column 0 vs column 1 of `pairs`; returns [U, p] as floats."""
    counts_before = [pair[0] for pair in pairs]
    counts_after = [pair[1] for pair in pairs]
    return [float(x) for x in mannwhitneyu(counts_before, counts_after, alternative='greater')]


# Pair the k-th day before each DDoS date with the k-th day after it, gather the
# [before, after] count pairs per DDoS date, and test the two samples.
# NOTE(review): 'array<float>' is 32-bit, so U and p keep only ~7 significant digits.
result_2 = (
    hypo2_df1
    .join(hypo2_df2, ['ddos_date', 'ColumnIndex'])
    .drop('ColumnIndex')
    .groupBy('ddos_date')
    .agg(F.collect_list(F.array('count_before', 'count_after')).alias('arr_h2'))
    .withColumn('u_value & p_value', F.udf(_mwu_greater, 'array<float>')('arr_h2'))
)

result_2.orderBy("ddos_date").show(25, truncate=False)

+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|ddos_date |arr_h2                                                                                                                                                                              |u_value & p_value  |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|2018-01-17|[[445, 414], [420, 456], [363, 326], [498, 400], [408, 526], [440, 476], [281, 464], [388, 434], [382, 490], [335, 414], [491, 462], [380, 397], [357, 295], [270, 361], [585, 431]]|[89.0, 0.8402739]  |
|2018-01-27|[[464, 315], [431, 490], [295, 502], [462, 414], [456, 326], [397, 317], [380, 440], [498, 325], [414, 466], [281, 424], [400, 434],

In [19]:
def _mwu_less(pairs):
    """One-sided ('less') Mann-Whitney U of column 0 vs column 1 of `pairs`; returns [U, p] as floats."""
    counts_before = [pair[0] for pair in pairs]
    counts_after = [pair[1] for pair in pairs]
    return [float(x) for x in mannwhitneyu(counts_before, counts_after, alternative='less')]


# Pair the k-th day before each DDoS date with the k-th day after it, gather the
# [before, after] count pairs per DDoS date, and test the two samples.
# NOTE(review): 'array<float>' is 32-bit, so U and p keep only ~7 significant digits.
result_3 = (
    hypo3_df1
    .join(hypo3_df2, ['ddos_date', 'ColumnIndex'])
    .drop('ColumnIndex')
    .groupBy('ddos_date')
    .agg(F.collect_list(F.array('count_before', 'count_after')).alias('arr_h3'))
    .withColumn('u_value & p_value', F.udf(_mwu_less, 'array<float>')('arr_h3'))
)

result_3.orderBy("ddos_date").show(25, truncate=False)

+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|ddos_date |arr_h3                                                                                                                                                                         |u_value & p_value  |
+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|2018-01-17|[[221, 105], [211, 180], [65, 152], [70, 139], [155, 132], [108, 111], [161, 137], [153, 230], [76, 155], [115, 91], [117, 174], [154, 241], [108, 238], [127, 81], [255, 102]]|[99.5, 0.3020229]  |
|2018-01-27|[[117, 226], [108, 95], [221, 141], [180, 155], [105, 105], [65, 153], [174, 83], [127, 116], [152, 97], [81, 238], [238, 241], [196, 258], [102, 230], 

#### Stop the SparkContext
Note: do not run this cell unless you actually want to stop your Spark context.

In [20]:
print(f"[{datetime.now().replace(microsecond=0)}] Stopping Spark context.")
# Release this notebook's cluster resources; `sc` cannot be reused afterwards.
sc.stop()

[2021-05-06 00:38:25] Stopping Spark context.
