<a href="https://colab.research.google.com/github/FibGro/Pyspark/blob/main/Structure_Data_DF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=31d3e0985c861df6f401bd8eb944babaa962a615968207d965562781c9a34146
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

In [6]:
df = spark.read.text('kddcup.data.corrected')
df.printSchema()

root
 |-- value: string (nullable = true)



In [5]:
df.show()

+--------------------+
|               value|
+--------------------+
|0,tcp,http,SF,215...|
|0,tcp,http,SF,162...|
|0,tcp,http,SF,236...|
|0,tcp,http,SF,233...|
|0,tcp,http,SF,239...|
|0,tcp,http,SF,238...|
|0,tcp,http,SF,235...|
|0,tcp,http,SF,234...|
|0,tcp,http,SF,239...|
|0,tcp,http,SF,181...|
|0,tcp,http,SF,184...|
|0,tcp,http,SF,185...|
|0,tcp,http,SF,239...|
|0,tcp,http,SF,181...|
|0,tcp,http,SF,236...|
|0,tcp,http,SF,233...|
|0,tcp,http,SF,238...|
|0,tcp,http,SF,235...|
|0,tcp,http,SF,234...|
|0,tcp,http,SF,239...|
+--------------------+
only showing top 20 rows



In [8]:
# Need to read data split data because of zip format

from pyspark.sql.functions import split


split_col = split(df['value'], ',')


df = df.withColumn('protocol_type', split_col.getItem(1))\
      .withColumn('service', split_col.getItem(2))\
      .withColumn('flag', split_col.getItem(3))\
      .withColumn('src_bytes', split_col.getItem(4))\
      .withColumn('dst_bytes', split_col.getItem(5))\
      .withColumn('urgent', split_col.getItem(8))\
      .withColumn('num_failed_logins', split_col.getItem(10))\
      .withColumn('root_shell', split_col.getItem(13))\
      .withColumn('guest_login', split_col.getItem(21))\
      .withColumn('label', split_col.getItem(41))\
      .drop('value')

df.show()

+-------------+-------+----+---------+---------+------+-----------------+----------+-----------+-------+
|protocol_type|service|flag|src_bytes|dst_bytes|urgent|num_failed_logins|root_shell|guest_login|  label|
+-------------+-------+----+---------+---------+------+-----------------+----------+-----------+-------+
|          tcp|   http|  SF|      215|    45076|     0|                0|         0|          0|normal.|
|          tcp|   http|  SF|      162|     4528|     0|                0|         0|          0|normal.|
|          tcp|   http|  SF|      236|     1228|     0|                0|         0|          0|normal.|
|          tcp|   http|  SF|      233|     2032|     0|                0|         0|          0|normal.|
|          tcp|   http|  SF|      239|      486|     0|                0|         0|          0|normal.|
|          tcp|   http|  SF|      238|     1282|     0|                0|         0|          0|normal.|
|          tcp|   http|  SF|      235|     1337|     0|

In [10]:
df = df.repartition(10) # shuffle all data
print(df.rdd.getNumPartitions())

10


In [13]:
# Count the number of connections for each label

df.groupby('label').count().orderBy('count', ascending = True).show()

+----------------+------+
|           label| count|
+----------------+------+
|           land.|     1|
|     loadmodule.|     1|
|            phf.|     1|
|           imap.|     1|
|           perl.|     1|
|buffer_overflow.|     3|
|       multihop.|     5|
|      ftp_write.|     8|
|            pod.|    20|
|          satan.|    24|
|   guess_passwd.|    53|
|       teardrop.|    99|
|      portsweep.|   405|
|           nmap.|  1296|
|           back.|  2002|
|        ipsweep.|  6556|
|        neptune.| 76585|
|          smurf.|112574|
|         normal.|393014|
+----------------+------+



In [19]:
# Get the list of Protocol that are normal and vulnerable to attact, where there is NOT guest login to destiantion address

df.createOrReplaceTempView('archive')

sql_query = """ SELECT protocol_type,
                CASE label WHEN 'normal.' THEN 'no attack'
                ELSE 'attack' END AS attact_type,
                COUNT(*) AS freq
                FROM  archive
                WHERE guest_login != '1'
                GROUP BY protocol_type, attact_type """

spark.sql(sql_query).show()

