In [1]:
import math
import tldextract
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from IPython.display import display
import ipywidgets as widgets
import time
import joblib

# Function to calculate the entropy of a domain name
def calculate_entropy(domain_part):
counter = Counter(domain_part)
probabilities = [n_x / len(domain_part) for n_x in counter.values()]
entropy = -sum(p_x * math.log(p_x, 2) for p_x in probabilities if p_x > 0) # Ensure no
log(0)
return entropy

# Function to compute the vowel to consonant ratio in a domain
def vowel_to_consonant_ratio(domain_part):

vowels = 'aeiou'
vowel_count = sum(1 for char in domain_part if char in vowels)
consonant_count = sum(1 for char in domain_part if char.isalpha() and char not in vowels)
return vowel_count / consonant_count if consonant_count != 0 else 0

# Extract features from domain
def extract_features(domain):
extracted = tldextract.extract(domain)
domain_part = extracted.domain
length = len(domain_part)
digits = sum(c.isdigit() for c in domain_part)
v_c_ratio = vowel_to_consonant_ratio(domain_part)
entropy = calculate_entropy(domain_part)
return [length, digits, v_c_ratio, entropy]

# Function to load the file based on user input
def load_file(filename):
if filename.endswith('.csv'):
df = pd.read_csv(filename)
elif filename.endswith('.xlsx'):
df = pd.read_excel(filename)
else:
raise ValueError("Unsupported file format. Please upload a CSV or Excel file.")
return df

# Main function to process domains
def process_domains(domain_file):
df = load_file(domain_file)
df['domain'] = df['url'].apply(lambda x: tldextract.extract(x).domain)
df['features'] = df['domain'].apply(extract_features)
domain_features = np.array(df['features'].tolist())
df = df.drop(columns='features')
df = df.join(pd.DataFrame(domain_features, columns=['length', 'digits', 'v_c_ratio',
'entropy']))

# Save results
df.to_csv('domain_features.csv', index=False)
return df

# Function to train the model
def train_model(df):
if 'type' not in df.columns:
raise KeyError("The dataset must contain a 'type' column with values 'benign',
'phishing', 'defacement', and 'malware'.")

X = df[['length', 'digits', 'v_c_ratio', 'entropy']]
y = df['type'].apply(lambda x: 0 if x == 'benign' else (1 if x == 'phishing' else (2 if x ==
'defacement' else 3)))

# Normalize features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Reduce dimensions
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

X_train, X_test, y_train, y_test = train_test_split(X_pca, y, test_size=0.2,
random_state=42)
model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Model Accuracy: {accuracy:.2f}")
print(f"Classification Report:\n{classification_report(y_test, y_pred,
target_names=['benign', 'phishing', 'defacement', 'malware'])}")

# Save model, scaler, and PCA objects
joblib.dump(model, 'model.pkl')
joblib.dump(scaler, 'scaler.pkl')
joblib.dump(pca, 'pca.pkl')
return model, scaler, pca

# Function to classify domains using the trained model
def classify_domains(df, model, scaler, pca):
X = df[['length', 'digits', 'v_c_ratio', 'entropy']]
X_scaled = scaler.transform(X)
X_pca = pca.transform(X_scaled)
y_pred = model.predict(X_pca)

df['classification'] = y_pred
df['classification'] = df['classification'].apply(lambda x: 'benign' if x == 0 else ('phishing' if
x == 1 else ('defacement' if x == 2 else 'malware')))
return df

# Function to save results
def save_results(df):
df.to_csv('classified_domains.csv', index=False)

benign_df = df[df['classification'] == 'benign']
phishing_df = df[df['classification'] == 'phishing']
defacement_df = df[df['classification'] == 'defacement']
malware_df = df[df['classification'] == 'malware']

benign_df.to_csv('benign_domains.csv', index=False)
phishing_df.to_csv('phishing_domains.csv', index=False)
defacement_df.to_csv('defacement_domains.csv', index=False)
malware_df.to_csv('malware_domains.csv', index=False)

print(f"Number of benign domains: {len(benign_df)}")
print(f"Number of phishing domains: {len(phishing_df)}")
print(f"Number of defacement domains: {len(defacement_df)}")
print(f"Number of malware domains: {len(malware_df)}")

# Interactive input in Jupyter Notebook
input_file = widgets.Text(

value='',
placeholder='Enter training file name',
description='Training File:',
disabled=False)
display(input_file)

prediction_file = widgets.Text(
value='',
placeholder='Enter prediction file name',
description='Prediction File:',
disabled=False)
display(prediction_file)

run_button = widgets.Button(
description='Run',
disabled=False,
button_style='',
tooltip='Click to run the DGA detection')
output = widgets.Output()

def on_button_clicked(b):
with output:
output.clear_output()
training_file = input_file.value
prediction_file_name = prediction_file.value

try:
start_time = time.time()
training_df = process_domains(training_file)
model, scaler, pca = train_model(training_df)

prediction_df = process_domains(prediction_file_name)
classified_df = classify_domains(prediction_df, model, scaler, pca)
save_results(classified_df)

end_time = time.time()
execution_time = end_time - start_time

print(f"Execution time: {execution_time} seconds")
print(f"Results saved as 'domain_features.csv', 'classified_domains.csv',
'benign_domains.csv', 'phishing_domains.csv', 'defacement_domains.csv', and
'malware_domains.csv'.")
except KeyError as e:
print(e)
print("Make sure your dataset contains the correct columns.")

run_button.on_click(on_button_clicked)
display(run_button, output)

# Function to load and use saved model
def load_and_classify_domains(prediction_file_name):
# Load saved model, scaler, and PCA objects

model = joblib.load('model.pkl')
scaler = joblib.load('scaler.pkl')
pca = joblib.load('pca.pkl')

# Process the prediction file
prediction_df = process_domains(prediction_file_name)
classified_df = classify_domains(prediction_df, model, scaler, pca)
save_results(classified_df)

# Button for classifying with a secondary file using saved model
classify_button = widgets.Button(
description='Classify with Saved Model',
disabled=False,
button_style='',
tooltip='Click to classify with saved model')

def on_classify_button_clicked(b):
with output:
output.clear_output()
prediction_file_name = prediction_file.value

try:
start_time = time.time()
load_and_classify_domains(prediction_file_name)
end_time = time.time()

execution_time = end_time - start_time

print(f"Execution time: {execution_time} seconds")
print(f"Results saved as 'classified_domains.csv', 'benign_domains.csv',
'phishing_domains.csv', 'defacement_domains.csv', and 'malware_domains.csv'.")
except Exception as e:
print(e)

classify_button.on_click(on_classify_button_clicked)
display(classify_button, output)

SyntaxError: unterminated string literal (detected at line 69) (4232545465.py, line 69)