# Apache Spark - Network Intrustion Data - Initial Look
In this notebook I'll be using Spark to wrangle a network intrusion dataset.

## Pre-Wrangling Exploration Results
### Overall Notes:
1. Dataset is 4.8 million rows and 42 columns with no nulls
2. Columns describe the nature of each TCP/IP connection, including duration, protocol, service, and more
3. Target column reveals if a connection was normal, and if it was not normal, what kind of attack was used

### Side Notes: Attack Categories
- DOS: denial-of-service, e.g. syn (synchronize request) flood;
    * Small value in 'duration', large value in 'count' (requests to same host) or 'srv_count' (requests to same service)
- R2L: unauthorized access from a remote machine, e.g. guessing password;
- U2R:  unauthorized access to local superuser (root) privileges, e.g., various 'buffer overflow' attacks;
    * logged_in=False, (root_shell=True | num_root > 0 | su_attempted > 3)
- probing: surveillance and other probing, e.g., port scanning.

### Side Notes: Column Information
- Duration values are in two-second intervals, and the rest of the row's metrics are within that duration time
    * **Automated attacks usually have high value counts in short durations**
- Protocol type: TDP (conversation), UDP (one-way broadcast), ICMP (ping and response)
- A **wrong_fragment value greater than zero** may indicate a man-in-the-middle attack
- A **num_failed_logins greater than three** may indicate a password attack
- Error flags:
    * **SF:** SYN-FIN - Normal
    * **S0:** SYN only - High-volume may indicate DOS attack
    * **REJ:** Rejected - High-volume may indicate DOS attack
    * **RSTR:** Accepted then server rejects
    * **RSTO:** Accepted then client rejects
    * **SH:** No SYN-ACK - High-volume may indicate DOS attack
    * **S1:** Server FIN only
    * **S2:** Client FIN only
    * **RSTOS0:** Connection failed because of client
    * **OTH:** Anything not above
    * **S3:** SYN and time-out
- Root Shell: total control; root_shell=True is root obtained, num_root indicates how many times was accessed
    * su_attempted is 'su root' requests; request is fine but multiple is anomalous
    * Should be logged in to succeed
    * Root credentials would be secured somewhere and no-one has access (too generic, not easy to track)
        * Equivalent rights are granted sometimes but almost never root itself
- num_file_creations: attributed to anomalous stuff if in certain directories, too-high, etc
    
#### Server columns
'duration', 'protocol_type', 'service', 'flag', 'src_bytes', 'dst_bytes', 'land', 'wrong_fragment', 'urgent', 'hot', 'num_failed_logins', 'logged_in', 'num_compromised', 'root_shell', 'su_attempted', 'num_root', 'num_file_creations', 'num_shells', 'num_access_files', 'num_outbound_cmds', 'is_host_login', 'is_guest_login', 

#### Analyst Columns
'count', 'srv_count', 'serror_rate', 'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate', 'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count', 'dst_host_same_srv_rate', 'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate', 'dst_host_serror_rate', 'dst_host_srv_serror_rate', 'dst_host_rerror_rate', 'dst_host_srv_rerror_rate', 'target'

#### Further Information
- OSI Model: https://en.wikipedia.org/wiki/OSI_model

### Next Steps
1. Using results from initial exploration, create project goals and plan
2. Add goals and plan to repository README
3. Proceed with additional wrangling as necessary
4. Take three subsets of the data: train, validate, and test
5. Script the PySpark queries and data manipulations

# Pre-Wrangling Exploration
## Imports

In [1]:
import warnings
warnings.filterwarnings('ignore')

# Apache Spark interfacing import
import pyspark
from pyspark.sql.functions import *
spark = pyspark.sql.SparkSession.builder.getOrCreate()

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/22 19:06:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Data Ingest from CSV

In [2]:
# ingest from CSV
df = spark.read.csv('kddcup.data.corrected', header=True)
print(df.columns)
print('Row count:', df.count())

['duration', 'protocol_type', 'service', 'flag', 'src_bytes', 'dst_bytes', 'land', 'wrong_fragment', 'urgent', 'hot', 'num_failed_logins', 'logged_in', 'num_compromised', 'root_shell', 'su_attempted', 'num_root', 'num_file_creations', 'num_shells', 'num_access_files', 'num_outbound_cmds', 'is_host_login', 'is_guest_login', 'count', 'srv_count', 'serror_rate', 'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate', 'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count', 'dst_host_same_srv_rate', 'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate', 'dst_host_serror_rate', 'dst_host_srv_serror_rate', 'dst_host_rerror_rate', 'dst_host_srv_rerror_rate', 'target']


[Stage 1:>                                                          (0 + 8) / 8]

Row count: 4898431


                                                                                

## Checking Columns for Nulls

In [3]:
# check column nulls
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show(vertical=True)

22/01/22 19:06:16 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

-RECORD 0--------------------------
 duration                    | 0   
 protocol_type               | 0   
 service                     | 0   
 flag                        | 0   
 src_bytes                   | 0   
 dst_bytes                   | 0   
 land                        | 0   
 wrong_fragment              | 0   
 urgent                      | 0   
 hot                         | 0   
 num_failed_logins           | 0   
 logged_in                   | 0   
 num_compromised             | 0   
 root_shell                  | 0   
 su_attempted                | 0   
 num_root                    | 0   
 num_file_creations          | 0   
 num_shells                  | 0   
 num_access_files            | 0   
 num_outbound_cmds           | 0   
 is_host_login               | 0   
 is_guest_login              | 0   
 count                       | 0   
 srv_count                   | 0   
 serror_rate                 | 0   
 srv_serror_rate             | 0   
 rerror_rate                

                                                                                

