<a href="https://colab.research.google.com/github/ganeshbio/CDS-B1-G9/blob/main/M3_NB_MiniProject_3_Complex_Analytics_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Advanced Certification Program in Computational Data Science
## A program by IISc and TalentSprint
### Mini-Project: Complex Analytics using PySpark

## Problem Statement

Perform complex analytics on a network intrusion dataset using PySpark.

## Learning Objectives

At the end of the mini-project, you will be able to:

* analyze the data using PySpark
* implement RDD-based operations on the data
* derive insights from the complex data

### Dataset

The dataset chosen for this mini-project is a [10% subset](https://www.kdd.org/kdd-cup/view/kdd-cup-1999/Data) of the **[KDD Cup 1999 dataset](http://kdd.ics.uci.edu/databases/kddcup99/task.html)** (Computer network intrusion detection). This is the dataset used for the Third International Knowledge Discovery and Data Mining Tools Competition. The competition task was to build a network intrusion detector, a predictive model capable of distinguishing between ``bad`` connections, called intrusions or attacks, and ``good`` normal connections. This database contains a standard set of data to be audited, which includes a wide variety of intrusions simulated in a military network environment.

## Information

Since 1999, KDD’99 has been the most widely used dataset for evaluating anomaly detection methods. It was prepared by S. J. Stolfo and is built on the data captured in the DARPA’98 IDS evaluation program. DARPA’98 consists of about 4 gigabytes of compressed raw (binary) tcpdump data from 7 weeks of network traffic, which can be processed into about 5 million connection records of about 100 bytes each. The KDD dataset consists of approximately 4,900,000 single connection vectors, each of which contains 41 features and is labeled as either normal or an attack, with exactly one specific attack type. The simulated attacks fall into one of the following four categories:

* Denial of Service Attack (DoS): making some computing or memory resources too busy so that they deny legitimate users access to these resources.
* User to Root Attack (U2R): unauthorized access to local superuser (root) privileges, gained by exploiting a vulnerability of the system.
* Remote to Local Attack (R2L): unauthorized access from a remote machine, gained by exploiting a vulnerability of the target machine.
* Probing Attack: host and port scans as precursors to other attacks. An attacker scans a network to gather information or find known vulnerabilities.

KDD’99 features can be classified into three groups:

1) Basic features: this category encapsulates all the attributes that can be extracted from a TCP/IP connection. Most of these features lead to an implicit delay in detection.

2) Traffic features: this category includes features that are computed with respect to a window interval and are divided into two groups:

  * "same host" features

  * "same service" features

3) Content features: unlike most DoS and Probing attacks, R2L and U2R attacks do not show frequent sequential patterns. DoS and Probing attacks involve many connections to some host(s) in a very short period of time, whereas R2L and U2R attacks are embedded in the data portions of the packets and normally involve only a single connection. Detecting these attacks requires features that look for suspicious behavior in the data portion, e.g., the number of failed login attempts. These features are called content features.

## Grading = 10 Points

In [1]:
#@title Install packages and Download Dataset
!pip -qq install pyspark
# Download the data
!wget -qq http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz
# Download feature names
!wget -qq https://kdd.ics.uci.edu/databases/kddcup99/kddcup.names
print("Successfully Installed packages and downloaded datasets!")

Successfully Installed packages and downloaded datasets!


### Creating Spark Session and Load the data (1 point)

#### Import required packages

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.mllib.stat import Statistics 
import seaborn as sns
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
from operator import add

#### Create a Spark session

A Spark session is the combined entry point of a Spark application, introduced in Spark 2.0: instead of having various contexts, everything is encapsulated in a Spark session.

In [3]:
# Start spark session
# YOUR CODE HERE
spark = SparkSession.builder.appName('KDDCup').getOrCreate()
spark

#### Creating an RDD from a File

The most common way of creating an RDD is to load it from a file. Notice that Spark's textFile can handle compressed files directly.

In [88]:
# YOUR CODE HERE to access sparkContext from sparkSession instance.
rdd_name = spark.sparkContext.textFile("/content/kddcup.names")
rdd_name.collect()

['back,buffer_overflow,ftp_write,guess_passwd,imap,ipsweep,land,loadmodule,multihop,neptune,nmap,normal,perl,phf,pod,portsweep,rootkit,satan,smurf,spy,teardrop,warezclient,warezmaster.',
 'duration: continuous.',
 'protocol_type: symbolic.',
 'service: symbolic.',
 'flag: symbolic.',
 'src_bytes: continuous.',
 'dst_bytes: continuous.',
 'land: symbolic.',
 'wrong_fragment: continuous.',
 'urgent: continuous.',
 'hot: continuous.',
 'num_failed_logins: continuous.',
 'logged_in: symbolic.',
 'num_compromised: continuous.',
 'root_shell: continuous.',
 'su_attempted: continuous.',
 'num_root: continuous.',
 'num_file_creations: continuous.',
 'num_shells: continuous.',
 'num_access_files: continuous.',
 'num_outbound_cmds: continuous.',
 'is_host_login: symbolic.',
 'is_guest_login: symbolic.',
 'count: continuous.',
 'srv_count: continuous.',
 'serror_rate: continuous.',
 'srv_serror_rate: continuous.',
 'rerror_rate: continuous.',
 'srv_rerror_rate: continuous.',
 'same_srv_rate: cont

Load the dataset and show the top 10 records

In [77]:
filePath = "/content/kddcup.data_10_percent.gz"
# YOUR CODE HERE
data_rdd=spark.sparkContext.textFile(filePath)
data_rdd.take(10)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,59,59,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,212,1940,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,2,0.00,0.00,0.00,0.00,1.00,0.00,

### RDD Basic Operations (4 points)

#### Convert the data to CSV format (list of elements).

To create a DataFrame from the RDD later on, convert each row into a list by splitting it on commas (,).

Hint: `map()` and `split()`

In [43]:
# YOUR CODE HERE
csv_data = data_rdd.map(lambda x: x.split(","))
csv_data.take(1)

[['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.']]

Count how many interactions in the dataset are normal and how many are attacks.

Hint: apply `filter` on each row: rows labeled 'normal.' are normal interactions, and all remaining rows are attacks.

In [82]:
# YOUR CODE HERE
# The label is the last field of each parsed row, so compare that field directly
normal_raw_data = csv_data.filter(lambda x: x[-1] == 'normal.')
attacked_raw_data = csv_data.filter(lambda x: x[-1] != 'normal.')
normal_count=normal_raw_data.count()
attacked_count=attacked_raw_data.count()

print("Normal Count-->",normal_count,"Attack Count-->",attacked_count,"Total Count-->",csv_data.count())

Normal Count--> 97278 Attack Count--> 396743 Total Count--> 494021


#### Protocol and Service combinations using Cartesian product

We can compute the Cartesian product of two RDDs with the `cartesian` transformation. It returns all possible pairs of elements from the two RDDs. In our case, we will use it to generate all the possible combinations of service and protocol in our network interactions.

First of all, isolate each collection of values in two separate RDDs. For that use `distinct` on the CSV-parsed dataset. From the dataset description, we know that protocol is the second column and service is the third.

In [62]:
# YOUR CODE HERE
protocols = csv_data.map(lambda x: x[1]).distinct()
protocols.collect()

['tcp', 'udp', 'icmp']

In [63]:
services = csv_data.map(lambda x: x[2]).distinct()
services.collect()

['http',
 'smtp',
 'finger',
 'domain_u',
 'auth',
 'telnet',
 'ftp',
 'eco_i',
 'ntp_u',
 'ecr_i',
 'other',
 'private',
 'pop_3',
 'ftp_data',
 'rje',
 'time',
 'mtp',
 'link',
 'remote_job',
 'gopher',
 'ssh',
 'name',
 'whois',
 'domain',
 'login',
 'imap4',
 'daytime',
 'ctf',
 'nntp',
 'shell',
 'IRC',
 'nnsp',
 'http_443',
 'exec',
 'printer',
 'efs',
 'courier',
 'uucp',
 'klogin',
 'kshell',
 'echo',
 'discard',
 'systat',
 'supdup',
 'iso_tsap',
 'hostnames',
 'csnet_ns',
 'pop_2',
 'sunrpc',
 'uucp_path',
 'netbios_ns',
 'netbios_ssn',
 'netbios_dgm',
 'sql_net',
 'vmnet',
 'bgp',
 'Z39_50',
 'ldap',
 'netstat',
 'urh_i',
 'X11',
 'urp_i',
 'pm_dump',
 'tftp_u',
 'tim_i',
 'red_i']

Now let's do the Cartesian product

Hint: [cartesian](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.cartesian.html#:~:text=Return%20the%20Cartesian%20product%20of,and%20b%20is%20in%20other%20.)

In [72]:
# YOUR CODE HERE
product = protocols.cartesian(services).collect()
print("There are {} combinations of protocol X service".format(len(product)))

There are 198 combinations of protocol X service


#### Inspecting interaction duration

Compute the total duration of interactions for the normal and attack intrusion types.
* Use the filtered normal and attacked data from above and convert the duration column to integer type using `map()`
* Get the sum of durations by applying `reduce` with the add operator to both datasets
* Find the mean duration by dividing each sum by the corresponding count

Hint: [reduce()](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.reduce.html)

In [100]:
# YOUR CODE HERE



In [None]:
# YOUR CODE HERE

In [None]:
# YOUR CODE HERE

#### Data aggregation with key/value pair RDDs

We can use all the transformations and actions available for normal RDDs with key/value pair RDDs. We just need to make the functions work with pair elements.

* create a key/value pair of intrusion type and duration
* calculate the total duration of each intrusion type using `reduceByKey()`

In [None]:
# YOUR CODE HERE

### Create a DataFrame with the header as features (2 points)

* Read the features (*kddcup.names*) and preprocess.

    Hints:
    - Each feature description appears row-wise in *kddcup.names*
    - The first row consists of distinct values of intrusion_types
    - Add or move the *intrusion_types* column name to the last, to align with the data.
    - Each feature is represented as *feature_name*: *type*, remove *type* after colon (:)
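
The hints above can be sketched in plain Python on a shortened copy of the file contents. The helper name `build_header` and the label column name `intrusion_types` are assumptions for illustration; in the notebook, the input would be `rdd_name.collect()`.

```python
# Shortened, illustrative copy of the lines in kddcup.names.
names_lines = [
    "back,buffer_overflow,normal,smurf.",  # first row: the intrusion types
    "duration: continuous.",
    "protocol_type: symbolic.",
    "service: symbolic.",
]

def build_header(lines, label_name="intrusion_types"):
    """Turn the lines of kddcup.names into a list of column names."""
    feature_lines = lines[1:]  # skip the first row (list of intrusion types)
    # Keep only the text before the colon, i.e. drop the feature type.
    header = [line.split(":")[0].strip() for line in feature_lines if ":" in line]
    header.append(label_name)  # the label is the last column of the data
    return header

header = build_header(names_lines)
print(header)
```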

In [None]:
# YOUR CODE HERE

* Create a dataframe with the data and headers as preprocessed feature names

In [None]:
# YOUR CODE HERE

#### What is the count of each protocol type?

Hint: apply `groupby` on protocol_type and count the records

In [None]:
# YOUR CODE HERE

#### Register the DataFrame as a temporary table and extract the data using queries

Hint: [registerTempTable](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.registerTempTable.html) (deprecated since Spark 2.0; `createOrReplaceTempView` is its replacement)

In [None]:
# YOUR CODE HERE

* query to extract the label and their frequencies
 
 Hint: `spark.sql(query)` on the Spark session, or `sql(query)` on a `SQLContext` instance

In [None]:
# YOUR CODE HERE

* select the distinct protocol types with their count of transactions which are not normal

In [None]:
# YOUR CODE HERE

* select the count of transactions in each protocol type that last more than 1000 seconds (duration > 1000; the duration field is measured in seconds), with no data transfer from destination (dst_bytes == 0)

In [None]:
# YOUR CODE HERE

### Find the highly correlated columns (2 points)

* identify the columns which are not of numeric type (the string-valued columns) and remove those columns
* apply correlation function on the data (Hint: `Statistics.corr()`)
* collect the names of the columns on which correlation is applied
* create a dataframe with correlation matrix with index and columns as names
* get the highly correlated features by considering a correlation value greater than 0.8

    Hint: `np.triu()`, `DataFrame.mask()`
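
The last two steps can be sketched with pandas and NumPy alone, starting from a correlation matrix that is already available. The matrix values below are made up for illustration, and the variable names are assumptions. In the notebook, the matrix would come from `Statistics.corr()`.

```python
import numpy as np
import pandas as pd

names = ["count", "srv_count", "serror_rate"]

# Hypothetical correlation matrix with feature names as index and columns.
corr = pd.DataFrame(
    [[1.0, 0.95, 0.1],
     [0.95, 1.0, -0.2],
     [0.1, -0.2, 1.0]],
    index=names, columns=names,
)

# True only strictly above the diagonal, so each pair is kept exactly once
# and the trivial self-correlations on the diagonal are dropped.
upper_only = np.triu(np.ones(corr.shape, dtype=bool), k=1)
upper = corr.mask(~upper_only)  # everything else becomes NaN

threshold = 0.8
pairs = upper.stack()              # Series indexed by (row name, column name)
high = pairs[pairs > threshold]    # NaN never passes the comparison

high_pairs = [(a, b, float(v)) for (a, b), v in high.items()]
print(high_pairs)
```

Comparing `pairs.abs()` against the threshold instead would also report strongly negative correlations; whether that is wanted depends on how "highly correlated" is read.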

In [None]:
# YOUR CODE HERE

In [None]:
# eliminating names of columns having string type
# YOUR CODE HERE

In [None]:
# creating a dataframe with correlation matrix
# YOUR CODE HERE

In [None]:
#  fill the null values with 0
# YOUR CODE HERE

In [None]:
# Finding features with correlation value more than specified threshold value (bar=0.8)
# YOUR CODE HERE

### Analysis report (1 point)

* Find the ratio of attacked transactions vs normal transactions

    Hint: encode the intrusion_type column by replacing normal with 1 and all others with 0

* Describe the statistics of attacked and normal transactions
    
    Hint: Min, Max, Mean
    
* Select any two features that influence the intrusion_type and visualize the scatter plot to see the separation of normal and attacked

In [None]:
# YOUR CODE HERE

In [None]:
# YOUR CODE HERE

In [None]:
# YOUR CODE HERE