# In [None]:
import warnings
warnings.filterwarnings('ignore')

# Import necessary libraries
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.tree import DecisionTreeClassifier
import joblib
import tldextract

# Function to load the file based on user input
def load_file(filename):
    """Load a tabular dataset from a CSV or Excel file into a DataFrame.

    The format is chosen from the file extension, compared
    case-insensitively so that names such as ``DATA.CSV`` are accepted.

    Args:
        filename: Path of the file to read (a string or path-like object).

    Returns:
        pandas.DataFrame: The contents of the file.

    Raises:
        ValueError: If the extension is not ``.csv``, ``.xlsx`` or ``.xls``.
    """
    # Normalise once so the extension checks ignore letter case.
    lowered = str(filename).lower()

    if lowered.endswith('.csv'):
        return pd.read_csv(filename)
    if lowered.endswith(('.xlsx', '.xls')):
        return pd.read_excel(filename)

    raise ValueError("Unsupported file format. Please upload a CSV or Excel file.")

# Function to extract features from the domain column
def _shannon_entropy(text):
    """Return the Shannon entropy of ``text`` in bits per character.

    An empty string has no characters to measure and yields 0.0.
    """
    if not text:
        return 0.0
    # Character counts -> probabilities -> -sum(p * log2(p)).
    _, counts = np.unique(list(text), return_counts=True)
    probabilities = counts / counts.sum()
    return float(-(probabilities * np.log2(probabilities)).sum())


def _count_subdomains(subdomain):
    """Return the number of dot-separated labels in ``subdomain`` (0 if empty)."""
    # ''.split('.') is [''], so an empty subdomain must be special-cased.
    return len(subdomain.split('.')) if subdomain else 0


def extract_features(df, domain_column='domain'):
    """Derive lexical features from a column of domain names.

    Each value of ``domain_column`` is split with ``tldextract.extract`` into
    ``subdomain``, ``domain`` and ``suffix`` columns (overwriting any existing
    columns with those names, including ``domain_column`` itself when it is
    called ``'domain'``). A ``host`` column is rebuilt from the non-empty
    parts, and length / digit-count / special-character-count /
    subdomain-count / entropy features are added for both ``domain`` and
    ``host``.

    NOTE: empty subdomains now count as 0 and ``host`` no longer carries a
    stray leading/trailing dot, so models trained on features produced by an
    earlier version of this function should be retrained.

    Args:
        df: DataFrame containing ``domain_column``.
        domain_column: Name of the column holding the raw domain names.

    Returns:
        pandas.DataFrame: A copy of ``df`` with the feature columns added;
        the caller's DataFrame is left untouched.
    """
    # Work on a copy so the input frame is not modified behind the caller's back.
    df = df.copy()

    # Blank out missing values explicitly: astype(str) alone would turn NaN
    # into the literal string 'nan', which would then be parsed as a domain.
    raw = df[domain_column]
    df[domain_column] = raw.astype(str).where(raw.notna(), '')

    # Extract components of the domain
    extracted = df[domain_column].apply(tldextract.extract)
    df['subdomain'] = extracted.apply(lambda parts: parts.subdomain).fillna('')
    df['domain'] = extracted.apply(lambda parts: parts.domain).fillna('')
    df['suffix'] = extracted.apply(lambda parts: parts.suffix).fillna('')

    # Features of the registered domain label (column order is kept stable
    # because downstream code selects numeric columns in frame order).
    df['domain_length'] = df['domain'].apply(len)
    df['domain_num_digits'] = df['domain'].apply(
        lambda text: sum(ch.isdigit() for ch in text))
    df['domain_num_special_chars'] = df['domain'].apply(
        lambda text: sum(not ch.isalnum() for ch in text))
    df['domain_num_subdomains'] = df['subdomain'].apply(_count_subdomains)
    df['domain_entropy'] = df['domain'].apply(_shannon_entropy)

    # Join only the parts that are present so a missing subdomain or suffix
    # does not introduce an extra '.' into the host string.
    df['host'] = [
        '.'.join(part for part in parts if part)
        for parts in zip(df['subdomain'], df['domain'], df['suffix'])
    ]
    df['host_length'] = df['host'].apply(len)
    df['host_num_digits'] = df['host'].apply(
        lambda text: sum(ch.isdigit() for ch in text))
    df['host_num_special_chars'] = df['host'].apply(
        lambda text: sum(not ch.isalnum() for ch in text))
    df['host_num_subdomains'] = df['subdomain'].apply(_count_subdomains)
    df['host_entropy'] = df['host'].apply(_shannon_entropy)
    return df

# Get the filename from the user for training
filename = input("Enter the name of the training file (with extension) to load: ").strip()

# Start from a known state: a `df` left over from an earlier run must not be
# mistaken for a freshly loaded file when loading fails.
df = None
try:
    df = load_file(filename)
except Exception as e:
    # Best-effort boundary: report the problem and fall through to the
    # "not loaded" branch below instead of stopping with a traceback.
    print(e)
else:
    print("File loaded successfully!")
    try:
        display(df.head())  # Display the first few rows of the DataFrame
    except NameError:
        # `display` only exists inside IPython/Jupyter; fall back to text.
        print(df.head())

# Check if the DataFrame is loaded
if df is not None:
    # Extract features
    df = extract_features(df)

    # Define features and target variable
    X = df.drop(columns=['isDGA', 'subdomain', 'domain', 'suffix', 'host'])
    y = df['isDGA'].map({'dga': 1, 'legit': 0})

    # Any label other than 'dga'/'legit' maps to NaN; fail early with a clear
    # message rather than handing NaN targets to the classifier.
    unexpected_labels = df.loc[y.isna(), 'isDGA'].unique().tolist()
    if unexpected_labels:
        raise ValueError(
            "Column 'isDGA' must contain only 'dga' or 'legit'; "
            f"found unexpected values: {unexpected_labels}"
        )

    # Split the dataset (10% held out for evaluation, fixed seed)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.1, random_state=33)

    # Define the preprocessing steps: only numeric columns are passed on,
    # with missing values replaced by the column median.
    numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()

    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
        ])

    # Define the model
    model = DecisionTreeClassifier(random_state=33)

    # Create and train the pipeline
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', model),
    ])

    pipeline.fit(X_train, y_train)

    # Evaluate the model
    y_pred = pipeline.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    print(f"Model accuracy: {accuracy}")

    # Print the classification report
    print(classification_report(y_test, y_pred, target_names=['non-dga', 'dga']))

    # Print the number of detected DGA and non-DGA URLs
    dga_count = int((y_pred == 1).sum())
    non_dga_count = int((y_pred == 0).sum())
    print(f"Number of detected DGA URLs: {dga_count}")
    print(f"Number of detected non-DGA URLs: {non_dga_count}")

    # Save the trained model pipeline
    joblib.dump(pipeline, 'dga_detection_model.pkl')
    print("Trained model saved as 'dga_detection_model.pkl'")

else:
    print("DataFrame is not loaded. Please upload the file.")

# Function to test the model on a new dataset
def _show(frame):
    """Render ``frame`` with IPython's ``display`` if available, else print it."""
    try:
        display(frame)
    except NameError:
        # `display` only exists inside IPython/Jupyter sessions.
        print(frame)


def test_new_dataset(model_filename, test_filename):
    """Score a new dataset with a saved pipeline and export the results.

    Loads the pipeline stored in ``model_filename``, reads ``test_filename``
    with ``load_file``, derives features with ``extract_features``, predicts
    a label for every row, prints the per-class counts, shows the prediction
    table and writes ``detected_dga_urls.csv`` and
    ``detected_non_dga_urls.csv`` to the current directory.

    Any exception raised along the way is caught and reported with ``print``
    rather than propagated.

    Args:
        model_filename: Path of the pipeline file written by ``joblib.dump``.
        test_filename: Path of the CSV/Excel file to score.

    Returns:
        None
    """
    try:
        # Load the trained model
        # NOTE(security): joblib.load unpickles the file, which can execute
        # arbitrary code. Only load model files you created or trust.
        pipeline = joblib.load(model_filename)
        print("Model loaded successfully!")

        # Load the test dataset
        test_df = load_file(test_filename)
        print("Test file loaded successfully!")
        _show(test_df.head())

        # Extract features from the test dataset
        test_df = extract_features(test_df)
        # Only drop the domain components
        X_new = test_df.drop(columns=['domain', 'subdomain', 'suffix', 'host'])

        # Make predictions
        y_pred_new = pipeline.predict(X_new)

        # Print the number of detected DGA and non-DGA URLs
        dga_count_new = int((y_pred_new == 1).sum())
        non_dga_count_new = int((y_pred_new == 0).sum())
        print(f"Number of detected DGA URLs: {dga_count_new}")
        print(f"Number of detected non-DGA URLs: {non_dga_count_new}")

        # Translate the numeric classes for the prediction column only, so
        # the domain text can never be altered by the relabelling.
        labels = pd.Series(y_pred_new, index=test_df.index).map(
            {1: 'dga', 0: 'non-dga'})
        # rstrip('.') avoids a dangling dot when a row has no suffix.
        names = (test_df['domain'] + '.' + test_df['suffix']).str.rstrip('.')

        # Display predictions
        predictions = pd.DataFrame({
            'domain': names,
            'prediction': labels,
        })
        _show(predictions)

        # Save the DGA and non-DGA URLs into separate files
        dga_urls = predictions[predictions['prediction'] == 'dga']
        non_dga_urls = predictions[predictions['prediction'] == 'non-dga']

        dga_urls.to_csv('detected_dga_urls.csv', index=False)
        non_dga_urls.to_csv('detected_non_dga_urls.csv', index=False)
        print("Detected DGA URLs saved as 'detected_dga_urls.csv'")
        print("Detected non-DGA URLs saved as 'detected_non_dga_urls.csv'")

    except Exception as e:
        # Best-effort boundary: report instead of raising, but include the
        # exception type because some messages (e.g. KeyError) are just a key.
        print(f"Could not score '{test_filename}': {type(e).__name__}: {e}")

# Ask which file to score, then run it through the model saved during training
test_filename = input(
    "Enter the name of the test file (with extension) to load: "
).strip()
test_new_dataset(
    model_filename='dga_detection_model.pkl',
    test_filename=test_filename,
)