# Import libraries

In [None]:
import pandas as pd
import numpy as np

import plotly.express as px
from plotly.subplots import make_subplots

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, col, count, when, lit
from pyspark.sql import functions as F

import plotly.express as px
import plotly.graph_objects as go

import requests
# import ast


In [None]:
spark = SparkSession.builder.appName('Malware').getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# **Loading datasets**

## Malware DNS dataset

In [None]:
df_1 = spark.read.json('2018.log')
df_2 = spark.read.json('2019.log')
df_3 = spark.read.json('2020.log')
df_4 = spark.read.json('2021.log')
df_5 = spark.read.json('2022.log')
df_6 = spark.read.json('2023.log')
df_7 = spark.read.json('2024.log')

In [None]:
df_1 = df_1['icann_tld', 'icann_domain', 'query', 'length', 'qtype_name', 'rcode_name', 'Z', 'rtt', 'answers', 'TTLs', 'entropy']
df_2 = df_2['icann_tld', 'icann_domain', 'query', 'length', 'qtype_name', 'rcode_name', 'Z', 'rtt', 'answers', 'TTLs', 'entropy']
df_3 = df_3['icann_tld', 'icann_domain', 'query', 'length', 'qtype_name', 'rcode_name', 'Z', 'rtt', 'answers', 'TTLs', 'entropy']
df_4 = df_4['icann_tld', 'icann_domain', 'query', 'length', 'qtype_name', 'rcode_name', 'Z', 'rtt', 'answers', 'TTLs', 'entropy']
df_5 = df_5['icann_tld', 'icann_domain', 'query', 'length', 'qtype_name', 'rcode_name', 'Z', 'rtt', 'answers', 'TTLs', 'entropy']
df_6 = df_6['icann_tld', 'icann_domain', 'query', 'length', 'qtype_name', 'rcode_name', 'Z', 'rtt', 'answers', 'TTLs', 'entropy']
df_7 = df_7['icann_tld', 'icann_domain', 'query', 'length', 'qtype_name', 'rcode_name', 'Z', 'rtt', 'answers', 'TTLs', 'entropy']
df_1.show()

In [None]:
df_1 = df_1.withColumn("year", lit(2018))
df_2 = df_2.withColumn("year", lit(2019))
df_3 = df_3.withColumn("year", lit(2020))
df_4 = df_4.withColumn("year", lit(2021))
df_5 = df_5.withColumn("year", lit(2022))
df_6 = df_6.withColumn("year", lit(2023))
df_7 = df_7.withColumn("year", lit(2024))

df_1.show()

In [None]:
df_m = df_1.union(df_2).union(df_3).union(df_4).union(df_5).union(df_6).union(df_7)
df_m.show()

In [None]:
# add id column
df_m = df_m.withColumn("id", monotonically_increasing_id())
df_m = df_m.withColumn("id", df_m["id"] + 1)

# add malware columns
df_m = df_m.withColumn("malware", lit(1))

df_m.show()

In [None]:
df_m = df_m['year', 'id', 'icann_tld', 'icann_domain', 'query', 'length', 'qtype_name', 'rcode_name', 'Z', 'rtt', 'answers', 'TTLs', 'entropy', 'malware']
df_m.show()

In [None]:
print(f" 2018 count is: {df_1.count()} \n 2019 count is: {df_2.count()} \n 2020 count is: {df_3.count()} \n 2021 count is: {df_4.count()} \n 2022 count is: {df_5.count()} \n 2023 count is: {df_6.count()} \n 2024 count is: {df_7.count()}")

In [None]:
print(f"malware df count is {df_m.count()}")

In [None]:
tld_count_m = df_m.groupBy('icann_tld').count().orderBy(F.col('count').desc()).limit(10)
fig1 = px.pie(tld_count_m, names="icann_tld", values='count', title="Distribution of icann_tld (malware=0)")
fig1.show()

## Normal DNS dataset

In [None]:
df_n = spark.read.json('top-1m-dns.log')

df_n = df_n[df_n["`id.resp_h`"] == "8.8.8.8"]
df_n = df_n['icann_tld', 'icann_domain', 'query', 'length', 'qtype_name', 'rcode_name', 'Z', 'rtt', 'answers', 'TTLs', 'entropy']

