### Do necessary imports

In [1]:
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.functions import substring, expr, explode, count, col, when
import pandas as pd

# Temporary imports to load avro files
from os import listdir
from os.path import isfile, join

# Create (or reuse) the Spark session; the spark-avro package is requested via spark.jars.packages
sqlc = (
    SparkSession.builder
    .appName("Master Thesis Analysis")
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.0")
    .getOrCreate()
)

# Constants
FILE_PATH = "openintel/"

### Load AVRO files (DNS resource records)

In [3]:
# TODO: connect to spark cluster, now manually load 1 avro folder for testing
# Collect every regular file directly inside FILE_PATH (sub-directories are skipped)
dnsrr_paths = []
for entry in listdir(FILE_PATH):
    candidate = join(FILE_PATH, entry)
    if isfile(candidate):
        dnsrr_paths.append(candidate)

my_dnsrr_df = sqlc.read.format("avro").load(dnsrr_paths)

# Load the known domain names.
# The file is read once; blank lines are skipped so they cannot end up as an
# empty domain ('') or a bare '.' in the lookup tables.
with open('CT-logs domeinen.txt', 'r') as f:
    known_names = [name for name in (line.strip() for line in f) if name]

# Same names with a trailing dot appended, for comparison against the query_name column
known_names_with_dot = [name + '.' for name in known_names]

# Transform known domain names to dataframes
my_known_names_df = sqlc.createDataFrame(pd.DataFrame({"domain": known_names }))
my_known_names_with_dot_df = sqlc.createDataFrame(pd.DataFrame({"query_name": known_names_with_dot }))

# Load the Resource Records into dataframes
def _read_known_values(path, last_token=False):
    """Read one known value per non-blank line of a text file.

    path:       text file to read.
    last_token: when True keep only the last whitespace-separated token of
                each line; otherwise keep the whole line with surrounding
                whitespace stripped.

    Returns the values as a list of strings, in file order.
    """
    values = []
    with open(path, 'r') as f:
        for line in f:
            tokens = line.split()
            if not tokens:
                # Blank line: indexing the last token would raise IndexError and
                # stripping would add an empty value, so skip it.
                continue
            values.append(tokens[-1] if last_token else line.strip())
    return values


# CAA is disabled (kept for reference):
# caa_known = _read_known_values('CAA records.txt', last_token=True)
# caa_known_df = sqlc.createDataFrame(pd.DataFrame({"caa_value": caa_known }))

ipv4_known = _read_known_values('A IPv4 records.txt')
ipv4_known_df = sqlc.createDataFrame(pd.DataFrame({"ip4_address": ipv4_known }))

# Only the last token of each MX line is kept as the address
mx_known = _read_known_values('MX records.txt', last_token=True)
mx_known_df = sqlc.createDataFrame(pd.DataFrame({"mx_address": mx_known }))

ns_known = _read_known_values('NS records.txt')
ns_known_df = sqlc.createDataFrame(pd.DataFrame({"ns_address": ns_known }))

### OpenINTEL analysis

In [3]:
# Per record type, keep only the queried name and the value column used for matching
# caa_data = my_dnsrr_df.where(my_dnsrr_df.response_type == "CAA").select('query_name', 'caa_value')
response_type = my_dnsrr_df["response_type"]
ipv4_data = my_dnsrr_df.filter(response_type == "A").select("query_name", "ip4_address")
mx_data = my_dnsrr_df.filter(response_type == "MX").select("query_name", "mx_address")
ns_data = my_dnsrr_df.filter(response_type == "NS").select("query_name", "ns_address")

# Obtain results per type
def find_new_domains(rr_data, known_values_df, value_column, known_names_df):
    """Return the not-yet-known domains whose record value is a recognized one.

    rr_data:          DataFrame with columns 'query_name' and `value_column`.
    known_values_df:  DataFrame with a `value_column` column of recognized values.
    value_column:     name of the column both frames are matched on.
    known_names_df:   DataFrame with a 'query_name' column of already known names.

    Returns a DataFrame with a single 'domain' column.
    """
    # First join on recognized value. Select only query name and only distinct values
    matched = (
        rr_data
        .join(known_values_df, rr_data[value_column] == known_values_df[value_column], 'inner')
        .select('query_name')
        .distinct()
    )
    # Then anti join with known names to find new domains
    unknown = matched.join(known_names_df, 'query_name', how='left_anti')
    # Finally remove the trailing dot, since we need PQDNs for CT analysis.
    # Only an actual trailing dot is removed; a name without one is kept intact
    # instead of losing its last character.
    return unknown.withColumn("domain", expr("regexp_replace(query_name, '[.]$', '')")).select("domain")


# CAA code: not used because of the large number of false positives (many websites have pkioverheid.nl as CAA record)
# caa_result = find_new_domains(caa_data, caa_known_df, "caa_value", my_known_names_with_dot_df)
# caa_result.show(truncate=False)

