## 1. Data ETL

### 1.1 Set up PySpark on Google Colab

In [3]:
# Spark needs JVM - download Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Apache Spark 3.1.2 with Hadoop 3.2
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz # extract the gzipped tarball

# Install findspark - provides findspark.init() to make pyspark importable as a regular library
!pip install -q findspark

# Set the environment path
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

# Make pyspark importable
import findspark 
findspark.init()
findspark.find()  # where is Spark ?

'/content/spark-3.1.2-bin-hadoop3.2'

### 1.2 Data Loading
Load the Internet Firewall dataset from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Internet+Firewall+Data).

In [1]:
!wget http://archive.ics.uci.edu/ml/machine-learning-databases/00542/log2.csv

--2021-06-25 03:42:21--  http://archive.ics.uci.edu/ml/machine-learning-databases/00542/log2.csv
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2876998 (2.7M) [application/x-httpd-php]
Saving to: ‘log2.csv’


2021-06-25 03:42:22 (2.58 MB/s) - ‘log2.csv’ saved [2876998/2876998]

log2.csv  sample_data


In [10]:
data_file = 'log2.csv'

!head $data_file

Source Port,Destination Port,NAT Source Port,NAT Destination Port,Action,Bytes,Bytes Sent,Bytes Received,Packets,Elapsed Time (sec),pkts_sent,pkts_received
57222,53,54587,53,allow,177,94,83,2,30,1,1
56258,3389,56258,3389,allow,4768,1600,3168,19,17,10,9
6881,50321,43265,50321,allow,238,118,120,2,1199,1,1
50553,3389,50553,3389,allow,3327,1438,1889,15,17,8,7
50002,443,45848,443,allow,25358,6778,18580,31,16,13,18
51465,443,39975,443,allow,3961,1595,2366,21,16,12,9
60513,47094,45469,47094,allow,320,140,180,6,7,3,3
50049,443,21285,443,allow,7912,3269,4643,23,96,12,11
52244,58774,2211,58774,allow,70,70,0,1,5,1,0


In [4]:
from pyspark.sql import SparkSession

# Start (or reuse the already-running) Spark session for this analysis
spark = (
    SparkSession.builder
    .appName('firewall')
    .getOrCreate()
)

In [5]:
# Source file has a header row and needs no data cleaning; let Spark infer the column types.
# The reader options take real Python booleans rather than the strings 'true'.
df = spark.read.csv(data_file, header=True, inferSchema=True)

In [6]:
# Show the column names and the types Spark inferred when reading the CSV
df.printSchema()

root
 |-- Source Port: integer (nullable = true)
 |-- Destination Port: integer (nullable = true)
 |-- NAT Source Port: integer (nullable = true)
 |-- NAT Destination Port: integer (nullable = true)
 |-- Action: string (nullable = true)
 |-- Bytes: integer (nullable = true)
 |-- Bytes Sent: integer (nullable = true)
 |-- Bytes Received: integer (nullable = true)
 |-- Packets: integer (nullable = true)
 |-- Elapsed Time (sec): integer (nullable = true)
 |-- pkts_sent: integer (nullable = true)
 |-- pkts_received: integer (nullable = true)



In [7]:
# Display the first 10 rows of the raw firewall log
df.show(n=10)

+-----------+----------------+---------------+--------------------+------+-----+----------+--------------+-------+------------------+---------+-------------+
|Source Port|Destination Port|NAT Source Port|NAT Destination Port|Action|Bytes|Bytes Sent|Bytes Received|Packets|Elapsed Time (sec)|pkts_sent|pkts_received|
+-----------+----------------+---------------+--------------------+------+-----+----------+--------------+-------+------------------+---------+-------------+
|      57222|              53|          54587|                  53| allow|  177|        94|            83|      2|                30|        1|            1|
|      56258|            3389|          56258|                3389| allow| 4768|      1600|          3168|     19|                17|       10|            9|
|       6881|           50321|          43265|               50321| allow|  238|       118|           120|      2|              1199|        1|            1|
|      50553|            3389|          50553|      

## 2. Data Analysis

### 2.1. Analysis of Firewall Action

In [46]:
# List the distinct firewall actions. distinct() gives no ordering guarantee,
# so sort explicitly to keep the listing identical across runs.
df.select('Action').distinct().orderBy('Action').show()

+----------+
|    Action|
+----------+
|     allow|
|      deny|
|      drop|
|reset-both|
+----------+



In [47]:
# Number of log records per firewall action, most frequent first
action_counts = df.groupBy('Action').count()
action_counts.sort(action_counts['count'].desc()).show()

+----------+-----+
|    Action|count|
+----------+-----+
|     allow|37640|
|      deny|14987|
|      drop|12851|
|reset-both|   54|
+----------+-----+



In [48]:
from pyspark.sql import functions as f

# Total bytes per firewall action, largest first.
# Aggregation output has no inherent row order, so sort explicitly for stable output.
(
    df.groupBy('Action')
    .agg(f.sum('Bytes').alias('total_bytes'))
    .orderBy(f.desc('total_bytes'))
    .show()
)

+----------+-----------+
|    Action|total_bytes|
+----------+-----------+
|     allow| 6362588560|
|      deny|    1247094|
|      drop|     882546|
|reset-both|       8497|
+----------+-----------+



In [45]:
# Register the DataFrame as a temporary SQL view named FW so it can be queried with spark.sql
df.createOrReplaceTempView('FW')

# Traffic volume per action in megabytes (1 MB = 1024*1024 bytes here), largest first.
# ORDER BY makes the row order deterministic; GROUP BY alone does not.
spark.sql("""
      SELECT Action, 
        ROUND(sum(Bytes)/(1024*1024), 2) AS total_Mbytes,
        ROUND(sum(`Bytes Sent`)/(1024*1024), 2) AS Sent_Mbytes,
        ROUND(sum(`Bytes Received`)/(1024*1024), 2) AS Recv_Mbytes
      FROM FW 
      GROUP BY Action
      ORDER BY total_Mbytes DESC
      """).show()

+----------+------------+-----------+-----------+
|    Action|total_Mbytes|Sent_Mbytes|Recv_Mbytes|
+----------+------------+-----------+-----------+
|     allow|     6067.84|    1396.99|    4670.85|
|      deny|        1.19|       1.19|        0.0|
|      drop|        0.84|       0.84|        0.0|
|reset-both|        0.01|       0.01|        0.0|
+----------+------------+-----------+-----------+



In [49]:
# Which Destination Ports were denied most often (Action = 'deny' only; 'drop' and
# 'reset-both' are not counted here). Ties are broken by port number so that the
# LIMIT cut-off is deterministic.
spark.sql("""
      SELECT Action, `Destination Port`, COUNT(*) AS Count
      FROM FW 
      WHERE Action = 'deny' 
      GROUP BY Action, `Destination Port`
      ORDER BY Count DESC, `Destination Port`
      LIMIT 10
      """).show()

+------+----------------+-----+
|Action|Destination Port|Count|
+------+----------------+-----+
|  deny|           25174| 1087|
|  deny|            5900|  854|
|  deny|           37965|  826|
|  deny|           22114|  767|
|  deny|           64147|  756|
|  deny|           50584|  717|
|  deny|           57470|  671|
|  deny|           44847|  626|
|  deny|           35440|  428|
|  deny|           51505|  375|
+------+----------------+-----+



In [35]:
# Well-known destination ports (below 1024) most often denied by the firewall;
# ties are ordered by port number.
well_known_denied_sql = """
    SELECT Action,
           `Destination Port`,
           count(*) AS Count
    FROM FW
    WHERE Action = 'deny'
      AND `Destination Port` < 1024
    GROUP BY Action, `Destination Port`
    ORDER BY Count DESC, `Destination Port`
    LIMIT 20
"""
spark.sql(well_known_denied_sql).show()

+------+----------------+-----+
|Action|Destination Port|Count|
+------+----------------+-----+
|  deny|              23|  346|
|  deny|               0|   88|
|  deny|              22|   81|
|  deny|               1|   65|
|  deny|             445|   40|
|  deny|              53|   29|
|  deny|              67|   22|
|  deny|              17|   19|
|  deny|              21|   18|
|  deny|              81|   13|
|  deny|             123|   13|
|  deny|              25|   12|
|  deny|              37|   12|
|  deny|              80|    7|
|  deny|             161|    7|
|  deny|             443|    7|
|  deny|             138|    6|
|  deny|             990|    6|
|  deny|             995|    6|
|  deny|             502|    5|
+------+----------------+-----+



In [40]:
# Denied traffic split by destination port range:
#   < 1024 -> Well-Known, < 49152 -> Registered, otherwise -> Dynamic/Private
denied_port_types_sql = """
    SELECT Action,
           CASE
               WHEN (`Destination Port` < 1024) THEN 'Well-Known'
               WHEN (`Destination Port` < 49152) THEN 'Registered'
               ELSE 'Dynamic/Private'
           END AS Dest_Port_Type,
           COUNT(*) AS Count
    FROM FW
    WHERE Action = 'deny'
    GROUP BY Action, Dest_Port_Type
    ORDER BY Count DESC
"""
spark.sql(denied_port_types_sql).show()

+------+---------------+-----+
|Action| Dest_Port_Type|Count|
+------+---------------+-----+
|  deny|     Registered| 9631|
|  deny|Dynamic/Private| 4535|
|  deny|     Well-Known|  821|
+------+---------------+-----+



In [44]:
# Breakdown of destination port types for every firewall action.
# Within each action, rows are ordered by port range (Well-Known, Registered,
# Dynamic/Private) using the smallest port in each group. Sorting the label text
# DESC only happened to produce this order and would break if a label were renamed.
spark.sql("""
      SELECT Action, 
      CASE 
          WHEN (`Destination Port` < 1024) THEN 'Well-Known'
          WHEN (`Destination Port` < 49152) THEN 'Registered'
          ELSE 'Dynamic/Private'
      END AS Dest_Port_Type,
      COUNT(*) AS Count
      FROM FW 
      GROUP BY Action, Dest_Port_Type
      ORDER BY Action, MIN(`Destination Port`)
      """).show()

+----------+---------------+-----+
|    Action| Dest_Port_Type|Count|
+----------+---------------+-----+
|     allow|     Well-Known|31522|
|     allow|     Registered| 5103|
|     allow|Dynamic/Private| 1015|
|      deny|     Well-Known|  821|
|      deny|     Registered| 9631|
|      deny|Dynamic/Private| 4535|
|      drop|     Well-Known|12851|
|reset-both|     Well-Known|    1|
|reset-both|     Registered|   46|
|reset-both|Dynamic/Private|    7|
+----------+---------------+-----+



In [25]:
# Total packets received across all denied flows. The WHERE clause leaves a single
# group, so the original LIMIT was redundant and has been dropped.
spark.sql("""
      SELECT Action, sum(pkts_received) AS total_pkts_rx
      FROM FW 
      WHERE Action = 'deny' 
      GROUP BY Action
      """).show()

+------+-------------+
|Action|total_pkts_rx|
+------+-------------+
|  deny|           16|
+------+-------------+



In [26]:
# Individual denied flows that nevertheless received packets (at most 20 rows)
denied_rx_sql = """
    SELECT Action, pkts_received
    FROM FW
    WHERE Action = 'deny'
      AND pkts_received > 0
    LIMIT 20
"""
spark.sql(denied_rx_sql).show()

+------+-------------+
|Action|pkts_received|
+------+-------------+
|  deny|            2|
|  deny|            2|
|  deny|            2|
|  deny|            2|
|  deny|            2|
|  deny|            2|
|  deny|            2|
|  deny|            2|
+------+-------------+



In [50]:
# Full records (all columns) of denied flows that still received packets (at most 20 rows)
denied_rx_full_sql = """
    SELECT *
    FROM FW
    WHERE Action = 'deny'
      AND pkts_received > 0
    LIMIT 20
"""
spark.sql(denied_rx_full_sql).show()

+-----------+----------------+---------------+--------------------+------+-----+----------+--------------+-------+------------------+---------+-------------+
|Source Port|Destination Port|NAT Source Port|NAT Destination Port|Action|Bytes|Bytes Sent|Bytes Received|Packets|Elapsed Time (sec)|pkts_sent|pkts_received|
+-----------+----------------+---------------+--------------------+------+-----+----------+--------------+-------+------------------+---------+-------------+
|      53193|              22|          53193|                  22|  deny|  295|       126|           169|      4|                 0|        2|            2|
|      49630|              22|          49630|                  22|  deny|  295|       126|           169|      4|                 1|        2|            2|
|      59373|              22|          59373|                  22|  deny|  295|       126|           169|      4|                 1|        2|            2|
|      62347|              25|          62347|      

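In the query above, the firewall denied flows to well-known service ports even though those flows still received packets. The rows shown target SSH (22) and SMTP (25), and HTTPS (443) also appears among the denied destination ports in the well-known-port breakdown.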