df_n.show()

In [None]:
df_n = df_n.withColumn("year", lit(0))
df_n = df_n.withColumn("id", lit(0))
df_n = df_n.withColumn("malware", lit(0))

df_n = df_n['year', 'id', 'icann_tld', 'icann_domain', 'query', 'length', 'qtype_name', 'rcode_name', 'Z', 'rtt', 'answers', 'TTLs', 'entropy', 'malware']

df_n.show()

# **data cleaning**

In [None]:
df_n.printSchema()

In [None]:
df_m.printSchema()

In [None]:
df_m = df_m.join(df_n, df_m['icann_domain'] == df_n['icann_domain'], how='left_anti')
df_m = df_m.join(df_n, df_m['query'] == df_n['query'], how='left_anti')

In [None]:
df_final = df_n.union(df_m)
df_final.count()

In [None]:
df_final = df_n.union(df_m)

In [None]:
df_final.count()

In [None]:
# df_final = df_final.toPandas()
# df_final

In [None]:
df_final.select(count(when(col("icann_domain").isNull(), 1)).alias("null_count_in_name")).show()

In [None]:
df_final.select(count(when(col("query").isNull(), 1)).alias("null_count_in_name")).show()

In [None]:
# remove unknown tld
df_final = df_final.dropna(subset="icann_domain")
df_final.count()

In [None]:
df_final = df_final.filter(df_final['icann_tld'] != "")
df_final.count()

In [None]:
df_m = df_final.filter(df_final['malware'] == 1)
# remove unknown tld
df_m = df_m.filter(df_m['icann_tld'] != "")

df_n = df_final.filter(df_final['malware'] != 1)

print(f" malware count: {df_m.count()} \n normal count: {df_n.count()}")

In [None]:
df_final.groupBy(F.col('rcode_name')).count().show()

In [None]:
df_final.sample(fraction=0.2, seed=42).show()

# **Analysis**

## *TTL*

### preprocessing

In [None]:
df_ttl = df_final['icann_tld', 'icann_domain', 'TTLs', 'malware']
df_ttl.show()

In [None]:
print(f"count of TTL bedfor drop nulls: {df_ttl.count()}")

In [None]:
print(f"number of null values in normals:   {df_ttl.filter(df_ttl['TTLs'].isNull() & (df_ttl['malware'] == 0)).count()}")
print(f"number of null values in malwares:  {df_ttl.filter(df_ttl['TTLs'].isNull() & (df_ttl['malware'] == 1)).count()}")

In [None]:
# drop nulls
df_ttl = df_ttl.dropna()

In [None]:
print(f"count of TTL after drop nulls: {df_ttl.count()}")

### unique

In [None]:
exploded_df = df_ttl.withColumn("TTL", F.explode("TTLs"))

df_ttl = exploded_df.groupBy('icann_tld', 'icann_domain', 'malware') \
                       .agg(F.sort_array(F.collect_list('TTL')).alias('sorted_TTLs'))

df_ttl.show()

In [None]:
df_ttl.groupBy('icann_domain').count().orderBy(F.col('count').desc()).show()

In [None]:
print(f"number of normal values:  {df_ttl.filter(df_ttl['malware'] != 1).count()}")
print(f"number of malware values: {df_ttl.filter(df_ttl['malware'] == 1).count()}")

### new ttl dataframe 

In [None]:
df_ttl_result = df_ttl.withColumn(
    "min", F.expr("array_min(sorted_TTLs)")  # Get the minimum TTL
).withColumn(
    "max", F.expr("array_max(sorted_TTLs)")  # Get the maximum TTL
).withColumn(
    "mean", F.expr("aggregate(sorted_TTLs, 0D, (acc, x) -> acc + x) / size(sorted_TTLs)")  # Get the mean of TTLs
)

# Select the desired columns
df_ttl_result = df_ttl_result.select('icann_tld', 'icann_domain', 'sorted_TTLs', 'min', 'max', 'mean', 'malware')

In [None]:
# show random rows
df_ttl_result.sample(fraction=0.2, seed=42).show()

### TTL eda

