 # Preparation

In [None]:
# Install the third-party packages this notebook relies on (IPython `!` shell escape).
# ipynb: needed for `from ipynb.fs.full.Functions import *` in the next cell.
# dnspython / geoip2 / psycopg2-binary: presumably used by the DNS, GeoLite2 and
# database helpers in the Functions notebook — TODO confirm.
# NOTE(review): `%pip install` installs into the running kernel's environment and may be
# preferable to `!pip` — confirm for the target setup.
!pip install ipynb
!pip install dnspython
!pip install geoip2
!pip install psycopg2-binary

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from ipynb.fs.full.Functions import *
from pyspark.sql.types import *

In [None]:
# Start (or reuse) the Spark session and load the raw domain CSV.
# No `header` option is passed, so the four positional columns are named explicitly;
# the timestamp column is not used downstream and is dropped immediately.
domains_df = (
    SparkSession.builder
    .appName("etl_domains")
    .getOrCreate()
    .read
    .csv('../data/real_domains.csv', escape='"')
    .toDF("top_level_domain", "mx_record", "a_record", "timestamp")
    .drop('timestamp')
)

# Section I

In [None]:
def clean_data(df, column, to_delete, to_replace):
    """Return ``df`` with regex matches of ``to_delete`` in ``column`` replaced by ``to_replace``."""
    replaced = regexp_replace(column, to_delete, to_replace)
    return df.withColumn(column, replaced)


# Strip square brackets and double quotes from every column (presumably list syntax
# that the CSV export left around the values).
for column in domains_df.columns:
    domains_df = clean_data(domains_df, column, '\\[|\\]|\\"', "")

In [None]:
# Turn the comma-separated record strings into arrays; empty strings become NULL.
domains_df = (
    domains_df
    .withColumn("mx_record", when(col("mx_record") == '', None).otherwise(split(col("mx_record"), ',')))
    .withColumn("a_record", when(col("a_record") == '', None).otherwise(split(col("a_record"), ',')))
)

In [None]:
# Persist the cleaned domains keyed by top_level_domain. Presumably the signature is
# upsert(table, key_columns, update_columns, df) from the Functions notebook — TODO confirm.
upsert("domain", ["top_level_domain"], ["mx_record", "a_record"], domains_df)

# Section II

In [None]:
def count_arr(arr):
    """Return the number of elements in ``arr``; a missing (None) array counts as 0."""
    return 0 if arr is None else len(arr)


count_arr_udf = udf(count_arr, IntegerType())


def uses_localhost(mx_records):
    """Return True when ``mx_records`` contains the exact entry ``'localhost'``.

    A missing (None) list yields False.
    """
    return mx_records is not None and 'localhost' in mx_records


uses_localhost_udf = udf(uses_localhost, BooleanType())

In [None]:
# Per-domain features derived from the CSV data: record counts and a localhost-MX flag.
# The raw record arrays are not kept in this frame.
domains_df_enhanced = (
    domains_df
    .withColumn("a_record_count", count_arr_udf("a_record"))
    .withColumn("mx_record_count", count_arr_udf("mx_record"))
    .withColumn("mx_uses_localhost", uses_localhost_udf("mx_record"))
    .drop("mx_record", "a_record")
)

# Ten most frequent individual A records across all domains (arrays exploded into rows;
# NULL arrays produce no rows).
a_record_count_top_ten_df = (
    domains_df
    .withColumn('a_record', explode(col('a_record')))
    .groupBy('a_record')
    .count()
    .orderBy(col('count').desc())
    .limit(10)
)

# Ten most frequent individual MX records, computed the same way.
mx_record_count_top_ten_df = (
    domains_df
    .withColumn('mx_record', explode(col('mx_record')))
    .groupBy('mx_record')
    .count()
    .orderBy(col('count').desc())
    .limit(10)
)

In [None]:
# Persist the Section II results: the two top-ten tables keyed by record value, and the
# per-domain derived features keyed by top_level_domain.
# NOTE(review): the top-ten tables are only upserted, so entries that fall out of a later
# top ten are presumably never removed — see the open TODOs at the end of the notebook.
upsert("a_record_count_global", ["a_record"], ["count"], a_record_count_top_ten_df)
upsert("mx_record_count_global", ["mx_record"], ["count"], mx_record_count_top_ten_df)
upsert("domain_enhanced_based_on_existing_data", ["top_level_domain"], ["a_record_count", "mx_record_count", "mx_uses_localhost"], domains_df_enhanced)

