In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.linalg import SparseVector
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from scipy.sparse import csr_matrix
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.utils import resample

# Start (or reuse) the Spark session used by the feature pipeline
spark = (
    SparkSession.builder
    .appName("Detecting-Malicious-URL App")
    .getOrCreate()
)

# Read the raw CSV, discard exact duplicate rows and renumber the index
df = (
    pd.read_csv('/home/matan/Documents/url classification/url_dataset_updated.csv')
    .drop_duplicates()
    .reset_index(drop=True)
)

# Name of the column that holds the URL text
url_column_name = 'URL'

# Prepend "https://" to label-0 URLs that do not already carry a scheme
def add_https(url, label):
    """Return *url* with an ``https://`` prefix when the row is labeled 0.

    Args:
        url: URL string taken from the dataset.
        label: Class label of the row; only rows labeled ``0`` are modified.

    Returns:
        ``"https://" + url`` for a label-0 URL that has no ``http://`` or
        ``https://`` prefix; otherwise *url* unchanged.
    """
    if label != 0:
        return url
    # Leave URLs that already start with a scheme alone, so the result is
    # never something like "https://https://example.com".
    if url.lower().startswith(("http://", "https://")):
        return url
    return "https://" + url

# Apply add_https to every row. Iterating the two columns directly avoids
# building a Series per row, which DataFrame.apply(axis=1) does.
# NOTE(review): only label-0 rows are rewritten, so the "https" token may end
# up correlating with the label after tokenization; confirm this is intended.
df[url_column_name] = [
    add_https(url, label)
    for url, label in zip(df[url_column_name], df['label'])
]

# Separate the dataset into malicious and benign
malicious_df = df[df['label'] == 1]
benign_df = df[df['label'] == 0]

# Draw the same number of rows from each class: up to 150,000, capped by the
# smaller class so sampling without replacement is always possible.
samples_per_class = min(150000, len(malicious_df), len(benign_df))
if samples_per_class == 0:
    raise ValueError(
        "Both label 0 and label 1 rows are required to build a balanced dataset"
    )

# replace=False is essential: resample() samples WITH replacement by default,
# which would re-introduce the duplicates dropped earlier and let the same URL
# end up in both the training and the test split.
malicious_sampled_df = resample(
    malicious_df, replace=False, n_samples=samples_per_class, random_state=42
)
benign_sampled_df = resample(
    benign_df, replace=False, n_samples=samples_per_class, random_state=42
)

# Combine the sampled data
balanced_df = pd.concat([malicious_sampled_df, benign_sampled_df])

# Shuffle the combined dataset to mix malicious and benign URLs
balanced_df = balanced_df.sample(frac=1, random_state=42).reset_index(drop=True)

spark_df = spark.createDataFrame(balanced_df)

# Tokenize the URL column, splitting on non-word characters
regexTokenizer = RegexTokenizer(inputCol="URL", outputCol="Words", pattern="\\W")

# CountVectorizer converts the words into term-count vectors
countVectors = CountVectorizer(inputCol=regexTokenizer.getOutputCol(), outputCol="rawfeatures", vocabSize=10000, minDF=5)

# TF-IDF re-weighting of the term counts
idf = IDF(inputCol=countVectors.getOutputCol(), outputCol="features")

# Create the pipeline
pipeline = Pipeline(stages=[regexTokenizer, countVectors, idf])

# Randomly split the raw rows into training and testing (80%, 20%) BEFORE
# fitting, so the vocabulary and the IDF weights are learned from training
# rows only and no information from the test rows leaks into the features.
(trainingRows, testRows) = spark_df.randomSplit([0.8, 0.2], seed=100)

# Fit on the training rows, then apply the same fitted model to both splits
pipelineFit = pipeline.fit(trainingRows)
trainingData = pipelineFit.transform(trainingRows)
testData = pipelineFit.transform(testRows)

# Whole dataset transformed with the training-only fit, kept under its
# original name for any code that still refers to it.
dataset = pipelineFit.transform(spark_df)


def _vectors_to_csr(vectors):
    """Stack pyspark.ml vectors into one SciPy CSR matrix without densifying.

    Args:
        vectors: Iterable of pyspark.ml.linalg vectors, all of the same length.

    Returns:
        A ``csr_matrix`` with one row per input vector.

    Raises:
        ValueError: If *vectors* is empty (raised by ``np.concatenate``).
    """
    row_starts = [0]
    column_chunks = []
    value_chunks = []
    n_features = 0
    for vec in vectors:
        if isinstance(vec, SparseVector):
            columns, values = vec.indices, vec.values
        else:
            # Dense vector: keep only its non-zero entries
            dense = vec.toArray()
            columns = np.flatnonzero(dense)
            values = dense[columns]
        n_features = len(vec)
        column_chunks.append(columns)
        value_chunks.append(values)
        row_starts.append(row_starts[-1] + len(columns))
    return csr_matrix(
        (np.concatenate(value_chunks), np.concatenate(column_chunks), row_starts),
        shape=(len(column_chunks), n_features),
    )


# Convert the Spark DataFrames to Pandas for scikit-learn
train_data_pd = trainingData.select("features", "label").toPandas()
test_data_pd = testData.select("features", "label").toPandas()

# Extract features and labels. The features stay sparse: a dense array would
# need (rows x vocabulary size) floats, almost all of them zero.
X_train = _vectors_to_csr(train_data_pd["features"])
y_train = train_data_pd["label"].to_numpy()

X_test = _vectors_to_csr(test_data_pd["features"])
y_test = test_data_pd["label"].to_numpy()

# Fit a 5-nearest-neighbour classifier on the training split
knn = KNeighborsClassifier(n_neighbors=5).fit(X_train, y_train)

# Predict labels for the held-out rows
predictions = knn.predict(X_test)

# Score the predictions against the true test labels
accuracy, conf_matrix, classification_rep = (
    metric(y_test, predictions)
    for metric in (accuracy_score, confusion_matrix, classification_report)
)

# Report the metrics
print("Accuracy: {:.2f}%".format(accuracy * 100))
print("Confusion Matrix:\n", conf_matrix)
print("Classification Report:\n", classification_rep)