In [None]:
df_ttl_pd = df_ttl_result.toPandas()

# Add a "Group" column to distinguish malware categories
df_ttl_pd['Group'] = df_ttl_pd['malware'].apply(lambda x: 'Malware' if x == 1 else 'Normal')

# Create a boxplot with Plotly Express
fig = px.box(
    df_ttl_pd,
    x='Group',
    y='mean',
    title='Distribution of Mean Values by Malware Status',
    labels={'mean': 'Mean Value', 'Group': 'Malware Status'}
)

# Show the plot
fig.show()

In [None]:
df_ttl_filtered = df_ttl_result.filter(df_ttl_result['mean'] < 30000)

# Convert the filtered PySpark DataFrame to Pandas
df_ttl_filtered_pd = df_ttl_filtered.toPandas()

# Add a "Group" column to distinguish malware categories
df_ttl_filtered_pd['Group'] = df_ttl_filtered_pd['malware'].apply(lambda x: 'Malware' if x == 1 else 'Normal')

# Create a boxplot with Plotly Express
fig = px.box(
    df_ttl_filtered_pd,
    x='Group',
    y='mean',
    title='Distribution of Mean Values by Malware Status (Under 30k)',
    labels={'mean': 'Mean Value', 'Group': 'Malware Status'}
)

# Show the plot
fig.show()

In [None]:
df_top_9 = df_ttl.groupBy("icann_tld").count().orderBy(F.col("count").desc()).limit(9)

# Join back with cleaned data to filter by malware
df_limited = df_ttl.join(df_top_9.select("icann_tld"), on="icann_tld", how="inner")

# Count occurrences for malware == 0 within top 9
df_malware_0 = df_limited.filter(df_limited['malware'] == 0).groupBy("icann_tld").count().toPandas()

# Count occurrences for malware == 1 within top 9
df_malware_1 = df_limited.filter(df_limited['malware'] == 1).groupBy("icann_tld").count().toPandas()

# Create a subplot figure
fig = make_subplots(rows=1, cols=2, subplot_titles=("Normal", "Malware"), specs=[[{"type": "pie"}, {"type": "pie"}]])

# Add pie chart for malware == 0
fig.add_trace(go.Pie(labels=df_malware_0["icann_tld"], values=df_malware_0["count"], name="Normal"), row=1, col=1)

# Add pie chart for malware == 1
fig.add_trace(go.Pie(labels=df_malware_1["icann_tld"], values=df_malware_1["count"], name="Malware"), row=1, col=2)

# Update layout
fig.update_layout(title_text="Top 9 icann_tld Distribution by Malware Status", showlegend=True)

# Show plot
fig.show()

## *RTT*

### preprocessing

In [None]:
df_rtt = df_final['icann_tld', 'icann_domain', 'rtt', 'malware']
df_rtt.show()

In [None]:
print(f"count of TTL after drop nulls: {df_rtt.count()}")

In [None]:
print(f"number of null values in normals:   {df_rtt.filter(df_rtt['rtt'].isNull() & (df_rtt['malware'] == 0)).count()}")
print(f"number of null values in malwares:  {df_rtt.filter(df_rtt['rtt'].isNull() & (df_rtt['malware'] == 1)).count()}")

In [None]:
# drop nulls
df_rtt = df_rtt.dropna()

In [None]:
print(f"count of TTL after drop nulls: {df_rtt.count()}")

### unique

In [None]:
df_rtt = df_rtt.groupBy('icann_domain', 'icann_tld', 'malware') \
                      .agg(F.sort_array(F.collect_list('rtt')).alias('sorted_rtt'))

# Show the result
df_rtt.show()

In [None]:
df_rtt.groupBy('icann_domain').count().orderBy(F.col('count').desc()).show()

In [None]:
print(f"number of normal values:  {df_rtt.filter(df_rtt['malware'] != 1).count()}")
print(f"number of malware values: {df_rtt.filter(df_rtt['malware'] == 1).count()}")

### new rtt dataframe 