+-------------+-----------+------+
|protocol_type|attact_type|  freq|
+-------------+-----------+------+
|          tcp|  no attack|375093|
|          udp|     attack|   355|
|          tcp|     attack| 81034|
|         icmp|     attack|118241|
|         icmp|  no attack|  2235|
|          udp|  no attack| 14177|
+-------------+-----------+------+



In [27]:
# Apply some descriptive statistics on NUmerical Data

from pyspark.sql.functions import *

summary = df.select(mean(df.src_bytes).alias('AVG_src_bytes'))  # Changed 'scr_bytes' to 'src_bytes'

summary.show()

+------------------+
|     AVG_src_bytes|
+------------------+
|2245.6344176738676|
+------------------+



In [29]:
group = df.groupBy('protocol_type')

group.agg({'src_bytes': 'mean', 'dst_bytes' : 'stddev'}).show()

+-------------+------------------+-----------------+
|protocol_type|    avg(src_bytes)|stddev(dst_bytes)|
+-------------+------------------+-----------------+
|          tcp| 2652.458254395913|33287.69361825222|
|          udp|46.651665290393616|50.58200520807657|
|         icmp| 965.5151399448853|              0.0|
+-------------+------------------+-----------------+



In [38]:
# Descriptive Stasts based on Protocols and Labels
# Descriptive Stasts based on Protocols and Labels

sql_query = """ SELECT protocol_type,
                CASE label WHEN 'normal.' THEN 'no attack'
                ELSE 'attack' END as attack_type,
                AVG(src_bytes) AS avg_src_bytes,
                stddev(src_bytes) AS stddev_src_bytes,
                avg(dst_bytes) AS avg_dst_bytes,
                stddev(dst_bytes) AS stddev_dst_bytes
                FROM archive
                GROUP BY protocol_type, attack_type """ # Group by the correct column name

spark.sql(sql_query).show()

+-------------+-----------+------------------+------------------+------------------+------------------+
|protocol_type|attack_type|     avg_src_bytes|  stddev_src_bytes|     avg_dst_bytes|  stddev_dst_bytes|
+-------------+-----------+------------------+------------------+------------------+------------------+
|          tcp|  no attack|1093.5004620262239|40092.982869110296| 4390.664598169951| 36604.62079009492|
|          udp|     attack|144.01971830985914| 83.50876779928832|2.1154929577464787|26.489325469139047|
|          tcp|     attack| 9897.199953108997|2435697.2571804025|223.60135243524724|4054.7871890567603|
|         icmp|     attack| 983.1714887391006| 218.4669584616153|               0.0|               0.0|
|         icmp|  no attack| 31.41923937360179|10.461255043044313|               0.0|               0.0|
|          udp|  no attack| 44.21351484799323|15.000072165352945| 72.09360231360655| 49.85432559280989|
+-------------+-----------+------------------+------------------

In [40]:
# Get the frequency of services for the original UDP and ICMP based attacks


def attack_category(item):
  if item.replace(".","") in ['back', 'land', 'netpune', 'smurf', 'teardrop']:
    return "DoS"
  elif item.replace(".","") in ['buffer_overflow', 'loadmodule', 'perl', 'rootkit']:
    return "U2R"
  elif item.replace(".","") in ['ftp_write', 'guess_password', 'multihop', 'spy', 'warezclient', 'warezmaster']:
    return "R2L"
  else:
    return 'probe'

def center_justify(item):
  return item.center(10)

spark.udf.register('OriginalAttack', attack_category, StringType())
spark.udf.register('center_justify', center_justify, StringType())




In [43]:
sql_query = """ SELECT
                center_justify(service) AS service,
                OriginalAttack(label) AS attack_type,
                center_justify(protocol_type) AS protocol_type,
                COUNT(*) AS freq
                FROM archive
                WHERE protocol_type IN ('udp', 'icmp') AND label == 'normal.'
                GROUP BY service, attack_type , protocol_type"""

spark.sql(sql_query).show()

+----------+-----------+-------------+-----+
|   service|attack_type|protocol_type| freq|
+----------+-----------+-------------+-----+
|  other   |      probe|      udp    |   17|
|  ecr_i   |      probe|      icmp   | 1100|
|  ntp_u   |      probe|      udp    | 1651|
|  urp_i   |      probe|      icmp   |   30|
|  eco_i   |      probe|      icmp   | 1105|
| domain_u |      probe|      udp    |12502|
| private  |      probe|      udp    |    7|
+----------+-----------+-------------+-----+