In [None]:
# Release the notebook references to the Section II frames once they are persisted.
del a_record_count_top_ten_df, mx_record_count_top_ten_df, domains_df_enhanced

# Section III

In [None]:
# Every domain name as a flat 1-D array: the input list for the threaded lookups below.
args = domains_df.select("top_level_domain").toPandas()["top_level_domain"].to_numpy()

## A

In [None]:
# Resolve A and MX records for every domain via execute_threaded_fn. Each result-map value
# is indexed as [0] -> error value (exposed as IntegerType) and [1] -> record list
# (exposed as ArrayType(StringType)); the accessors below wrap both as UDFs.
result_map_a_record = execute_threaded_fn(get_a_records, args)


def _get_a_records_err(top_level_domain):
    """Error value stored for the A-record lookup of ``top_level_domain``."""
    lookup = result_map_a_record[top_level_domain]
    return lookup[0]


def _get_a_records(top_level_domain):
    """A records stored for ``top_level_domain``."""
    lookup = result_map_a_record[top_level_domain]
    return lookup[1]


udf_get_a_records_err = udf(_get_a_records_err, IntegerType())
udf_get_a_records = udf(_get_a_records, ArrayType(StringType()))

result_map_mx_record = execute_threaded_fn(get_mx_records, args)


def _get_mx_records_err(top_level_domain):
    """Error value stored for the MX-record lookup of ``top_level_domain``."""
    lookup = result_map_mx_record[top_level_domain]
    return lookup[0]


def _get_mx_records(top_level_domain):
    """MX records stored for ``top_level_domain``."""
    lookup = result_map_mx_record[top_level_domain]
    return lookup[1]


udf_get_mx_records_err = udf(_get_mx_records_err, IntegerType())
udf_get_mx_records = udf(_get_mx_records, ArrayType(StringType()))

# Array-valued wrapper around fn_remove_dot (presumably strips a trailing dot from each entry).
udf_remove_last_char_in_array = udf(fn_remove_dot, ArrayType(StringType()))

In [None]:
# Attach the freshly resolved A/MX records and their error values to every domain;
# the CSV-derived record arrays are dropped.
domains_checked_df = (
    domains_df
    .withColumn("a_record_checked", udf_get_a_records("top_level_domain"))
    .withColumn("a_record_checked_error", udf_get_a_records_err("top_level_domain"))
    .withColumn("mx_record_checked", udf_get_mx_records("top_level_domain"))
    .withColumn("mx_record_checked", udf_remove_last_char_in_array(col("mx_record_checked")))
    .withColumn("mx_record_checked_error", udf_get_mx_records_err("top_level_domain"))
    .drop("a_record", "mx_record")
)

# Ten most frequent resolved A records across all domains.
a_record_count_top_ten_df = (
    domains_checked_df
    .withColumn('a_record_checked', explode(col('a_record_checked')))
    .groupBy('a_record_checked')
    .count()
    .orderBy(col('count').desc())
    .limit(10)
)

# Ten most frequent resolved MX records across all domains.
mx_record_count_top_ten_df = (
    domains_checked_df
    .withColumn('mx_record_checked', explode(col('mx_record_checked')))
    .groupBy('mx_record_checked')
    .count()
    .orderBy(col('count').desc())
    .limit(10)
)

In [None]:
# Persist the Section III-A results: top-ten resolved A/MX records and the per-domain
# resolved records with their error values.
upsert("a_record_checked_count_global", ["a_record_checked"], ["count"], a_record_count_top_ten_df)
upsert("mx_record_checked_count_global", ["mx_record_checked"], ["count"], mx_record_count_top_ten_df)
upsert("domain_records_checked", ["top_level_domain"], ["a_record_checked", "a_record_checked_error", "mx_record_checked", "mx_record_checked_error"], domains_checked_df)