In [None]:
df_rtt_result = df_rtt.withColumn(
    "min", F.expr("array_min(sorted_rtt)")  # Get the minimum TTL
).withColumn(
    "max", F.expr("array_max(sorted_rtt)")  # Get the maximum TTL
).withColumn(
    "mean", F.expr("aggregate(sorted_rtt, 0D, (acc, x) -> acc + x) / size(sorted_rtt)")  # Get the mean of TTLs
)

# Select the desired columns
df_rtt_result = df_rtt_result.select('icann_tld', 'icann_domain', 'sorted_rtt', 'min', 'max', 'mean', 'malware')

In [None]:
# show random rows
df_rtt_result.sample(fraction=0.2, seed=42).show()

### RTT eda

In [None]:
df_rtt_pd = df_rtt_result.toPandas()

# Add a "Group" column to distinguish malware categories
df_rtt_pd['Group'] = df_rtt_pd['malware'].apply(lambda x: 'Malware' if x == 1 else 'Normal')

# Create a boxplot with Plotly Express
fig = px.box(
    df_rtt_pd,
    x='Group',
    y='mean',
    title='RTT values',
    labels={'mean': 'Mean Value', 'Group': 'Malware Status'}
)

# Show the plot
fig.show()

In [None]:
df_rtt_filtered = df_rtt_result.filter(df_rtt_result['mean'] < 0.75)

# Convert the filtered PySpark DataFrame to Pandas
df_rtt_filterd_pd = df_rtt_filtered.toPandas()

# Add a "Group" column to distinguish malware categories
df_rtt_filterd_pd['Group'] = df_rtt_filterd_pd['malware'].apply(lambda x: 'Malware' if x == 1 else 'Normal')

# Create a boxplot with Plotly Express
fig = px.box(
    df_rtt_filterd_pd,
    x='Group',
    y='mean',
    title='RTT filtered values (Under 0.75)',
    labels={'mean': 'Mean Value', 'Group': 'Malware Status'}
)

# Show the plot
fig.show()

In [None]:
df_top9rtt_tld = df_rtt.groupBy("icann_tld").count().orderBy(F.col("count").desc()).limit(9)

# Join back with cleaned data to filter by malware
df_limited = df_rtt.join(df_top9rtt_tld.select("icann_tld"), on="icann_tld", how="inner")

# Count occurrences for malware == 0 within top 9
df_malware_0 = df_limited.filter(df_limited['malware'] == 0).groupBy("icann_tld").count().toPandas()

# Count occurrences for malware == 1 within top 9
df_malware_1 = df_limited.filter(df_limited['malware'] == 1).groupBy("icann_tld").count().toPandas()

# Create a subplot figure
fig = make_subplots(rows=1, cols=2, subplot_titles=("Normal", "Malware"), specs=[[{"type": "pie"}, {"type": "pie"}]])

# Add pie chart for malware == 0
fig.add_trace(go.Pie(labels=df_malware_0["icann_tld"], values=df_malware_0["count"], name="Normal"), row=1, col=1)

# Add pie chart for malware == 1
fig.add_trace(go.Pie(labels=df_malware_1["icann_tld"], values=df_malware_1["count"], name="Malware"), row=1, col=2)

# Update layout
fig.update_layout(title_text="Top 9 icann_tld Distribution by Malware Status", showlegend=True)

# Show plot
fig.show()

## *Entropy*

### preprocessing

In [None]:
df_ent = df_final['icann_tld', 'icann_domain', 'entropy', 'malware']
df_ent.show()

In [None]:
print(f"count of TTL after drop nulls: {df_ent.count()}")

In [None]:
print(f"number of null values in normals:   {df_ent.filter(df_ent['entropy'].isNull() & (df_ent['malware'] == 0)).count()}")
print(f"number of null values in malwares:  {df_ent.filter(df_ent['entropy'].isNull() & (df_ent['malware'] == 1)).count()}")

In [None]:
# drop nulls
df_ent = df_ent.dropna()

In [None]:
print(f"count of TTL after drop nulls: {df_ent.count()}")

### unique

In [None]:
df_ent = df_ent.groupBy('icann_tld', 'icann_domain', 'malware') \
                      .agg(F.sort_array(F.collect_list('entropy')).alias('sorted_entropy'))

# Show the result
df_ent.show()