## Checking Value Counts of Some Columns
### Duration Feature Look

In [4]:
# durations
duration_value_counts = df.groupBy('duration').count().sort('count', ascending=False)\
    .withColumn('proportion', round(col('count') / df.count(), 2)).show(8)



+--------+-------+----------+
|duration|  count|proportion|
+--------+-------+----------+
|       0|4779492|      0.98|
|       1|  23886|       0.0|
|       2|   8139|       0.0|
|       3|   6016|       0.0|
|       5|   5576|       0.0|
|    2630|   5061|       0.0|
|       4|   3738|       0.0|
|      14|   2673|       0.0|
+--------+-------+----------+
only showing top 8 rows



                                                                                

### Protocol Type Feature Look

In [5]:
# protocols
df.groupBy('protocol_type').count().sort('count', ascending=False)\
    .withColumn('proportion', round(col('count') / df.count(), 2)).show(8)

[Stage 16:>                                                         (0 + 8) / 8]

+-------------+-------+----------+
|protocol_type|  count|proportion|
+-------------+-------+----------+
|         icmp|2833545|      0.58|
|          tcp|1870598|      0.38|
|          udp| 194288|      0.04|
+-------------+-------+----------+



                                                                                

### Service Feature Look

In [6]:
# services
df.groupBy('service').count().sort('count', ascending=False)\
    .withColumn('proportion', round(col('count') / df.count(), 2)).show(8)

[Stage 22:>                                                         (0 + 8) / 8]

+--------+-------+----------+
| service|  count|proportion|
+--------+-------+----------+
|   ecr_i|2811660|      0.57|
| private|1100831|      0.22|
|    http| 623091|      0.13|
|    smtp|  96554|      0.02|
|   other|  72653|      0.01|
|domain_u|  57782|      0.01|
|ftp_data|  40697|      0.01|
|   eco_i|  16338|       0.0|
+--------+-------+----------+
only showing top 8 rows



                                                                                

### Error Flags Feature Look

In [7]:
# flags - response types
df.groupBy('flag').count().sort('count', ascending=False)\
    .withColumn('proportion', round(col('count') / df.count(), 2)).show(15)

                                                                                

+------+-------+----------+
|  flag|  count|proportion|
+------+-------+----------+
|    SF|3744328|      0.76|
|    S0| 869829|      0.18|
|   REJ| 268874|      0.05|
|  RSTR|   8094|       0.0|
|  RSTO|   5344|       0.0|
|    SH|   1040|       0.0|
|    S1|    532|       0.0|
|    S2|    161|       0.0|
|RSTOS0|    122|       0.0|
|   OTH|     57|       0.0|
|    S3|     50|       0.0|
+------+-------+----------+



### Target Column Feature Look

In [8]:
# target - normal traffic or attack type
df.groupBy('target').count().sort('count', ascending=False)\
    .withColumn('proportion', round(col('count') / df.count(), 2)).show(30)

[Stage 34:>                                                         (0 + 8) / 8]

+----------------+-------+----------+
|          target|  count|proportion|
+----------------+-------+----------+
|          smurf.|2807886|      0.57|
|        neptune.|1072017|      0.22|
|         normal.| 972781|       0.2|
|          satan.|  15892|       0.0|
|        ipsweep.|  12481|       0.0|
|      portsweep.|  10413|       0.0|
|           nmap.|   2316|       0.0|
|           back.|   2203|       0.0|
|    warezclient.|   1020|       0.0|
|       teardrop.|    979|       0.0|
|            pod.|    264|       0.0|
|   guess_passwd.|     53|       0.0|
|buffer_overflow.|     30|       0.0|
|           land.|     21|       0.0|
|    warezmaster.|     20|       0.0|
|           imap.|     12|       0.0|
|        rootkit.|     10|       0.0|
|     loadmodule.|      9|       0.0|
|      ftp_write.|      8|       0.0|
|       multihop.|      7|       0.0|
|            phf.|      4|       0.0|
|           perl.|      3|       0.0|
|            spy.|      2|       0.0|
+-----------



### Combination Counts Feature Look

In [9]:
# show groupby of most-common combos for protocol, service, and binary class
df.withColumn('binary', when(df.target != 'normal.', 'anomalous').otherwise('normal'))\
    .groupBy('protocol_type', 'service', 'binary').count().sort('count', ascending=False)\
    .withColumn('proportion', round(col('count') / df.count(), 2))\
    .show(10)

[Stage 40:>                                                         (0 + 8) / 8]

+-------------+--------+---------+-------+----------+
|protocol_type| service|   binary|  count|proportion|
+-------------+--------+---------+-------+----------+
|         icmp|   ecr_i|anomalous|2808204|      0.57|
|          tcp| private|anomalous|1024311|      0.21|
|          tcp|    http|   normal| 619046|      0.13|
|          tcp|    smtp|   normal|  95371|      0.02|
|          udp| private|   normal|  73848|      0.02|
|          udp|domain_u|   normal|  57773|      0.01|
|          udp|   other|   normal|  55891|      0.01|
|          tcp|ftp_data|   normal|  38093|      0.01|
|          tcp|   other|anomalous|  15869|       0.0|
|         icmp|   eco_i|anomalous|  12570|       0.0|
+-------------+--------+---------+-------+----------+
only showing top 10 rows



                                                                                