In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when,to_timestamp
from pyspark.sql.types import StructType, StructField, StringType



In [3]:
# Start (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("MACCDCDataAnalysis")
    .getOrCreate()
)

# One http.log.gz and one dns.log.gz are read from each numbered capture folder.
base_path = "Data/security-datasets01-main/maccdc-2012"
folders = ["00", "01", "02", "03", "04", "05"]
http_files = [f"{base_path}/{folder}/http.log.gz" for folder in folders]
dns_files = [f"{base_path}/{folder}/dns.log.gz" for folder in folders]

# Load the logs with the JSON reader and convert the "ts" column to a Spark
# timestamp in the same expression, so each frame is built exactly once.
# The previous .option("header", "true") was removed: "header" is a CSV reader
# option and has no effect on JSON input.
http_df = spark.read.json(http_files).withColumn("ts", to_timestamp(col("ts")))
dns_df = spark.read.json(dns_files).withColumn("ts", to_timestamp(col("ts")))

# Expose both frames to spark.sql() under the names http_log and dns_log.
# createOrReplaceTempView keeps this cell safe to re-run.
http_df.createOrReplaceTempView("http_log")
dns_df.createOrReplaceTempView("dns_log")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/13 09:33:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/13 09:33:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/04/13 09:33:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [4]:
# Preview the first five HTTP log rows to check the inferred schema.
http_df.show(n=5)

+---------------+---------------+---------+---------------+---------+------+--------------+--------------------+---------------+------+-------+--------------------+----------------+--------------+--------------------+---------------+-----------------+-----------+-----------------+----+-----------+--------------------+------------------+--------------------+--------------------+--------+-------+
|           host|      id.orig_h|id.orig_p|      id.resp_h|id.resp_p|method|orig_filenames|          orig_fuids|orig_mime_types|origin|proxied|            referrer|request_body_len|resp_filenames|          resp_fuids|resp_mime_types|response_body_len|status_code|       status_msg|tags|trans_depth|                  ts|               uid|                 uri|          user_agent|username|version|
+---------------+---------------+---------+---------------+---------+------+--------------+--------------------+---------------+------+-------+--------------------+----------------+--------------+--------

In [5]:
# Preview the first five DNS log rows to check the inferred schema.
dns_df.show(n=5)

+-----+-----+----+-----+----+---+-------+--------------+---------+-------------+---------+-----+------+-----------+-----+----------+-------------------+-----+----------+--------+----+--------+--------------------+------------------+
|   AA|   RA|  RD|   TC|TTLs|  Z|answers|     id.orig_h|id.orig_p|    id.resp_h|id.resp_p|proto|qclass|qclass_name|qtype|qtype_name|              query|rcode|rcode_name|rejected| rtt|trans_id|                  ts|               uid|
+-----+-----+----+-----+----+---+-------+--------------+---------+-------------+---------+-----+------+-----------+-----+----------+-------------------+-----+----------+--------+----+--------+--------------------+------------------+
|false|false|true|false|NULL|  0|   NULL|192.168.204.70|    38795|192.168.207.4|       53|  udp|     1| C_INTERNET|   28|      AAAA|creativecommons.org|    3|  NXDOMAIN|   false|NULL|   50106|2012-03-16 15:12:...| C1odva1GOl62Mubql|
|false|false|true|false|NULL|  0|   NULL|192.168.204.70|    53918|19

In [6]:
# Count successful GET requests per URI, most frequently requested first.
result_sql = spark.sql("""
SELECT uri, COUNT(*) AS access_count
FROM http_log
WHERE status_code = '200' AND method = 'GET'
GROUP BY uri
ORDER BY access_count DESC
""")

# createOrReplaceTempView (not createTempView) so that re-running this cell
# does not fail because the view already exists; this also matches how
# http_log and dns_log are registered.
result_sql.createOrReplaceTempView("http_access_count_table")