In [None]:
df_ent.groupBy('icann_domain').count().orderBy(F.col('count').desc()).show()

In [None]:
print(f"number of normal values:  {df_ent.filter(df_ent['malware'] != 1).count()}")
print(f"number of malware values: {df_ent.filter(df_ent['malware'] == 1).count()}")

### new ent dataframe 

In [None]:
df_ent_result = df_ent.withColumn(
    "min", F.expr("array_min(sorted_entropy)")  # Get the minimum TTL
).withColumn(
    "max", F.expr("array_max(sorted_entropy)")  # Get the maximum TTL
).withColumn(
    "mean", F.expr("aggregate(sorted_entropy, 0D, (acc, x) -> acc + x) / size(sorted_entropy)")  # Get the mean of TTLs
)

# Select the desired columns
df_ent_result = df_ent_result.select('icann_tld', 'icann_domain', 'sorted_entropy', 'min', 'max', 'mean', 'malware')

In [None]:
# show random rows
df_ent_result.sample(fraction=0.2, seed=42).show()

### Ent eda

In [None]:
df_ent_pd = df_ent_result.toPandas()

# Add a "Group" column to distinguish malware categories
df_ent_pd['Group'] = df_ent_pd['malware'].apply(lambda x: 'Malware' if x == 1 else 'Normal')

# Create a boxplot with Plotly Express
fig = px.box(
    df_ent_pd,
    x='Group',
    y='mean',
    title='Distribution of Mean Values by Malware Status',
    labels={'mean': 'Mean Value', 'Group': 'Malware Status'}
)

# Show the plot
fig.show()

## *TLD eda*

In [None]:
df_tld = df_final.groupBy("icann_tld", 'malware').count().orderBy(col("count").desc()).show()

### **whole year**

In [None]:
# Count occurrences of each 'icann_tld' and select the top 10
df_top_10 = df_m.groupBy("icann_tld").count().orderBy(col("count").desc()).limit(10)

# Convert to Pandas for plotting
df_top_10_pd = df_top_10.toPandas()

# Create a pie chart
fig = go.Figure(data=[go.Pie(labels=df_top_10_pd["icann_tld"], values=df_top_10_pd["count"], name="Top 10 icann_tld")])

# Update layout
fig.update_layout(title_text="Top 10 icann_tld Distribution for Year 2018 to 2024", showlegend=True)

# Show plot
fig.show()

In [None]:
df_m.groupBy("icann_tld").count().orderBy(col("count").desc()).show()

### **2018**

In [None]:
df_2018 = df_m.filter(F.col("year") == 2018)

# Count occurrences of each 'icann_tld' and select the top 10
df_top_10 = df_2018.groupBy("icann_tld").count().orderBy(col("count").desc()).limit(10)

# Convert to Pandas for plotting
df_top_10_pd = df_top_10.toPandas()

# Create a pie chart
fig = go.Figure(data=[go.Pie(labels=df_top_10_pd["icann_tld"], values=df_top_10_pd["count"], name="Top 10 icann_tld")])

# Update layout
fig.update_layout(title_text="Top 10 icann_tld Distribution for Year 2018", showlegend=True)

# Show plot
fig.show()

In [None]:
df_2018.groupBy("icann_tld").count().orderBy(col("count").desc()).show()

### **2019**

In [None]:
df_2019 = df_m.filter(F.col("year") == 2019)

# Count occurrences of each 'icann_tld' and select the top 10
df_top_10 = df_2019.groupBy("icann_tld").count().orderBy(col("count").desc()).limit(10)

# Convert to Pandas for plotting
df_top_10_pd = df_top_10.toPandas()

# Create a pie chart
fig = go.Figure(data=[go.Pie(labels=df_top_10_pd["icann_tld"], values=df_top_10_pd["count"], name="Top 10 icann_tld")])

# Update layout
fig.update_layout(title_text="Top 10 icann_tld Distribution for Year 2019", showlegend=True)

# Show plot
fig.show()

In [None]:
df_2019.groupBy("icann_tld").count().orderBy(col("count").desc()).show()

### **2020**

In [None]:
df_2020 = df_m.filter(F.col("year") == 2020)

