# **KDDCup Data Analytics with PySpark DF: A structured case study**

### Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark

### Author: Amin Karami (PhD, FHEA)

##### data source: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html

In [None]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

In [None]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [1]:
# import SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/15 22:13:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/15 22:13:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [9]:
# Read and Load Data to Spark
# Data source: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html
df = spark.read.text("data/kddcup.data.gz")

df.printSchema()

root
 |-- value: string (nullable = true)



In [3]:
# Print data
df.show(10)

+--------------------+
|               value|
+--------------------+
|0,tcp,http,SF,215...|
|0,tcp,http,SF,162...|
|0,tcp,http,SF,236...|
|0,tcp,http,SF,233...|
|0,tcp,http,SF,239...|
|0,tcp,http,SF,238...|
|0,tcp,http,SF,235...|
|0,tcp,http,SF,234...|
|0,tcp,http,SF,239...|
|0,tcp,http,SF,181...|
+--------------------+
only showing top 10 rows



In [10]:
# Split data (if needed)
from pyspark.sql.functions import split

split_col = split(df['value'], ',')

df = df.withColumn('protocal', split_col.getItem(1)) \
        .withColumn('service', split_col.getItem(2)) \
        .withColumn('flag', split_col.getItem(3)) \
        .withColumn('src_bytes', split_col.getItem(4)) \
        .withColumn('dst_bytes', split_col.getItem(5)) \
        .withColumn('urgent', split_col.getItem(8)) \
        .withColumn('num_failed_logins', split_col.getItem(10)) \
        .withColumn('root_shell', split_col.getItem(13)) \
        .withColumn('guest_login', split_col.getItem(21)) \
        .withColumn('label', split_col.getItem(41)) 
        # .drop('value')
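Every item produced by `split` is a string, so `src_bytes`, `dst_bytes` and the counter columns above stay string-typed until they are cast. The sketch below mirrors the same positional extraction in plain Python and converts the numeric fields to `int`. The sample line is synthetic: its first five values follow the first row printed earlier, and the remaining fields are filler zeros. `parse_record`, `FIELD_POSITIONS` and `NUMERIC_FIELDS` are illustrative names that are not part of the notebook.

```python
# 0-based positions, matching the getItem() indexes used in the cell above.
FIELD_POSITIONS = {
    "protocal": 1,  # spelling kept as in the notebook's column name
    "service": 2,
    "flag": 3,
    "src_bytes": 4,
    "dst_bytes": 5,
    "urgent": 8,
    "num_failed_logins": 10,
    "root_shell": 13,
    "guest_login": 21,
    "label": 41,
}
# Fields that hold counts or flags and should be numeric (assumption for this sketch).
NUMERIC_FIELDS = {
    "src_bytes", "dst_bytes", "urgent",
    "num_failed_logins", "root_shell", "guest_login",
}


def parse_record(line):
    """Split one comma-separated record and pick the fields by position."""
    parts = line.split(",")
    record = {}
    for name, position in FIELD_POSITIONS.items():
        raw = parts[position]
        record[name] = int(raw) if name in NUMERIC_FIELDS else raw
    return record


# Synthetic 42-field line: filler zeros everywhere except the fields set below.
sample_fields = ["0"] * 42
sample_fields[1:5] = ["tcp", "http", "SF", "215"]
sample_fields[41] = "normal."
sample_line = ",".join(sample_fields)

record = parse_record(sample_line)
print(record)
```

In the DataFrame itself, the analogous step would be a cast on each numeric column, for example `split_col.getItem(4).cast('long')`.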


In [18]:
df.show(5)

+--------------------+--------+-------+----+---------+---------+------+-----------------+----------+-----------+--------+
|               value|protocal|service|flag|src_bytes|dst_bytes|urgent|num_failed_logins|root_shell|guest_login|   label|
+--------------------+--------+-------+----+---------+---------+------+-----------------+----------+-----------+--------+
|0,tcp,private,S0,...|     tcp|private|  S0|        0|        0|     0|                0|         0|          0|neptune.|
|0,tcp,private,REJ...|     tcp|private| REJ|        0|        0|     0|                0|         0|          0|neptune.|
|10,udp,private,SF...|     udp|private|  SF|      105|      147|     0|                0|         0|          0| normal.|
|0,tcp,http,SF,343...|     tcp|   http|  SF|      343|     2125|     0|                0|         0|          0| normal.|
|0,tcp,private,S0,...|     tcp|private|  S0|        0|        0|     0|                0|         0|          0|neptune.|
+--------------------+--------+-------+----+---------+---------+------+-----------------+----------+-----------+--------+

                                                                                

In [13]:
df.select('label').show(10)

+-------+
|  label|
+-------+
|normal.|
|normal.|
|normal.|
|normal.|
|normal.|
|normal.|
|normal.|
|normal.|
|normal.|
|normal.|
+-------+
only showing top 10 rows



In [14]:
# Increase the number of partitions (if needed) and Build a Temp table
df = df.repartition(10)
print(df.rdd.getNumPartitions())

df.createOrReplaceTempView('df_kddcup')

10


## Question 1: Count the number of connections for each label

In [17]:
df.groupBy('label').count().orderBy('count', ascending=False).show()



+----------------+-------+
|           label|  count|
+----------------+-------+
|          smurf.|2807886|
|        neptune.|1072017|
|         normal.| 972781|
|          satan.|  15892|
|        ipsweep.|  12481|
|      portsweep.|  10413|
|           nmap.|   2316|
|           back.|   2203|
|    warezclient.|   1020|
|       teardrop.|    979|
|            pod.|    264|
|   guess_passwd.|     53|
|buffer_overflow.|     30|
|           land.|     21|
|    warezmaster.|     20|
|           imap.|     12|
|        rootkit.|     10|
|     loadmodule.|      9|
|      ftp_write.|      8|
|       multihop.|      7|
+----------------+-------+
only showing top 20 rows



                                                                                

## Question 2: Get the list of `Protocols` that are `normal` and `vulnerable to attacks`, where there is NOT `guest login` to the destination addresses


In [20]:
sql_query = """
SELECT
    protocal,
    CASE label
        WHEN 'normal.' THEN 'No Attack'
        ELSE 'Attack'
    END AS state,
    COUNT(1) AS cnt
FROM df_kddcup
WHERE guest_login <> '1'
GROUP BY
    1,2
ORDER BY 
    1,2
"""
spark.sql(sql_query).show()


+--------+---------+-------+
|protocal|    state|    cnt|
+--------+---------+-------+
|    icmp|   Attack|2820782|
|    icmp|No Attack|  12763|
|     tcp|   Attack|1101613|
|     tcp|No Attack| 764894|
|     udp|   Attack|   2940|
|     udp|No Attack| 191348|
+--------+---------+-------+
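The counts above are easier to compare as shares. The snippet below is a small worked calculation on the six numbers printed in the table; `counts` and `attack_share` are names introduced here for illustration.

```python
# Counts copied from the query output above (connections without guest login).
counts = {
    "icmp": {"Attack": 2820782, "No Attack": 12763},
    "tcp": {"Attack": 1101613, "No Attack": 764894},
    "udp": {"Attack": 2940, "No Attack": 191348},
}

# Fraction of each protocol's connections that are labelled as attacks.
attack_share = {
    protocol: states["Attack"] / (states["Attack"] + states["No Attack"])
    for protocol, states in counts.items()
}

for protocol, share in attack_share.items():
    print(f"{protocol}: {share:.1%} of connections are attacks")
```

On these figures, roughly 99.5% of ICMP connections, 59% of TCP connections and 1.5% of UDP connections are attacks.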



                                                                                

## Question 3: Apply Some Descriptive Statistics on Numerical Data

In [22]:
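# PySpark provides built-in standard aggregate functions, defined in the DataFrame API
# Note: importing min and max here shadows Python's built-ins of the same name
from pyspark.sql.functions import mean, stddev, min, max, last, skewness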

summary = df.select(
                mean(df['src_bytes']).alias('avg_src_bytes'),
                stddev(df['src_bytes']).alias('std_src_bytes'),
                min(df['src_bytes']).alias('min_src_bytes'),
                max(df['src_bytes']).alias('max_src_bytes'),
                last(df['src_bytes']).alias('last_src_bytes'),
                skewness(df['src_bytes']).alias('skew_src_bytes')
            )

summary.show()

+------------------+-----------------+-------------+-------------+--------------+------------------+
|     avg_src_bytes|    std_src_bytes|min_src_bytes|max_src_bytes|last_src_bytes|    skew_src_bytes|
+------------------+-----------------+-------------+-------------+--------------+------------------+
|1834.6211752293746|941431.0744911298|            0|          999|          2341|1188.9519100465739|
+------------------+-----------------+-------------+-------------+--------------+------------------+
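One thing to watch in this output: `max_src_bytes` is `999` even though `last_src_bytes` is `2341`. The columns created by `split` are strings, so `min` and `max` most likely compare them lexicographically, while `mean`, `stddev` and `skewness` coerce the values to numbers. The plain-Python sketch below reproduces the effect with three values taken from the table. It calls `builtins.max` explicitly because `max` refers to the Spark function in a session that has imported it from `pyspark.sql.functions`.

```python
import builtins

# Three src_bytes values that appear in the summary above, kept as strings.
values = ["0", "999", "2341"]

# String comparison goes character by character, so "999" beats "2341".
string_max = builtins.max(values)

# Converting to int first gives the numeric maximum.
numeric_max = builtins.max(int(v) for v in values)

print(string_max)   # 999
print(numeric_max)  # 2341
```

In the notebook, a possible fix is to cast before aggregating, for example `max(df['src_bytes'].cast('long'))`. The corrected maximum is not shown here because it was not computed in the original run. Note also that `last` depends on row order, which is not guaranteed after `repartition`, so `last_src_bytes` can change between runs.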



                                                                                

In [24]:
groups = df.groupBy(["protocal"])

groups.agg(
    mean('src_bytes'), 
    stddev('dst_bytes')
) \
.orderBy('protocal', ascending=True)\
.show()

+--------+-----------------+----------------------+
|protocal|   avg(src_bytes)|stddev_samp(dst_bytes)|
+--------+-----------------+----------------------+
|    icmp|927.8916893855577|                   0.0|
|     tcp|3388.569965326596|    1043771.3100418178|
|     udp|97.22772893848308|     55.43318653434132|
+--------+-----------------+----------------------+
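The column header `stddev_samp(dst_bytes)` shows that `stddev` computes the sample standard deviation (dividing by n - 1) rather than the population one (dividing by n). The difference can be seen on a toy list with the standard library; the numbers are made up for illustration.

```python
import statistics

toy = [1, 2, 3, 4]  # made-up values, not taken from the dataset

sample_std = statistics.stdev(toy)       # divides by n - 1, like stddev_samp
population_std = statistics.pstdev(toy)  # divides by n, like stddev_pop

print(sample_std, population_std)
```

With millions of rows per protocol the two values are practically identical, but `pyspark.sql.functions` also offers `stddev_pop` if the population version is wanted.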



                                                                                

## Question 4: Descriptive Stats based on `Protocols` and `Labels`


In [25]:
query_4 = """
SELECT
    protocal,
    CASE label
        WHEN 'normal.' THEN 'No Attack'
        ELSE 'Attack'
    END AS state,
    COUNT(1) AS cnt,
    ROUND(AVG(src_bytes), 2) AS avg_src_bytes,
    ROUND(AVG(dst_bytes), 2) AS avg_dst_bytes,
    SUM(urgent) AS sum_urgents,
    SUM(num_failed_logins) AS sum_num_failed_logins,
    SUM(root_shell) AS sum_root_shell,
    SUM(guest_login) AS sum_guest_login
FROM df_kddcup
WHERE guest_login <> '1'
GROUP BY
    1,2
ORDER BY 
    1,2
"""
spark.sql(query_4).show()

+--------+---------+-------+-------------+-------------+-----------+---------------------+--------------+---------------+
|protocal|    state|    cnt|avg_src_bytes|avg_dst_bytes|sum_urgents|sum_num_failed_logins|sum_root_shell|sum_guest_login|
+--------+---------+-------+-------------+-------------+-----------+---------------------+--------------+---------------+
|    icmp|   Attack|2820782|       931.68|          0.0|        0.0|                  0.0|           0.0|            0.0|
|    icmp|No Attack|  12763|        90.68|          0.0|        0.0|                  0.0|           0.0|            0.0|
|     tcp|   Attack|1101613|      4466.76|      2005.86|        4.0|                 57.0|          32.0|            0.0|
|     tcp|No Attack| 764894|      1850.34|      4082.34|       35.0|                 95.0|         302.0|            0.0|
|     udp|   Attack|   2940|         26.4|         0.82|        0.0|                  0.0|           0.0|            0.0|
|     udp|No Attack| 191

                                                                                

## Question 5: Get the frequency of `services` for the original `UDP and ICMP` based `attacks`
(hint 1: original attacks: `[dos, u2r, r2l, probe]`)

(hint 2: returns `services` and `protocols` center justified)

In [31]:
from pyspark.sql.types import StringType


def attack_category(item):
    # Labels carry a trailing dot in the raw data (e.g. 'smurf.'), so strip it first
    name = item.replace(".", "")
    if name in ["back", "land", "neptune", "pod", "smurf", "teardrop"]:
        return "DoS"
    elif name in ["buffer_overflow", "loadmodule", "perl", "rootkit"]:
        return "u2r"
    elif name in ["ftp_write", "guess_passwd", "imap", "multihop", "phf", "spy", "warezclient", "warezmaster"]:
        return "r2l"
    else:
        # Everything else (ipsweep, nmap, portsweep, satan) is treated as a probe
        return "probe"

def center_justify(item):
    return item.center(10)

spark.udf.register("udf_attack_category", attack_category, StringType())
spark.udf.register("udf_center_justify", center_justify, StringType())

query_5 = """
SELECT
    udf_center_justify(service) AS service,
    udf_center_justify(protocal) AS protocal,
    udf_attack_category(label) AS attack_label,
    COUNT(1) AS freq
FROM df_kddcup
WHERE protocal IN ('udp', 'icmp')
AND label <> 'normal.'
GROUP BY 1,2,3
ORDER BY freq DESC
"""
spark.sql(query_5).show()

23/10/15 23:04:56 WARN SimpleFunctionRegistry: The function udf_attack_category replaced a previously registered function.
23/10/15 23:04:56 WARN SimpleFunctionRegistry: The function udf_center_justify replaced a previously registered function.

+----------+----------+------------+-------+
|   service|  protocal|attack_label|   freq|
+----------+----------+------------+-------+
|  ecr_i   |   icmp   |         DoS|2808145|
|  eco_i   |   icmp   |       probe|  12570|
| private  |   udp    |       probe|   1688|
| private  |   udp    |         DoS|    979|
|  other   |   udp    |       probe|    261|
|  ecr_i   |   icmp   |       probe|     59|
| domain_u |   udp    |       probe|      9|
|  tim_i   |   icmp   |         DoS|      5|
|  other   |   udp    |         u2r|      3|
|  urp_i   |   icmp   |       probe|      3|
+----------+----------+------------+-------+
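Because `attack_category` sends every label it does not recognise to `probe`, a misspelled or missing name is silently counted in the wrong category. A dictionary lookup with an explicit probe list makes such cases visible. The sketch below assumes the commonly used KDD Cup 99 grouping of the training labels into four categories. `ATTACK_CATEGORIES`, `categorize` and the `"unknown"` fallback are illustrative names, not part of the notebook.

```python
# Assumed grouping of KDD Cup 99 training labels into the four attack categories.
ATTACK_CATEGORIES = {
    "DoS": ["back", "land", "neptune", "pod", "smurf", "teardrop"],
    "u2r": ["buffer_overflow", "loadmodule", "perl", "rootkit"],
    "r2l": ["ftp_write", "guess_passwd", "imap", "multihop",
            "phf", "spy", "warezclient", "warezmaster"],
    "probe": ["ipsweep", "nmap", "portsweep", "satan"],
}

# Invert the mapping once so each lookup is a single dictionary access.
LABEL_TO_CATEGORY = {
    label: category
    for category, labels in ATTACK_CATEGORIES.items()
    for label in labels
}


def categorize(label):
    """Map a raw label such as 'smurf.' to its category, or 'unknown'."""
    name = label.rstrip(".")
    return LABEL_TO_CATEGORY.get(name, "unknown")


def center_justify(item, width=10):
    """Pad a string with spaces on both sides, as str.center does."""
    return item.center(width)


for raw in ["smurf.", "guess_passwd.", "satan.", "not_a_label."]:
    print(raw, "->", categorize(raw))
print(repr(center_justify("udp")))
```

Both functions could be registered with `spark.udf.register` in the same way as in the cell above. Grouping by the `"unknown"` category would then surface any label the mapping misses.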



                                                                                
