# **KDDCup Data Analytics with PySpark RDD: A structured case study**<a href="#KDDCup-Data-Analytics-with-PySpark-RDD:-A-structured-case-study" class="anchor-link">¶</a>

### Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark<a href="#Udemy-Course:-Best-Hands-on-Big-Data-Practices-and-Use-Cases-using-PySpark" class="anchor-link">¶</a>

### Author: Amin Karami (PhD, FHEA)<a href="#Author:-Amin-Karami-(PhD,-FHEA)" class="anchor-link">¶</a>

##### data source: <http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html><a href="#data-source:-http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html" class="anchor-link">¶</a>

In \[ \]:

    ########## ONLY in Colab ##########
    !pip3 install pyspark
    ########## ONLY in Colab ##########

In \[ \]:

    ########## ONLY in Ubuntu Machine ##########
    # Load Spark engine
    !pip3 install -q findspark
    import findspark
    findspark.init()
    ########## ONLY in Ubuntu Machine ##########

In \[ \]:

    from pyspark import SparkContext, SparkConf

    # Initializing Spark
    conf = SparkConf().setAppName("KDDCup_PySpark").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    print(sc)
    print("Ready to go!")

In \[ \]:

    from google.colab import drive
    drive.mount('/content/drive')

In \[ \]:

    # Read and Load Data to Spark
    # Data source: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html

    rdd = sc.textFile("/content/drive/MyDrive/KDDCup_Data/kddcup.data.gz")

In \[ \]:

    # Repartition and Cache Data:

    rdd = rdd.repartition(10) # shuffle all data
    print(sc.defaultParallelism)
    print(rdd.getNumPartitions())
    rdd.persist()

## Question 1: Get ten records randomly<a href="#Question-1:-Get-ten-records-randomly" class="anchor-link">¶</a>

In \[ \]:

    rdd.takeSample(False, 10, 1234)

## Question 2: Count elements<a href="#Question-2:-Count-elements" class="anchor-link">¶</a>

In \[ \]:

    rdd.count()

## Question 3: Calculate the ratio of `normal` connections<a href="#Question-3:-Calculate-the-ratio-of-normal-connections" class="anchor-link">¶</a>

In \[ \]:

    Normal_rdd = rdd.filter(lambda line: 'normal.' in line)
    ratio = Normal_rdd.count()/rdd.count()

    print("The ratio of normal connections is {} %".format(round(ratio, 4) * 100))

## Question 4: Get the list of labels<a href="#Question-4:-Get-the-list-of-labels" class="anchor-link">¶</a>

In \[ \]:

    Split_rdd = rdd.map(lambda line: line.split(","))
    Label_rdd = Split_rdd.map(lambda item: item[-1]).distinct() # Get last index for "labels"
    Label_rdd.collect()

Out\[ \]:

    ['neptune.',
     'loadmodule.',
     'warezclient.',
     'pod.',
     'smurf.',
     'nmap.',
     'spy.',
     'back.',
     'teardrop.',
     'ipsweep.',
     'multihop.',
     'phf.',
     'ftp_write.',
     'guess_passwd.',
     'normal.',
     'land.',
     'satan.',
     'imap.',
     'portsweep.',
     'warezmaster.',
     'rootkit.',
     'buffer_overflow.',
     'perl.']

## Question 5: Count the number of connections for each label<a href="#Question-5:-Count-the-number-of-connections-for-each-label" class="anchor-link">¶</a>

In \[ \]:

    def LabelCount_func(items):
      Labels_Count = []
      for i in items:
        Labels_Count.append(rdd.filter(lambda line: i in line).count())

In \[ \]:

    %%time
    Labels_list = Label_rdd.collect()
    LabelCount_func(Labels_list)

    CPU times: user 962 ms, sys: 129 ms, total: 1.09 s
    Wall time: 2min 2s

In \[ \]:

    # Solution 2: Create a pair of <Key, Values> to speed up the parallel processing
    %%time
    Label_rdd_KV = Split_rdd.map(lambda x: (x[-1],1))
    Label_rdd_Reduce = Label_rdd_KV.reduceByKey(lambda a,b: a+b)

    CPU times: user 12 ms, sys: 1.45 ms, total: 13.4 ms
    Wall time: 26.6 ms

In \[ \]:

    # Visualize results
    import pandas as pd
    Keys = Label_rdd_Reduce.keys().collect()
    Values = Label_rdd_Reduce.values().collect()

    DF_labels_KV = pd.DataFrame({'Label': Keys,
                              'Count': Values})
    DF_labels_KV.sort_values(by = "Count", ascending=False)

Out\[ \]:

|     | Label            | Count   |
|-----|------------------|---------|
| 4   | smurf.           | 2807886 |
| 0   | neptune.         | 1072017 |
| 14  | normal.          | 972781  |
| 16  | satan.           | 15892   |
| 9   | ipsweep.         | 12481   |
| 18  | portsweep.       | 10413   |
| 5   | nmap.            | 2316    |
| 7   | back.            | 2203    |
| 2   | warezclient.     | 1020    |
| 8   | teardrop.        | 979     |
| 3   | pod.             | 264     |
| 13  | guess_passwd.    | 53      |
| 21  | buffer_overflow. | 30      |
| 15  | land.            | 21      |
| 19  | warezmaster.     | 20      |
| 17  | imap.            | 12      |
| 20  | rootkit.         | 10      |
| 1   | loadmodule.      | 9       |
| 12  | ftp_write.       | 8       |
| 10  | multihop.        | 7       |
| 11  | phf.             | 4       |
| 22  | perl.            | 3       |
| 6   | spy.             | 2       |