# Count occurrences of each 'icann_tld' and select the top 10
df_top_10 = df_2020.groupBy("icann_tld").count().orderBy(col("count").desc()).limit(10)

# Convert to Pandas for plotting
df_top_10_pd = df_top_10.toPandas()

# Create a pie chart
fig = go.Figure(data=[go.Pie(labels=df_top_10_pd["icann_tld"], values=df_top_10_pd["count"], name="Top 10 icann_tld")])

# Update layout
fig.update_layout(title_text="Top 10 icann_tld Distribution for Year 2020", showlegend=True)

# Show plot
fig.show()

In [None]:
df_2020.groupBy("icann_tld").count().orderBy(col("count").desc()).show()

### **2021**

In [None]:
df_2021 = df_m.filter(F.col("year") == 2021)

# Count occurrences of each 'icann_tld' and select the top 10
df_top_10 = df_2021.groupBy("icann_tld").count().orderBy(col("count").desc()).limit(10)

# Convert to Pandas for plotting
df_top_10_pd = df_top_10.toPandas()

# Create a pie chart
fig = go.Figure(data=[go.Pie(labels=df_top_10_pd["icann_tld"], values=df_top_10_pd["count"], name="Top 10 icann_tld")])

# Update layout
fig.update_layout(title_text="Top 10 icann_tld Distribution for Year 2021", showlegend=True)

# Show plot
fig.show()

In [None]:
df_2021.groupBy("icann_tld").count().orderBy(col("count").desc()).show()

### **2022**

In [None]:
df_2022 = df_m.filter(F.col("year") == 2022)

# Count occurrences of each 'icann_tld' and select the top 10
df_top_10 = df_2022.groupBy("icann_tld").count().orderBy(col("count").desc()).limit(10)

# Convert to Pandas for plotting
df_top_10_pd = df_top_10.toPandas()

# Create a pie chart
fig = go.Figure(data=[go.Pie(labels=df_top_10_pd["icann_tld"], values=df_top_10_pd["count"], name="Top 10 icann_tld")])

# Update layout
fig.update_layout(title_text="Top 10 icann_tld Distribution for Year 2022", showlegend=True)

# Show plot
fig.show()

In [None]:
df_2022.groupBy("icann_tld").count().orderBy(col("count").desc()).show()

### **2023**

In [None]:
df_2023 = df_m.filter(F.col("year") == 2023)

# Count occurrences of each 'icann_tld' and select the top 10
df_top_10 = df_2023.groupBy("icann_tld").count().orderBy(col("count").desc()).limit(10)

# Convert to Pandas for plotting
df_top_10_pd = df_top_10.toPandas()

# Create a pie chart
fig = go.Figure(data=[go.Pie(labels=df_top_10_pd["icann_tld"], values=df_top_10_pd["count"], name="Top 10 icann_tld")])

# Update layout
fig.update_layout(title_text="Top 10 icann_tld Distribution for Year 2023", showlegend=True)

# Show plot
fig.show()

In [None]:
df_2023.groupBy("icann_tld").count().orderBy(col("count").desc()).show()

### **2024**

In [None]:
df_2024 = df_m.filter(F.col("year") == 2024)

# Count occurrences of each 'icann_tld' and select the top 10
df_top_10 = df_2024.groupBy("icann_tld").count().orderBy(col("count").desc()).limit(10)

# Convert to Pandas for plotting
df_top_10_pd = df_top_10.toPandas()`

# Create a pie chart
fig = go.Figure(data=[go.Pie(labels=df_top_10_pd["icann_tld"], values=df_top_10_pd["count"], name="Top 10 icann_tld")])

# Update layout
fig.update_layout(title_text="Top 10 icann_tld Distribution for Year 2024", showlegend=True)

# Show plot
fig.show()

In [None]:
df_2024.groupBy("icann_tld").count().orderBy(col("count").desc()).show()

## *rcode eda*

In [None]:
df_RN_count = df_final.groupBy('rcode_name', 'malware', 'year').count().orderBy(F.col('count').desc())
fig = px.histogram(df_RN_count, x="rcode_name", y='count',
             color='malware', barmode='group',
             height=400)
fig.show()

In [None]:
df_RN_count.show()