# Imports

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
import json

In [None]:

# Reuse the active Spark session if one exists; otherwise create a new one.
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("FATAL")  # Log only FATAL messages; "OFF" would silence logging entirely


In [None]:
# Each JSON record may span several lines, so multi-line parsing is enabled.
df = spark.read.json("../parsed_output/all_dns.json", multiLine=True)
    

In [None]:
# Preview the first two rows of the raw DNS data.
df.show(n=2)

In [None]:
# Keep only the columns used by the rest of the notebook.
dns_data = df.select(
    [
        "timestamp",
        "src_ip",
        "dst_ip",
        "id",
        "opcode",
        "qr",
        "rcode",
        "questions",
        "answers",
    ]
)

In [None]:
# Split the opcode-0 messages by the qr flag: qr == 0 rows are treated as
# queries, qr == 1 rows as responses.
all_queries = dns_data.where((sf.col("opcode") == 0) & (sf.col("qr") == 0))
all_responses = dns_data.where((sf.col("opcode") == 0) & (sf.col("qr") == 1))

# "Valid" means the relevant array has at least one element; the array that is
# not used on that side is dropped.
valid_queries = all_queries.where(sf.size(sf.col("questions")) > 0).drop("answers")
valid_responses = all_responses.where(sf.size(sf.col("answers")) > 0).drop("questions")

# Rows whose array has exactly zero elements, kept separately for inspection.
empty_questions = all_queries.where(sf.size(sf.col("questions")) == 0)
empty_answers = all_responses.where(sf.size(sf.col("answers")) == 0)

In [None]:
# One-row overview of how many records fall into each category.
# Each count() is a separate Spark action, evaluated in the order listed.
summary_data = [
    {
        label: frame.count()
        for label, frame in (
            ("Total DNS Records", dns_data),
            ("All Queries", all_queries),
            ("All Responses", all_responses),
            ("Valid (non-empty) Questions", valid_queries),
            ("Valid (non-empty) Answers", valid_responses),
        )
    }
]
summary = spark.createDataFrame(summary_data)
summary.show()


In [None]:
# Show up to three queries whose questions array is empty.
empty_questions.show(n=3)

In [None]:
# Show up to three responses whose answers array is empty.
empty_answers.show(n=3)

We only need the queries that contain at least one question.

In [None]:
# Show up to three rows of the current valid_queries DataFrame.
valid_queries.show(n=3)


We now explode the `questions` array so that each question gets its own row, and extract its individual fields from the DNS request.

In [None]:
# Produce one row per question, then lift the question's fields into
# top-level columns and discard the nested column.
valid_queries = (
    valid_queries
    .withColumn("questions", sf.explode(sf.col("questions")))
    .withColumns(
        {
            "qname": sf.col("questions.qname"),
            "qtype": sf.col("questions.qtype"),
            # Length of the queried name, in characters.
            "qlen": sf.length(sf.col("questions.qname")),
        }
    )
    .drop("questions")
)

In [None]:
# Show up to three rows of the current valid_queries DataFrame.
valid_queries.show(n=3)

Similarly, we explode the `answers` array to extract the individual answer fields.

In [None]:
# Produce one row per answer record, then lift each answer field into a
# top-level column of the same name. The nested column and opcode are dropped
# afterwards.
valid_responses = (
    valid_responses
    .withColumn("answers", sf.explode(sf.col("answers")))
    .drop("questions")
    .withColumns(
        {
            field: sf.col(f"answers.{field}")
            for field in ("rclass", "rdata", "rrname", "rtype", "ttl")
        }
    )
    .drop("answers", "opcode")
)


In [None]:
# Show up to three rows of the current valid_responses DataFrame.
valid_responses.show(n=3)

We now rename the fields in the ```valid_responses``` dataframe to avoid conflicts with the ```valid_queries``` dataframe when we perform a join later.

In [None]:
# Give the response-side columns their own names so they stay distinct from
# the identically named columns of valid_queries.
valid_responses = valid_responses.withColumnsRenamed(
    colsMap={
        "timestamp": "ts",
        "src_ip": "ns",          # source address of the response row
        "dst_ip": "client_ip",   # destination address of the response row
        "id": "rid",
        "qr": "rqr",
        "rcode": "rrcode",
    }
)

In [None]:
# Show up to three rows of the current valid_responses DataFrame.
valid_responses.show(n=3)

Now that we have prepared our dataset, we will persist it to storage in ```.parquet``` format for analysis.

In [None]:
# Persist both prepared DataFrames as Parquet, replacing any earlier output
# at the same paths.
valid_queries.write.mode("overwrite").parquet("../datasets/valid_queries.parquet")
valid_responses.write.mode("overwrite").parquet("../datasets/valid_responses.parquet")