In [None]:
# Release the lookup maps and the Section III-A top-ten frames.
# NOTE(review): domains_checked_df is lazy and is evaluated again in Section C; that relies
# on the lookup UDFs having captured these maps when they were first applied — confirm
# before reordering cells.
del result_map_a_record, result_map_mx_record
del a_record_count_top_ten_df, mx_record_count_top_ten_df

## B

In [None]:
# Fetch redirect data for every domain; each value is indexed as [0] -> status code
# (IntegerType) and [1] -> redirect URL (StringType).
result_map_redirect = execute_threaded_fn(get_redirect_data, args)


def get_status_code(top_level_domain):
    """Status code stored for ``top_level_domain``."""
    entry = result_map_redirect[top_level_domain]
    return entry[0]


def get_redirect_url(top_level_domain):
    """Redirect URL stored for ``top_level_domain``."""
    entry = result_map_redirect[top_level_domain]
    return entry[1]


udf_get_status_code = udf(get_status_code, IntegerType())
udf_get_redirect_url = udf(get_redirect_url, StringType())

In [None]:
# Attach the redirect URL and status code to each domain; record arrays are not needed here.
domains_redirect_df = (
    domains_df
    .withColumn("redirection", udf_get_redirect_url("top_level_domain"))
    .withColumn("status_code", udf_get_status_code("top_level_domain"))
    .drop("a_record", "mx_record")
)

In [None]:
# Persist the redirect information keyed by top_level_domain.
upsert("domain_redirection", ["top_level_domain"], ["redirection", "status_code"], domains_redirect_df)

In [None]:
# Release the redirect lookup map and the persisted frame.
del result_map_redirect, domains_redirect_df

## C

In [None]:
# Struct schemas for the GeoLite2 helper results; `location.*` / `asn.*` later expand
# these fields into top-level columns.
schema_location = StructType([
    StructField("iso_code", StringType(), True),
    StructField("city", StringType(), True),
    StructField("postal", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True)
])

schema_asn = StructType([StructField("autonomous_system_organization", StringType(), True)])

udf_get_geolite2_location = udf(getGeoLite2_Location, schema_location)
udf_get_geolite2_asn = udf(getGeoLite2_ASN, schema_asn)

# Distinct MX-record arrays over all checked domains.
mx_records = domains_checked_df.select("mx_record_checked").distinct()
# Every unique MX hostname. `explode` skips NULL arrays (so no separate null filter is
# needed) and Spark deduplicates per hostname before anything is collected to the driver.
flat_unique_mx_records = {
    row[0]
    for row in mx_records.select(explode("mx_record_checked")).distinct().collect()
}

# Resolve each unique MX host once; value [1] holds its A records.
result_map_a_record_for_mx = execute_threaded_fn(get_a_records, flat_unique_mx_records)


def _get_a_records_for_mx(mx):
    """A records resolved for MX hostname ``mx``."""
    return result_map_a_record_for_mx[mx][1]


udf_get_a_records_for_mx = udf(_get_a_records_for_mx, ArrayType(StringType()))

In [None]:
# One row per (domain, MX host, MX IPv4 address) with GeoLite2 location and ASN columns.
# Only top_level_domain and the exploded MX hosts are selected, so the other checked
# columns need no explicit drop. Hosts without resolved addresses yield no rows
# (explode drops NULL arrays).
domains_mx_record_geolite2_df = (
    domains_checked_df
    .select("top_level_domain", explode("mx_record_checked").alias('mx_record_checked'))
    .withColumn("mx_record_ip", udf_get_a_records_for_mx("mx_record_checked"))
    .withColumn('mx_record_ip', explode(col('mx_record_ip')))
    .withColumn("location", udf_get_geolite2_location("mx_record_ip"))
    .withColumn("asn", udf_get_geolite2_asn("mx_record_ip"))
    .select("top_level_domain", "mx_record_checked", "mx_record_ip", "location.*", "asn.*")
)

In [None]:
# Persist the MX GeoLite2 rows. Every column is passed as the column list; presumably
# upsert_no_conflicts inserts rows and skips existing ones — TODO confirm in Functions.
upsert_no_conflicts("domain_mx_record_geolite2", ["top_level_domain", "mx_record_checked", "mx_record_ip", "iso_code", "city", "postal", "latitude", "longitude", "autonomous_system_organization"], domains_mx_record_geolite2_df)