ipv4_result = find_new_domains(ipv4_data, ipv4_known_df, "ip4_address", my_known_names_with_dot_df)
# ipv4_result.show(truncate=False)

mx_result = find_new_domains(mx_data, mx_known_df, "mx_address", my_known_names_with_dot_df)
# mx_result.show(truncate=False)

ns_result = find_new_domains(ns_data, ns_known_df, "ns_address", my_known_names_with_dot_df)
# ns_result.show(truncate=False)

In [4]:
# Write intermediate results to CSV
ipv4_result.toPandas().to_csv("ipv4-new.csv", index=False)
mx_result.toPandas().to_csv("mx-new.csv", index=False)
ns_result.toPandas().to_csv("ns-new.csv", index=False)

# Join results with known domains.
# Currently excluding CAA, because it appeared to be used by non-government domains as well.
# distinct() removes names found through more than one record type and keeps the
# content of my_known_names_df stable when this cell is executed again (the
# frame is extended from itself, so a re-run would otherwise add duplicate rows).
my_known_names_df = (
    my_known_names_df
    .union(ipv4_result)
    .union(mx_result)
    .union(ns_result)
    .distinct()
)

### Load AVRO files (Certificate Transparency logs)

In [4]:
# TODO: connect to spark cluster, now manually load 1 avro file for testing
ct_reader = sqlc.read.format("avro")
my_ct_df = ct_reader.load("Google-Rocketeer.1117550000-1117574999.avro")

### CT analysis

In [5]:
# Prepare the CT dataset for filtering on the known domain names:
# one row per entry of leaf_cert.all_domains, with the full list kept alongside.
# Alternative (disabled): match on the certificate subject CN only
# result = my_ct_df.join(my_known_names_df, my_ct_df.leaf_cert.subject.CN == my_known_names_df.domain, "inner")
exploded_domain = explode("leaf_cert.all_domains").alias("domain")
my_ct_exp_df = my_ct_df.select("cert_index", exploded_domain, "leaf_cert.all_domains")

In [6]:
# Preview the first five rows of both join inputs without truncating long values
for preview_frame in (my_ct_exp_df, my_known_names_df):
    preview_frame.show(5, truncate=False)

+----------+----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------+
|cert_index|domain                                                    |all_domains                                                                                                    |
+----------+----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------+
|1117550000|ffc0ef16e65348ff9e06181f0751a1b1.cc.arc.azure.net         |[ffc0ef16e65348ff9e06181f0751a1b1.cc.arc.azure.net, 5618abcf0e5d4d83b2f155f6de02a3e7.int.scus.cc.arc.azure.net]|
|1117550000|5618abcf0e5d4d83b2f155f6de02a3e7.int.scus.cc.arc.azure.net|[ffc0ef16e65348ff9e06181f0751a1b1.cc.arc.azure.net, 5618abcf0e5d4d83b2f155f6de02a3e7.int.scus.cc.arc.azure.net]|
|1117550001|73f6aee562ae4150bf1fd77019a30010.cc.arc.azure.net         |[73f6aee5

In [7]:
# Keep only the exploded CT rows whose domain is one of the known names
my_ct_exp_match_df = my_ct_exp_df.join(my_known_names_df, on="domain", how="inner")

my_ct_exp_match_df.show(n=5, truncate=False)

+----------------------------------------------------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|domain                                                          |cert_index|all_domains                                                                                                                                                                                                                                                                            |
+----------------------------------------------------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
# Every distinct domain listed on a certificate that matched a known name ...
co_listed_domains = my_ct_exp_match_df.select(
 explode("all_domains").alias("domain")
).distinct()

# ... minus the names that are already known.
# The result is cached because it is evaluated several times (show, CSV export
# and the counts computed afterwards); without it each action re-runs the joins.
my_final_ct_set = co_listed_domains.join(
 my_known_names_df,
 ["domain"],
 "left_anti"
).cache()

my_final_ct_set.show(truncate=False)

my_final_ct_set.toPandas().to_csv("ct-results.csv", index=False)

+-----------------------------------------------------------------------------+
|domain                                                                       |
+-----------------------------------------------------------------------------+
|synt-20220503-0048-scaleout-68lh-sg-c.postgres.database.azure.com            |
|*.synt-20220503-0048-scaleout-68lh-sg.postgres.database.azure.com            |
|*.privatelink.synt-20220503-0048-scaleout-68lh-sg.postgres.database.azure.com|
+-----------------------------------------------------------------------------+



In [9]:
# Summary counts: all newly found CT domains, and those that are wildcard entries ("*." prefix)
total_new = my_final_ct_set.count()
total_wildcard = my_final_ct_set.filter(my_final_ct_set.domain.startswith("*.")).count()

print(f"Total number of new domains in CT: {total_new}")
print(f"Total number of wildcard domains in CT: {total_wildcard}")

Total number of new domains in CT: 3
Total number of wilcard domains in CT: 2


In [None]:
# End