In [7]:
# Display the most requested URIs (20 rows, which is show()'s default).
result_sql.show(n=20)



+--------------------+------------+
|                 uri|access_count|
+--------------------+------------+
|                   /|        9475|
|/admin/config.php...|         556|
|  /main.php?logout=1|         194|
|/top.php?stuff=15...|         191|
|            /top.php|         179|
|/main.php?stuff=1...|         172|
|  /get_latest_id.php|         159|
|/admin/config.php...|         138|
|    /cacti/index.php|         129|
|/en-US/api/messag...|         118|
|          /index.php|         105|
|/phpmyadmin/index...|          77|
|             /cacti/|          68|
|        /phpmyadmin/|          56|
|        /favicon.ico|          55|
|              /admin|          42|
|  /scripts/index.php|          40|
|             /icons/|          39|
|/en-US/api/search...|          39|
|  /cgi-bin/index.php|          37|
+--------------------+------------+
only showing top 20 rows



                                                                                

In [8]:
# Inner-join the HTTP and DNS views on their uid column.
# SELECT * returns every column from both sides, so names present in both
# views (for example ts and uid) appear twice in the result.
# NOTE(review): the recorded output for this cell contains no rows; confirm
# that uid is the intended join key between the two logs.
result_join = spark.sql(
    """
SELECT 
  *
FROM http_log AS h
JOIN dns_log AS d
  ON h.uid = d.uid
"""
)
result_join.show(n=20)


                                                                                

+----+---------+---------+---------+---------+------+--------------+----------+---------------+------+-------+--------+----------------+--------------+----------+---------------+-----------------+-----------+----------+----+-----------+---+---+---+----------+--------+-------+---+---+---+---+----+---+-------+---------+---------+---------+---------+-----+------+-----------+-----+----------+-----+-----+----------+--------+---+--------+---+---+
|host|id.orig_h|id.orig_p|id.resp_h|id.resp_p|method|orig_filenames|orig_fuids|orig_mime_types|origin|proxied|referrer|request_body_len|resp_filenames|resp_fuids|resp_mime_types|response_body_len|status_code|status_msg|tags|trans_depth| ts|uid|uri|user_agent|username|version| AA| RA| RD| TC|TTLs|  Z|answers|id.orig_h|id.orig_p|id.resp_h|id.resp_p|proto|qclass|qclass_name|qtype|qtype_name|query|rcode|rcode_name|rejected|rtt|trans_id| ts|uid|
+----+---------+---------+---------+---------+------+--------------+----------+---------------+------+-------+

In [10]:
# For every URI served by a successful GET request, compute the share of the
# joined DNS records that used TCP, and keep only URIs where that share
# exceeds 50 percent.
#
# The protocol comparison is case-insensitive: the dns_log preview shows
# proto stored in lowercase (e.g. "udp"), so the previous comparison against
# the literal 'TCP' could never match and the percentage was always 0.
#
# NOTE(review): the plain http_log/dns_log join on uid in this notebook shows
# no rows in its recorded output; if uid never matches across the two logs
# this result will remain empty. Confirm the join key.
tcp_percentage = spark.sql("""
SELECT
    uri, tcp_percentage
FROM (
    SELECT
        uri,
        COUNT(CASE WHEN LOWER(dns_log.proto) = 'tcp' THEN 1 END) * 100.0 / COUNT(*) AS tcp_percentage
    FROM http_log
    JOIN dns_log ON http_log.uid = dns_log.uid
    WHERE http_log.status_code = '200' AND http_log.method = 'GET'
    GROUP BY uri
) AS tcp_counts
WHERE tcp_percentage > 50
""")

# createOrReplaceTempView so the cell can be re-run without a
# "view already exists" error.
tcp_percentage.createOrReplaceTempView("tcp_percentage_table")
tcp_percentage.show()



+---+--------------+
|uri|tcp_percentage|
+---+--------------+
+---+--------------+

