# **Tutorial: Best Hands-on Structured Data Analysis using PySpark RDD and DF**

## Week 9 - Big Data Analytics - CN7031

**`Dr Amin Karami, UEL UK - Docklands Campus`**

`E: a.karami@uel.ac.uk`

`W: https://www.youtube.com/@AminKarami`

`W: www.aminkarami.com`

---


**Learning outcomes:**
- `Performance Comparison`: gain an understanding of the performance differences between PySpark RDD and DataFrame in various scenarios.
- `Complex Task Handling`: learn how to handle complex tasks using PySpark RDD and DataFrame. The examples provided illustrate scenarios involving multiple transformations and actions, helping learners grasp the capabilities and efficiency of each approach in different use cases.

In [None]:
!pip3 install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 4.4 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=eb885a3c3413653a0347ec35ff917a7ed15ead0fd0f37be10fc47f01025fde58
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
# Source: http://kdd.ics.uci.edu/databases/kddcup99/
import urllib.request
urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")

# If this fails with "HTTPError: HTTP Error 403: Forbidden",
# download the file manually from the Moodle site (week 8) and drag and drop it into the "Files" panel.

('kddcup.data.gz', <http.client.HTTPMessage at 0x7cea77c68ee0>)
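
The download cell above stops with an exception when the server answers 403. A minimal sketch of a more forgiving variant follows; the helper name `fetch_dataset` is hypothetical and not part of the original notebook. It reuses a local copy when one exists (for example a file uploaded by hand) and reports a failed download instead of raising.

```python
import os
import urllib.error
import urllib.request

DATA_URL = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz"


def fetch_dataset(url=DATA_URL, dest="kddcup.data.gz"):
    """Return True if `dest` is available locally, downloading it if needed.

    Hypothetical helper for illustration only.
    """
    # Reuse a copy that was downloaded earlier or uploaded manually.
    if os.path.exists(dest) and os.path.getsize(dest) > 0:
        return True
    try:
        urllib.request.urlretrieve(url, dest)
        return True
    except urllib.error.URLError as err:
        # HTTPError (such as 403 Forbidden) is a subclass of URLError.
        print(f"Download failed ({err}); upload {dest} manually instead.")
        return False


# Usage (commented out so that importing this sketch never touches the network):
# fetch_dataset()
```

Checking for an existing file first means the manual upload route and the automatic download route end in the same state, so the cells that follow do not need to know which one was used.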

# Load Data in RDD and DF

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Initialize Spark using conf
conf = SparkConf()
conf.set("spark.app.name", "RDDExample")
conf.set("spark.master", "local[*]")
sc = SparkContext(conf = conf)

# Create a SparkSession; getOrCreate() reuses the SparkContext created above
spark = SparkSession.builder.appName("DFExample").getOrCreate()

In [None]:
# Data reading in RDD
data_rdd = sc.textFile("kddcup.data.gz", 8)
data_rdd.take(2)

['0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,162,4528,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,1,1,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,normal.']

In [None]:
# Convert RDD to DataFrame: 42 columns in KDDCup (41 features plus the label)
features = ['duration','protocol_type','service','flag','src_bytes','dst_bytes','land','wrong_fragment','urgent','hot','num_failed_logins','logged_in',
            'num_compromised','root_shell','su_attempted','num_root','num_file_creations','num_shells','num_access_files','num_outbound_cmds','is_host_login',
            'is_guest_login','count','srv_count','serror_rate','srv_serror_rate','rerror_rate','srv_rerror_rate','same_srv_rate','diff_srv_rate',
            'srv_diff_host_rate','dst_host_count','dst_host_srv_count','dst_host_same_srv_rate','dst_host_diff_srv_rate','dst_host_same_src_port_rate',
            'dst_host_srv_diff_host_rate','dst_host_serror_rate','dst_host_srv_serror_rate','dst_host_rerror_rate','dst_host_srv_rerror_rate','label']

# Split each CSV line into a list of 42 string fields
split_rdd = data_rdd.map(lambda line: line.split(","))
data_df = spark.createDataFrame(split_rdd, features)

data_df.show(2)

+--------+-------------+-------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_host_rate|dst_hos
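
Because `split(",")` returns strings, every field of the split records is a string, which is why later cells cast columns such as `src_bytes` before aggregating. As a rough sketch of typed parsing in plain Python (no Spark needed), the hypothetical helpers `parse_field` and `parse_line` below convert the first record printed by `data_rdd.take(2)` into integers, floats and strings.

```python
# First record shown by data_rdd.take(2) above.
SAMPLE_LINE = (
    "0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,"
    "0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,"
    "0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal."
)


def parse_field(value):
    """Convert a raw CSV field to int or float when possible, else keep the string."""
    for caster in (int, float):
        try:
            return caster(value)
        except ValueError:
            continue
    return value


def parse_line(line):
    """Split one CSV line and convert each field (hypothetical helper)."""
    return [parse_field(field) for field in line.split(",")]


record = parse_line(SAMPLE_LINE)
print(len(record), record[:6], record[-1])
```

Under the assumption that every line has the same layout, the same function could be mapped over the RDD with `data_rdd.map(parse_line)` before building the DataFrame.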

# Task 1: Count the number of records

In [None]:
%%time
print("RDD record count:", data_rdd.count())

RDD record count: 4898431
CPU times: user 70.1 ms, sys: 8.81 ms, total: 78.9 ms
Wall time: 10.1 s


In [None]:
%%time
print("DF record count:", data_df.count())

DF record count: 4898431
CPU times: user 380 ms, sys: 45 ms, total: 425 ms
Wall time: 1min 16s


In [None]:
from pyspark import StorageLevel

In [None]:
%%time
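# persist() only marks data_df for caching; the cache is filled by the first action below
data_df.persist(StorageLevel.MEMORY_AND_DISK)
# repartition() returns a new DataFrame that reads from the persisted one
data_df = data_df.repartition(8)
print("DF record count:", data_df.count())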

DF record count: 4898431
CPU times: user 751 ms, sys: 88.3 ms, total: 840 ms
Wall time: 2min 29s


In [None]:
%%time
print("DF record count:", data_df.count())

DF record count: 4898431
CPU times: user 33.9 ms, sys: 2.99 ms, total: 36.8 ms
Wall time: 5.86 s


In [None]:
%%time
data_rdd.persist(StorageLevel.MEMORY_AND_DISK)
print("RDD record count:", data_rdd.count())

RDD record count: 4898431
CPU times: user 112 ms, sys: 15.3 ms, total: 127 ms
Wall time: 18.8 s


# Task 2: Filter the records whose protocol type is `tcp`

In [None]:
%%time
filtered_rdd = data_rdd.filter(lambda x: x.split(",")[1]=='tcp')

filtered_count = filtered_rdd.count()

print("number of records with TCP protocols", filtered_count)

number of records with TCP protocols 1870598
CPU times: user 94.7 ms, sys: 15.7 ms, total: 110 ms
Wall time: 16.7 s


In [None]:
%%time
# Exact match on the protocol column, mirroring the RDD filter
filtered_df = data_df.filter(data_df.protocol_type == 'tcp')

filtered_count = filtered_df.count()

print("number of records with TCP protocols", filtered_count)

number of records with TCP protocols 1870598
CPU times: user 13.6 ms, sys: 993 µs, total: 14.6 ms
Wall time: 2.13 s


# Task 3: Find the `distinct types of services` in data

In [None]:
%%time
service_types_rdd = data_rdd.map(lambda line: line.split(",")[2])

distinct_service_rdd = service_types_rdd.distinct().collect()

print("distinct service types:", distinct_service_rdd)

distinct service types: ['http', 'smtp', 'domain_u', 'auth', 'finger', 'telnet', 'eco_i', 'ftp', 'ntp_u', 'ecr_i', 'other', 'urp_i', 'private', 'pop_3', 'ftp_data', 'netstat', 'daytime', 'ssh', 'echo', 'time', 'name', 'whois', 'domain', 'mtp', 'gopher', 'remote_job', 'rje', 'ctf', 'supdup', 'link', 'systat', 'discard', 'X11', 'shell', 'login', 'imap4', 'nntp', 'uucp', 'pm_dump', 'IRC', 'Z39_50', 'netbios_dgm', 'ldap', 'sunrpc', 'courier', 'exec', 'bgp', 'csnet_ns', 'http_443', 'klogin', 'printer', 'netbios_ssn', 'pop_2', 'nnsp', 'efs', 'hostnames', 'uucp_path', 'sql_net', 'vmnet', 'iso_tsap', 'netbios_ns', 'kshell', 'urh_i', 'http_2784', 'harvest', 'aol', 'tftp_u', 'http_8001', 'tim_i', 'red_i']
CPU times: user 124 ms, sys: 11 ms, total: 135 ms
Wall time: 21.5 s


In [None]:
%%time
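distinct_service_df = data_df.select('service').distinct()
# show() prints the table and returns None, so it is called on its own;
# n is sized from the distinct result so that every service is printed
distinct_service_df.show(n = distinct_service_df.count(), truncate = False)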

+-----------+
|service    |
+-----------+
|telnet     |
|ftp        |
|auth       |
|iso_tsap   |
|systat     |
|name       |
|sql_net    |
|ntp_u      |
|X11        |
|pop_3      |
|ldap       |
|discard    |
|tftp_u     |
|Z39_50     |
|daytime    |
|domain_u   |
|login      |
|smtp       |
|http_2784  |
|mtp        |
|domain     |
|http       |
|harvest    |
|link       |
|courier    |
|kshell     |
|pop_2      |
|other      |
|exec       |
|nnsp       |
|efs        |
|IRC        |
|pm_dump    |
|private    |
|urh_i      |
|ftp_data   |
|whois      |
|nntp       |
|netbios_ns |
|klogin     |
|shell      |
|red_i      |
|tim_i      |
|uucp_path  |
|eco_i      |
|vmnet      |
|ctf        |
|supdup     |
|finger     |
|printer    |
|netbios_dgm|
|urp_i      |
|ecr_i      |
|time       |
|netbios_ssn|
|csnet_ns   |
|hostnames  |
|sunrpc     |
|echo       |
|http_443   |
|netstat    |
|remote_job |
|imap4      |
|gopher     |
|uucp       |
|ssh        |
|rje        |
|bgp        |
|aol  

# Task 4: Calculate the `average duration of connections`

In [None]:
%%time
duration_rdd = data_rdd.map(lambda line: int(line.split(',')[0]))
average_duration = duration_rdd.mean()

print("avg duration of connections:", average_duration)

avg duration of connections: 48.34243046396006
CPU times: user 260 ms, sys: 31.3 ms, total: 292 ms
Wall time: 48.8 s


In [None]:
%%time
from pyspark.sql.functions import avg
# duration is stored as a string column; avg() casts it to a number implicitly
average_duration = data_df.groupBy().agg(avg("duration").alias("average_duration"))
average_duration.show()

+-----------------+
| average_duration|
+-----------------+
|48.34243046395876|
+-----------------+

CPU times: user 47.9 ms, sys: 4.48 ms, total: 52.4 ms
Wall time: 8.07 s
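
Both versions above arrive at a global average even though the data is spread over several partitions. One common way to do this, sketched here in plain Python with made-up durations, is to carry a `(sum, count)` pair per partition and merge the pairs at the end, since averaging the per-partition averages would give the wrong answer when partitions differ in size. The names `seq_op` and `comb_op` are illustrative; in PySpark, functions of this shape could be passed to `RDD.aggregate((0, 0), seq_op, comb_op)`.

```python
from functools import reduce


def seq_op(acc, duration):
    """Fold one duration into a partition-local (sum, count) pair."""
    total, count = acc
    return (total + duration, count + 1)


def comb_op(left, right):
    """Merge the (sum, count) pairs of two partitions."""
    return (left[0] + right[0], left[1] + right[1])


# Made-up durations split over three partitions of different sizes.
partitions = [[0, 0, 10], [5], []]

# Step 1: each partition is reduced independently.
partials = [reduce(seq_op, part, (0, 0)) for part in partitions]

# Step 2: the partial results are merged, then divided once.
total, count = reduce(comb_op, partials, (0, 0))
average = total / count
print(partials, average)
```

The two results printed above agree except in the last few digits, which is consistent with floating-point sums being accumulated in a different order.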


# Task 5: Find the maximum number of `source bytes` in a connection

In [None]:
%%time
src_bytes_rdd = data_rdd.map(lambda line: int(line.split(",")[4]))
max_src_bytes = src_bytes_rdd.max()
print("Max source bytes in a connection:", max_src_bytes)

Max source bytes in a connection: 1379963888
CPU times: user 101 ms, sys: 12.2 ms, total: 113 ms
Wall time: 18.2 s


In [None]:
%%time
# The alias avoids shadowing Python's built-in max()
from pyspark.sql.functions import col, max as spark_max

max_src_bytes = data_df.withColumn("src_bytes", col("src_bytes").cast("int")).agg(spark_max("src_bytes").alias("max_src_bytes"))

# show() prints the table itself and returns None, so it is not wrapped in print()
max_src_bytes.show()

+-------------+
|max_src_bytes|
+-------------+
|   1379963888|
+-------------+

CPU times: user 35.1 ms, sys: 4.69 ms, total: 39.8 ms
Wall time: 5.76 s


# Task 6: Calculate the total number of connections that are labeled as `"normal"`

In [None]:
%%time
normal_connection_rdd = data_rdd.filter(lambda line: line.split(",")[-1] == 'normal.')

normal_connections_count = normal_connection_rdd.count()

print("total number of normal connections:", normal_connections_count)

total number of normal connections: 972781
CPU times: user 93.6 ms, sys: 10 ms, total: 104 ms
Wall time: 16.2 s


In [None]:
%%time
total_normal_connections = data_df.filter(col("label") == "normal.").count()
print("Total no. of normal connection:",total_normal_connections)

Total no. of normal connection: 972781
CPU times: user 8.6 ms, sys: 2.13 ms, total: 10.7 ms
Wall time: 1.28 s


# Task 7: Group the records by `type` (normal or sub-attacks) and `count the number of connections` for each type

In [None]:
%%time
import pandas as pd
attack_rdd = data_rdd.map(lambda line: (line.split(',')[-1],1))
connection_per_attack_rdd = attack_rdd.reduceByKey(lambda x,y: x + y)

connection_per_attack_list = connection_per_attack_rdd.collect()


# The key of each pair is the connection label (last field), not the service
df = pd.DataFrame(connection_per_attack_list, columns = ['label','count'])
df.sort_values('count',ascending= False)

CPU times: user 129 ms, sys: 22.6 ms, total: 151 ms
Wall time: 21.7 s


           label    count
5         smurf.  2807886
4       neptune.  1072017
0        normal.   972781
15        satan.    15892
10      ipsweep.    12481
9     portsweep.    10413
17         nmap.     2316
13         back.     2203
20  warezclient.     1020
8      teardrop.      979


In [None]:
%%time
connections_by_label_sorted = data_df.groupBy("label").count().orderBy("count", ascending = False)
# n is sized from the grouped result so that every label is printed
connections_by_label_sorted.show(truncate = False, n = connections_by_label_sorted.count())

+----------------+-------+
|label           |count  |
+----------------+-------+
|smurf.          |2807886|
|neptune.        |1072017|
|normal.         |972781 |
|satan.          |15892  |
|ipsweep.        |12481  |
|portsweep.      |10413  |
|nmap.           |2316   |
|back.           |2203   |
|warezclient.    |1020   |
|teardrop.       |979    |
|pod.            |264    |
|guess_passwd.   |53     |
|buffer_overflow.|30     |
|land.           |21     |
|warezmaster.    |20     |
|imap.           |12     |
|rootkit.        |10     |
|loadmodule.     |9      |
|ftp_write.      |8      |
|multihop.       |7      |
|phf.            |4      |
|perl.           |3      |
|spy.            |2      |
+----------------+-------+

CPU times: user 60.6 ms, sys: 13.7 ms, total: 74.3 ms
Wall time: 11.1 s


# Task 8: Calculate the `number of connections` for `each attack type and protocol`

In [None]:
%%time
label_protocol_count_rdd = data_rdd.map(lambda line: ((line.split(',')[-1],line.split(',')[1]), 1))

connections_by_attack_and_protocol_rdd = label_protocol_count_rdd.reduceByKey(lambda x,y: x + y)

connections_by_attack_and_protocol_rdd.collect()

CPU times: user 155 ms, sys: 13.3 ms, total: 168 ms
Wall time: 27.8 s


[(('normal.', 'tcp'), 768670),
 (('normal.', 'udp'), 191348),
 (('normal.', 'icmp'), 12763),
 (('buffer_overflow.', 'tcp'), 30),
 (('loadmodule.', 'tcp'), 9),
 (('perl.', 'tcp'), 3),
 (('neptune.', 'tcp'), 1072017),
 (('smurf.', 'icmp'), 2807886),
 (('guess_passwd.', 'tcp'), 53),
 (('pod.', 'icmp'), 264),
 (('teardrop.', 'udp'), 979),
 (('portsweep.', 'tcp'), 10407),
 (('ipsweep.', 'tcp'), 924),
 (('land.', 'tcp'), 21),
 (('ftp_write.', 'tcp'), 8),
 (('back.', 'tcp'), 2203),
 (('imap.', 'tcp'), 12),
 (('satan.', 'icmp'), 37),
 (('satan.', 'udp'), 1708),
 (('satan.', 'tcp'), 14147),
 (('phf.', 'tcp'), 4),
 (('ipsweep.', 'icmp'), 11557),
 (('nmap.', 'icmp'), 1032),
 (('nmap.', 'tcp'), 1034),
 (('multihop.', 'tcp'), 7),
 (('nmap.', 'udp'), 250),
 (('warezmaster.', 'tcp'), 20),
 (('warezclient.', 'tcp'), 1020),
 (('spy.', 'tcp'), 2),
 (('rootkit.', 'tcp'), 7),
 (('portsweep.', 'icmp'), 6),
 (('rootkit.', 'udp'), 3)]
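
The RDD cell above builds `((label, protocol), 1)` pairs and lets `reduceByKey` add up the ones for each key. The plain-Python sketch below imitates that behaviour on a handful of shortened, made-up records (protocol at index 1, label last) to show what the merge step does. `to_pair` and `reduce_by_key` are hypothetical helpers, not Spark functions, and the sketch ignores the fact that Spark merges values within each partition before shuffling.

```python
def to_pair(line):
    """Build a ((label, protocol), 1) pair, splitting the line only once."""
    fields = line.split(",")
    return ((fields[-1], fields[1]), 1)


def reduce_by_key(pairs, func):
    """Merge the values of equal keys with func, like a single-machine reduceByKey."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Shortened, made-up records for illustration only.
lines = [
    "0,tcp,http,SF,normal.",
    "0,tcp,smtp,SF,normal.",
    "0,icmp,ecr_i,SF,smurf.",
    "0,udp,private,SF,normal.",
    "0,icmp,ecr_i,SF,smurf.",
]

counts = reduce_by_key(map(to_pair, lines), lambda x, y: x + y)
for key, value in sorted(counts.items()):
    print(key, value)
```

Splitting each line once inside a named function, rather than twice inside a lambda, is a small readability choice that would also apply to the Spark version.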

In [None]:
%%time
connections_by_attack_and_protocol = data_df.groupby("label","protocol_type").count().withColumnRenamed("count","Count")
connections_by_attack_and_protocol.orderBy("label","protocol_type").show(truncate = False)

+----------------+-------------+-------+
|label           |protocol_type|Count  |
+----------------+-------------+-------+
|back.           |tcp          |2203   |
|buffer_overflow.|tcp          |30     |
|ftp_write.      |tcp          |8      |
|guess_passwd.   |tcp          |53     |
|imap.           |tcp          |12     |
|ipsweep.        |icmp         |11557  |
|ipsweep.        |tcp          |924    |
|land.           |tcp          |21     |
|loadmodule.     |tcp          |9      |
|multihop.       |tcp          |7      |
|neptune.        |tcp          |1072017|
|nmap.           |icmp         |1032   |
|nmap.           |tcp          |1034   |
|nmap.           |udp          |250    |
|normal.         |icmp         |12763  |
|normal.         |tcp          |768670 |
|normal.         |udp          |191348 |
|perl.           |tcp          |3      |
|phf.            |tcp          |4      |
|pod.            |icmp         |264    |
+----------------+-------------+-------+
only showing top

# Wrap up


---


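DataFrames generally outperform RDDs because their queries run through Spark's Catalyst optimizer and Tungsten execution engine. In Tasks 2, 4, 5, 6 and 7 above, the persisted DataFrame finished several times faster than the RDD. Even so, there are scenarios where RDDs might be faster or simply more suitable: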

- `Complex Transformations`: If a transformation is hard to express with DataFrame operations, or needs fine-grained control over the processing logic, RDDs can be more efficient. Their lower-level API accepts arbitrary Python functions, which can help when the logic does not map onto built-in column functions.

- `Iterative Algorithms`: RDDs can be a good fit for iterative algorithms, such as many machine learning algorithms, that process the same data over many iterations. Intermediate RDDs can be persisted in memory between iterations. DataFrames can be persisted too, but each iteration extends the logical plan, so query planning can add overhead as the lineage grows unless it is truncated, for example with `checkpoint()`.

- `Data with Irregular Structure`: If the data has an irregular or deeply nested structure that is hard to represent as a table, RDDs can be more suitable. Each RDD element can be an arbitrary Python object, so records do not have to share a schema.

- `Fine-Grained Control`: RDDs expose explicit control over data partitioning (for example, a custom partition function passed to `partitionBy()`), caching, and serialization. If you need to tune these aspects directly, RDDs offer more options than DataFrames.
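
To make the partitioning point concrete, the sketch below shows in plain Python what a custom partition function does: it maps each key to a partition index, and records are routed to the bucket with that index. The function `protocol_partitioner`, the sample pairs and the `gre` protocol are made up for illustration. With a pair RDD keyed by protocol, a function like this could be passed as `rdd.partitionBy(4, protocol_partitioner)`.

```python
PROTOCOL_PARTITION = {"tcp": 0, "udp": 1, "icmp": 2}


def protocol_partitioner(key):
    """Map a protocol name to a partition index; unknown protocols share index 3."""
    return PROTOCOL_PARTITION.get(key, 3)


# Made-up (protocol, label) pairs; "gre" does not occur in the KDD Cup data.
pairs = [
    ("tcp", "normal."),
    ("icmp", "smurf."),
    ("udp", "teardrop."),
    ("tcp", "neptune."),
    ("gre", "normal."),
]

num_partitions = 4
buckets = [[] for _ in range(num_partitions)]
for key, value in pairs:
    # The index is taken modulo the number of partitions.
    buckets[protocol_partitioner(key) % num_partitions].append((key, value))

for index, bucket in enumerate(buckets):
    print(index, bucket)
```

Keeping all records of one protocol in the same partition is the kind of layout decision that the RDD API leaves to the programmer; whether it helps depends on the operations that follow.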