In [None]:
# Release the MX lookup map and the persisted frame.
del result_map_a_record_for_mx, domains_mx_record_geolite2_df

## D

### D - 1

In [None]:
# Look up AAAA records for every domain; each value is indexed as [0] -> error value
# (IntegerType) and [1] -> IPv6 availability flag (BooleanType).
result_map_aaaa = execute_threaded_fn(get_ipv6_record, args)


def get_aaaa_record(top_level_domain):
    """IPv6 availability flag stored for ``top_level_domain``."""
    entry = result_map_aaaa[top_level_domain]
    return entry[1]


def get_aaaa_record_err(top_level_domain):
    """Error value stored for the AAAA lookup of ``top_level_domain``."""
    entry = result_map_aaaa[top_level_domain]
    return entry[0]


udf_ipv6_record = udf(get_aaaa_record, BooleanType())
udf_ipv6_record_err = udf(get_aaaa_record_err, IntegerType())

In [None]:
# Attach the IPv6 availability flag and its error value to each domain.
domains_ipv6_df = (
    domains_df
    .withColumn("ipv6_available", udf_ipv6_record('top_level_domain'))
    .withColumn("ipv6_error", udf_ipv6_record_err("top_level_domain"))
    .drop('mx_record', 'a_record')
)

In [None]:
# Persist the IPv6 information keyed by top_level_domain.
upsert("ip_v6_information", ["top_level_domain"], ["ipv6_available", "ipv6_error"], domains_ipv6_df)

In [None]:
# Release the AAAA lookup map and the persisted frame.
del result_map_aaaa, domains_ipv6_df

### D - 2

In [None]:
def replace_empty_strings(x):
    """Column expression for column ``x`` with empty strings replaced by NULL."""
    column = col(x)
    return when(column == "", None).otherwise(column)


# SOA and nameserver lookups for every domain; each value is indexed as
# [0] -> error value (IntegerType) and [1] -> list of strings.
result_map_soa_information = execute_threaded_fn(get_soa_information, args)
result_map_nameservers = execute_threaded_fn(get_nameservers, args)


def _get_soa_information(top_level_domain):
    """SOA data stored for ``top_level_domain``."""
    entry = result_map_soa_information[top_level_domain]
    return entry[1]


def _get_soa_information_err(top_level_domain):
    """Error value stored for the SOA lookup of ``top_level_domain``."""
    entry = result_map_soa_information[top_level_domain]
    return entry[0]


def _get_nameservers(top_level_domain):
    """Nameservers stored for ``top_level_domain``."""
    entry = result_map_nameservers[top_level_domain]
    return entry[1]


def _get_nameservers_err(top_level_domain):
    """Error value stored for the nameserver lookup of ``top_level_domain``."""
    entry = result_map_nameservers[top_level_domain]
    return entry[0]


udf_get_soa_information = udf(_get_soa_information, ArrayType(StringType()))
udf_get_soa_information_err = udf(_get_soa_information_err, IntegerType())
udf_get_nameservers = udf(_get_nameservers, ArrayType(StringType()))
udf_get_nameservers_err = udf(_get_nameservers_err, IntegerType())

In [None]:
# Attach SOA and nameserver lookup results to every domain.
domains_soa_df = (
    domains_df
    .withColumn("soa_information", udf_get_soa_information("top_level_domain"))
    .withColumn("soa_information_error", udf_get_soa_information_err("top_level_domain"))
    .withColumn("nameservers", udf_get_nameservers("top_level_domain"))
    .withColumn("nameservers_error", udf_get_nameservers_err("top_level_domain"))
    .drop('mx_record', 'a_record')
    .withColumn("nameservers_count", count_arr_udf('nameservers'))
    # Join the SOA pieces into one space-separated string so fields can be picked by position.
    .withColumn("soa_infos_rep", concat_ws(" ", "soa_information"))
)

# Positions 0 / 3 / 6 assume the SOA RDATA text order
# (mname rname serial refresh retry expire minimum) — TODO confirm against get_soa_information.
split_col = split(domains_soa_df['soa_infos_rep'], ' ')

domains_soa_df = (
    domains_soa_df
    .withColumn('soa_name', split_col.getItem(0))
    .withColumn('refresh', split_col.getItem(3))
    .withColumn('minimum', split_col.getItem(6))
    .drop('soa_information', 'soa_infos_rep')
    .withColumn("soa_name", replace_empty_strings("soa_name"))
    # Strip only a trailing dot (FQDN form). The previous pattern '.$' removed the last
    # character unconditionally and truncated names that had no trailing dot.
    .withColumn('soa_name', regexp_replace('soa_name', r'\.$', ''))
    .withColumn('nameservers', udf_remove_last_char_in_array(col('nameservers')))
)

# Missing positions (e.g. no SOA data) yield NULL, which stays NULL after the cast.
domains_soa_df = (
    domains_soa_df
    .withColumn("refresh", domains_soa_df["refresh"].cast(IntegerType()))
    .withColumn("minimum", domains_soa_df["minimum"].cast(IntegerType()))
)

In [None]:
# Persist the parsed SOA fields and nameserver data keyed by top_level_domain.
upsert("soa", ["top_level_domain"], ["soa_name", "soa_information_error", "refresh", "minimum", "nameservers", "nameservers_error", "nameservers_count"], domains_soa_df)

In [None]:
# Release the SOA and nameserver lookup maps (domains_soa_df is still needed in D - 3).
del result_map_soa_information, result_map_nameservers

### D - 3

In [None]:
# Ten most common SOA names. NULL names are removed before aggregating, and ties on
# `count` are broken by name so the selection is deterministic. This frame is evaluated
# again when the UDF below runs; an unstable top ten could pick names that were never
# resolved into result_map_a_record_for_soa.
soa_name_count_top_ten_df = (
    domains_soa_df
    .na.drop(subset=["soa_name"])
    .groupBy('soa_name')
    .count()
    .orderBy(col('count').desc(), col('soa_name'))
    .limit(10)
)

soa_name_count_top_ten_df.show() # TODO Remove logging

top10_soa_names = soa_name_count_top_ten_df.select("soa_name").toPandas().values.reshape(-1)
result_map_a_record_for_soa = execute_threaded_fn(get_a_records, top10_soa_names)


def _get_a_records_for_soa(soa):
    """A records resolved for SOA name ``soa``, or None when no usable lookup result exists."""
    try:
        return result_map_a_record_for_soa[soa][1]
    except (KeyError, IndexError, TypeError):
        # Name missing from the map or an unexpected entry shape: treat as "no records".
        return None


udf_get_a_records_for_soa = udf(_get_a_records_for_soa, ArrayType(StringType()))

# One row per (SOA name, IPv4 address) with GeoLite2 location and ASN columns; names
# without resolved addresses produce no rows (explode drops NULL arrays).
soa_name_count_top_ten_df = (
    soa_name_count_top_ten_df
    .withColumn("ipv4", udf_get_a_records_for_soa("soa_name"))
    .withColumn('ipv4', explode(col('ipv4')))
    .withColumn("location", udf_get_geolite2_location("ipv4"))
    .withColumn("asn", udf_get_geolite2_asn("ipv4"))
    .select("soa_name", "count", "ipv4", "location.*", "asn.*")
)

soa_name_count_top_ten_df.show() # TODO Remove logging

In [None]:
# Persist the SOA top-ten rows. Every column is passed as the column list; presumably
# upsert_no_conflicts inserts rows and skips existing ones — TODO confirm in Functions.
upsert_no_conflicts("soa_top_ten", ["soa_name", "count", "ipv4", "iso_code", "city", "postal", "latitude", "longitude", "autonomous_system_organization"], soa_name_count_top_ten_df)

In [None]:
# Release the SOA frames and the SOA-name lookup map.
del domains_soa_df, soa_name_count_top_ten_df, result_map_a_record_for_soa

# Open questions / TODOs

- Give table `domain_mx_record_geolite2` the key `mx_record_checked` and reference it from the checked-domain records (`domain_records_checked`) for statistics? This would save a lot of space and is consistent with the upsert logic.
- Rethink the top-ten SOA name count. Problem: one SOA name can resolve to multiple IPv4 addresses, so it appears in several rows.
- Top-ten tables:
    - Remove them and use improved queries instead (consistent with the upsert logic)
    - OR overwrite them instead of upserting