![](attachment:vertopal_e276a9d36ea24415ab1abe2fdd81a75e/7b5974ee7b7391c1d1a12e31fb9b4c8ef16527c7.svg)

## Question 6: Get the connection type with successful `root_shell` connections to server, where the number of data bytes from source (`src_bytes`) is 500 times more than from server (`dst_bytes`).<a href="#Question-6:-Get-the-connection-type-with-successful-root_shell-connections-to-server,-where-the-number-of-data-bytes-from-source-(src_bytes)-is-500-times-more-than-from-server-(dst_bytes)." class="anchor-link">¶</a>

In \[ \]:

    Split_rdd.filter(lambda x: x[13] == '1') \
    .map(lambda x: (x[1], x[4], x[5])) \
    .filter(lambda x: int(x[2]) > int(x[1]) * 500) \
    .collect()

Out\[ \]:

    [('tcp', '353', '759161'),
     ('tcp', '433', '1524348'),
     ('tcp', '296', '507534'),
     ('tcp', '296', '507534'),
     ('tcp', '246', '866032'),
     ('tcp', '317', '394616'),
     ('tcp', '262', '744605'),
     ('tcp', '173', '744605'),
     ('tcp', '224', '2776333'),
     ('tcp', '262', '744605'),
     ('tcp', '0', '2072'),
     ('tcp', '351', '759161'),
     ('tcp', '1794', '3851730'),
     ('tcp', '465', '320362'),
     ('tcp', '0', '2072'),
     ('tcp', '0', '2072'),
     ('tcp', '296', '507534'),
     ('tcp', '266', '507534'),
     ('tcp', '255', '574784'),
     ('tcp', '0', '2072')]

## Question 7: Get the list of `Protocols`that are `normal` and `vulnerable to attacks`, where there is NOT `guest login` to the destination addresses. \[ino DFsho ham bezan va moghayese kon\]<a href="#Question-7:--Get-the-list-of-Protocolsthat-are-normal-and-vulnerable-to-attacks,-where-there-is-NOT-guest-login-to-the-destination-addresses.-%5Bino-DFsho-ham-bezan-va-moghayese-kon%5D" class="anchor-link">¶</a>

In \[ \]:

    normal_protocols_rdd = Split_rdd.filter(lambda line: "normal" in line[-1] and line[21] !='1') \
             .map(lambda line: (line[1], 1)).reduceByKey(lambda x,y: x+y)

    attack_protocols_rdd = Split_rdd.filter(lambda line: "normal" not in line[-1] and line[21] !='1') \
             .map(lambda line: (line[1], 1)).reduceByKey(lambda x,y: x+y)


    normal_KeyValue = pd.DataFrame({'Label': normal_protocols_rdd.keys().collect(), 'State': 'normal', 'Count': normal_protocols_rdd.values().collect()})
    attack_KeyValue = pd.DataFrame({'Label': attack_protocols_rdd.keys().collect(), 'State': 'attack', 'Count': attack_protocols_rdd.values().collect()})

    results = normal_KeyValue.append(attack_KeyValue)
    results.sort_values(by = "Label", ascending=False)

Out\[ \]:

|     | Label | State  | Count   |
|-----|-------|--------|---------|
| 1   | udp   | normal | 191348  |
| 1   | udp   | attack | 2940    |
| 2   | tcp   | normal | 764894  |
| 2   | tcp   | attack | 1101613 |
| 0   | icmp  | normal | 12763   |
| 0   | icmp  | attack | 2820782 |

![](attachment:vertopal_e276a9d36ea24415ab1abe2fdd81a75e/7b5974ee7b7391c1d1a12e31fb9b4c8ef16527c7.svg)

## Question 8: Get a summary statistics for the sum of 'tcp' connections to the same destination IP address (hint: `protocol_type` and `dst_host_count` features).<a href="#Question-8:-Get-a-summary-statistics-for-the-sum-of-&#39;tcp&#39;-connections-to-the-same-destination-IP-address-(hint:-protocol_type-and-dst_host_count-features)." class="anchor-link">¶</a>

In \[ \]:

    # Source: https://spark.apache.org/docs/latest/mllib-statistics.html

    from pyspark.mllib.stat import Statistics 
    from math import sqrt

    summary = Statistics.colStats(Split_rdd.filter(lambda line: line[1] == "tcp").map(lambda line: [int(line[31])])) # the input should be a "vector"

    tcp_mean = round(float(summary.mean()),3)
    tcp_std = round(float(sqrt(summary.variance())),3)
    tcp_min = round(float(summary.min()),3)
    tcp_max = round(float(summary.max()),3)

    print([tcp_mean, tcp_std, tcp_min, tcp_max])

    [201.752, 90.726, 0.0, 255.0]

## \[challenge\] Question 9: Filter the number of `icmp`-based attacks for each `service`<a href="#%5Bchallenge%5D-Question-9:-Filter-the-number-of-icmp-based-attacks-for-each-service" class="anchor-link">¶</a>

In \[ \]: