In [1]:
import pandas as pd

# Load the uploaded datasets
file_path_1 = './Dataset I.csv'
file_path_2 = './Dataset II.csv'

# Read the datasets
dataset_1 = pd.read_csv(file_path_1)
dataset_2 = pd.read_csv(file_path_2)

# Display the first few rows of each dataset to understand their structure
dataset_1_head = dataset_1.head()
dataset_2_head = dataset_2.head()

dataset_1_head, dataset_2_head


  dataset_1 = pd.read_csv(file_path_1)


(                    Websites   Lable
 0                 google.com  Benign
 1             www.google.com  Benign
 2              microsoft.com  Benign
 3         data.microsoft.com  Benign
 4  events.data.microsoft.com  Benign,
                                                  URL    type
 0      https://www.unb.ca/cic/datasets/url-2016.html  Benign
 1    https://www.unb.ca/cic/datasets/andmal2020.html  Benign
 2        https://www.sec.cs.tu-bs.de/~danarp/drebin/  Benign
 3  https://github.com/traceflight/Android-Malware...  Benign
 4  https://ieee-dataport.org/documents/dataset-an...  Benign)

In [2]:
def extract_lexical_features(df, url_column):
    import re
    
    # Check for uncommon characters
    uncommon_chars = re.compile(r'[^a-zA-Z0-9/:.\-_]')
    df['num_uncommon_chars'] = df[url_column].apply(lambda x: len(uncommon_chars.findall(x)))

    # Check if URL contains "@" symbol
    df['has_at_symbol'] = df[url_column].apply(lambda x: '@' in x)

    # Check if URL contains "http" or "https"
    df['uses_https'] = df[url_column].apply(lambda x: 'https' in x.lower())

    # Check for suspicious patterns like '-' in the domain
    df['has_suspicious_hyphens'] = df[url_column].apply(lambda x: '-' in x.split('/')[2])

    # Check the length of the domain
    df['domain_length'] = df[url_column].apply(lambda x: len(x.split('/')[2]) if 'http' in x else len(x.split('.')[0]))

    return df


In [3]:
!pip install whois



In [4]:
def extract_host_features(df, url_column):
    import whois
    from urllib.parse import urlparse

    # Get domain registration information
    def get_registration_date(url):
        try:
            domain = urlparse(url).netloc
            whois_info = whois.whois(domain)
            return whois_info.creation_date
        except:
            return None

    df['domain_registration_date'] = df[url_column].apply(get_registration_date)

    # Check if the domain is hosted on a known server (e.g., AWS)
    known_hosts = ['aws', 'google', 'microsoft', 'cloudflare']
    df['hosted_on_known_server'] = df[url_column].apply(lambda x: any(host in x.lower() for host in known_hosts))

    # Use a popularity score from OpenPageRank (requires API)
    # You can include OpenPageRank API integration here if needed.

    return df


In [5]:
def extract_content_features(df, url_column):
    import requests
    from bs4 import BeautifulSoup
    
    # Check for presence of suspicious scripts
    def has_suspicious_scripts(url):
        try:
            response = requests.get(url, timeout=5)
            soup = BeautifulSoup(response.content, 'html.parser')
            scripts = soup.find_all('script')
            suspicious_keywords = ['eval', 'document.write', 'setTimeout', 'setInterval']
            for script in scripts:
                if any(keyword in script.string for keyword in suspicious_keywords if script.string):
                    return True
            return False
        except:
            return False

    df['has_suspicious_scripts'] = df[url_column].apply(has_suspicious_scripts)

    # Check for invalid security certificates
    def has_valid_certificate(url):
        try:
            response = requests.get(url, verify=True, timeout=5)
            return response.ok
        except:
            return False

    df['has_valid_certificate'] = df[url_column].apply(has_valid_certificate)

    return df


In [6]:
def extract_features(df, url_column):
    df['url_length'] = df[url_column].apply(len)
    df['num_special_chars'] = df[url_column].apply(lambda x: sum(1 for char in x if char in ['/', '?', '&', '=', '-', '_', '%', '.']))
    df['has_ip'] = df[url_column].apply(lambda x: any(part.isdigit() for part in x.split('.')))
    df['num_subdomains'] = df[url_column].apply(lambda x: len(x.split('.')) - 2 if 'http' not in x else len(x.split('/')[2].split('.')) - 2)
    df['top_level_domain'] = df[url_column].apply(lambda x: x.split('.')[-1].split('/')[0])
    return df



In [7]:
def extract_all_features(df, url_column):
    df = extract_features(df, url_column)  # Original function you provided
    df = extract_lexical_features(df, url_column)
    # df = extract_host_features(df, url_column)
    # df = extract_content_features(df, url_column)
    return df

In [8]:
dataset_1_cleaned = dataset_1.dropna(subset=['Websites'])
dataset_2_cleaned = dataset_2.dropna(subset=['URL'])


In [9]:
def sanitize_and_clean_urls(df, url_column):
    # Ensure all entries are strings and non-empty
    df[url_column] = df[url_column].astype(str).str.strip()
    df = df[df[url_column].str.contains(r'\.', regex=True)]  # Keep valid URLs containing at least one '.'
    df = df[df[url_column].apply(lambda x: len(x.split('/')) > 2)]  # Ensure valid URL structure
    return df

# Re-clean datasets
dataset_1_cleaned = sanitize_and_clean_urls(dataset_1_cleaned, 'Websites')
dataset_2_cleaned = sanitize_and_clean_urls(dataset_2_cleaned, 'URL')


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[url_column] = df[url_column].astype(str).str.strip()


In [10]:
!pip install beautifulsoup4




In [11]:

dataset_1_features = extract_all_features(dataset_1_cleaned, 'Websites')
dataset_2_features = extract_all_features(dataset_2_cleaned, 'URL')

In [12]:
# Display the first few rows of each dataset directly
dataset_1_features.head()


Unnamed: 0,Websites,Lable,url_length,num_special_chars,has_ip,num_subdomains,top_level_domain,num_uncommon_chars,has_at_symbol,uses_https,has_suspicious_hyphens,domain_length
4002,http://www.marketingbyinternet.com/mo/e56508df...,Malicious,71,7,False,1,com,0,False,False,False,27
4003,http://www.824555.com/app/member/SportOption.p...,Malicious,67,12,True,1,php?uid=guest&langx=gb,4,False,False,False,14
4004,https://docs.google.com/spreadsheet/viewform?f...,Malicious,87,8,False,1,com,2,False,True,False,15
4006,http://www.martin-busker.de/administrator/help...,Malicious,153,16,False,1,php,0,False,False,True,20
4009,http://www.bimabn.com/1-configurazione-support...,Malicious,113,15,False,1,store-contatta,0,False,False,False,14


In [13]:
# Display the first few rows of each dataset directly
dataset_2_features.head()


Unnamed: 0,URL,type,url_length,num_special_chars,has_ip,num_subdomains,top_level_domain,num_uncommon_chars,has_at_symbol,uses_https,has_suspicious_hyphens,domain_length
0,https://www.unb.ca/cic/datasets/url-2016.html,Benign,45,9,False,1,html,0,False,True,False,10
1,https://www.unb.ca/cic/datasets/andmal2020.html,Benign,47,8,False,1,html,0,False,True,False,10
2,https://www.sec.cs.tu-bs.de/~danarp/drebin/,Benign,43,10,False,3,de,1,False,True,True,19
3,https://github.com/traceflight/Android-Malware...,Benign,55,7,False,0,com,0,False,True,False,10
4,https://ieee-dataport.org/documents/dataset-an...,Benign,75,9,False,0,org,1,False,True,True,17


In [14]:
dataset_2_features.shape

(4999, 12)

In [15]:
dataset_1_features.shape

(957, 12)

In [16]:
combined_dataset = pd.concat([
    dataset_1_features.rename(columns={"Websites": "URL", "Lable": "Type"}),
    dataset_2_features.rename(columns={"type": "Type"})
], ignore_index=True)


In [17]:
combined_dataset.shape

(5956, 12)

In [18]:
combined_dataset.to_csv('combined_dataset_more_features.csv', index=False)

In [19]:
combined_dataset.head()

Unnamed: 0,URL,Type,url_length,num_special_chars,has_ip,num_subdomains,top_level_domain,num_uncommon_chars,has_at_symbol,uses_https,has_suspicious_hyphens,domain_length
0,http://www.marketingbyinternet.com/mo/e56508df...,Malicious,71,7,False,1,com,0,False,False,False,27
1,http://www.824555.com/app/member/SportOption.p...,Malicious,67,12,True,1,php?uid=guest&langx=gb,4,False,False,False,14
2,https://docs.google.com/spreadsheet/viewform?f...,Malicious,87,8,False,1,com,2,False,True,False,15
3,http://www.martin-busker.de/administrator/help...,Malicious,153,16,False,1,php,0,False,False,True,20
4,http://www.bimabn.com/1-configurazione-support...,Malicious,113,15,False,1,store-contatta,0,False,False,False,14


In [20]:
combined_dataset.to_csv('combined_dataset_more_features.csv')

In [21]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report

# Create separate label encoders for 'Type' and 'top_level_domain'
type_encoder = LabelEncoder()
top_level_domain_encoder = LabelEncoder()

# Encode the 'Type' column as the target variable
combined_dataset['Type_encoded'] = type_encoder.fit_transform(combined_dataset['Type'])

# Encode 'top_level_domain' as a feature
combined_dataset['top_level_domain_encoded'] = top_level_domain_encoder.fit_transform(combined_dataset['top_level_domain'])

# Select all features and add the new ones
features = [
    'url_length', 
    'num_special_chars', 
    'has_ip', 
    'num_subdomains', 
    'top_level_domain_encoded', 
    'num_uncommon_chars', 
    'has_at_symbol', 
    'uses_https', 
    'has_suspicious_hyphens', 
    'domain_length'
]

# Ensure all features are numeric
X = combined_dataset[features]
y = combined_dataset['Type_encoded']

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train a Random Forest model
rf_model = RandomForestClassifier(random_state=42)
rf_model.fit(X_train, y_train)

# Make predictions
y_pred = rf_model.predict(X_test)

# Evaluate the model
classification_report_output = classification_report(y_test, y_pred, target_names=type_encoder.classes_)

print(classification_report_output)


              precision    recall  f1-score   support

      Benign       0.99      0.97      0.98       481
   Malicious       0.98      0.99      0.99       711

    accuracy                           0.98      1192
   macro avg       0.98      0.98      0.98      1192
weighted avg       0.98      0.98      0.98      1192



In [22]:

import joblib

joblib.dump(rf_model, 'rfmodel_static.joblib')
print("Model saved as 'rfmodel_static.joblib'")

Model saved as 'rfmodel_static.joblib'


In [23]:
joblib.dump(type_encoder, 'type_encoder_static.joblib')
print("Type encoder saved as 'type_encoder_static.joblib'")
joblib.dump(top_level_domain_encoder, 'top_level_domain_encoder_static.joblib')
print("Top level domain encoder saved as 'top_level_domain_encoder_static.joblib'")

Type encoder saved as 'type_encoder_static.joblib'
Top level domain encoder saved as 'top_level_domain_encoder_static.joblib'


In [24]:
!pip install xgboost




In [25]:
from xgboost import XGBClassifier
xgb_model = XGBClassifier(random_state=42, use_label_encoder=False, eval_metric='logloss')
xgb_model.fit(X_train, y_train)

# Make predictions  
xgb_y_pred = xgb_model.predict(X_test)

# Evaluate the model
xgb_classification_report = classification_report(y_test, xgb_y_pred, target_names=type_encoder.classes_)

xgb_classification_report

Parameters: { "use_label_encoder" } are not used.



'              precision    recall  f1-score   support\n\n      Benign       0.98      0.98      0.98       481\n   Malicious       0.98      0.99      0.99       711\n\n    accuracy                           0.98      1192\n   macro avg       0.98      0.98      0.98      1192\nweighted avg       0.98      0.98      0.98      1192\n'

In [26]:
import joblib

joblib.dump(xgb_model, 'xgbmodel_static.joblib')
print("Model saved as 'xgbmodel_static.joblib'")

Model saved as 'xgbmodel_static.joblib'


In [27]:
# from sklearn.model_selection import GridSearchCV

# # Define the parameter grid for optimization
# param_grid = {
#     'n_estimators': [100, 200, 300],
#     'max_depth': [10, 20, 30, None],
#     'min_samples_split': [2, 5, 10],
#     'min_samples_leaf': [1, 2, 4]
# }

# # Perform grid search with cross-validation to find the best parameters
# grid_search = GridSearchCV(
#     estimator=RandomForestClassifier(random_state=42),
#     param_grid=param_grid,
#     scoring='precision_weighted',  # Optimize for higher precision
#     cv=3,  # 3-fold cross-validation
#     verbose=1,
#     n_jobs=-1
# )

# # Fit the grid search to the training data
# grid_search.fit(X_train, y_train)

# # Get the best model and parameters
# best_model = grid_search.best_estimator_
# best_params = grid_search.best_params_

# # Evaluate the optimized model
# optimized_y_pred = best_model.predict(X_test)
# optimized_classification_report = classification_report(y_test, optimized_y_pred, target_names=type_encoder.classes_)

# best_params, optimized_classification_report

# dynammic qr codes  

In [28]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Re-load the original combined dataset
file_path_combined = './combined_dataset_more_features.csv'
combined_dataset = pd.read_csv(file_path_combined)

# Create a copy for the new dynamic dataset
dynamic_dataset = combined_dataset.copy()

# Add 'created_at', 'expires_at', and 'transaction_at' columns to dynamic_dataset
np.random.seed(42)  # For reproducibility
current_time = datetime.now()

# Generate 'created_at' within the last 30 days
dynamic_dataset['created_at'] = [
    current_time - timedelta(days=np.random.randint(0, 30), hours=np.random.randint(0, 24))
    for _ in range(len(dynamic_dataset))
]

# Generate 'expires_at' by adding a random validity period (1 to 5 hours) to 'created_at'
dynamic_dataset['expires_at'] = dynamic_dataset['created_at'] + pd.to_timedelta(
    np.random.randint(1, 6, size=len(dynamic_dataset)), unit='h'
)

# Generate 'transaction_at' as random timestamps within or outside the valid range
dynamic_dataset['transaction_at'] = [
    created + pd.to_timedelta(np.random.randint(-3, 8), unit='h')  # Random transaction time
    for created in dynamic_dataset['created_at']
]

# Update 'Type' based on the condition
# If the transaction_at doesn't fall in the valid range and the label is benign, mark as malicious
dynamic_dataset['Type'] = dynamic_dataset.apply(
    lambda row: 'Malicious' if (
        row['Type'] == 'Benign' and not (row['created_at'] <= row['transaction_at'] <= row['expires_at'])
    ) else row['Type'], axis=1
)
dynamic_dataset

Unnamed: 0.1,Unnamed: 0,URL,Type,url_length,num_special_chars,has_ip,num_subdomains,top_level_domain,num_uncommon_chars,has_at_symbol,uses_https,has_suspicious_hyphens,domain_length,created_at,expires_at,transaction_at
0,0,http://www.marketingbyinternet.com/mo/e56508df...,Malicious,71,7,False,1,com,0,False,False,False,27,2024-12-13 18:32:13.493908,2024-12-13 21:32:13.493908,2024-12-14 00:32:13.493908
1,1,http://www.824555.com/app/member/SportOption.p...,Malicious,67,12,True,1,php?uid=guest&langx=gb,4,False,False,False,14,2024-11-21 23:32:13.493908,2024-11-22 01:32:13.493908,2024-11-22 02:32:13.493908
2,2,https://docs.google.com/spreadsheet/viewform?f...,Malicious,87,8,False,1,com,2,False,True,False,15,2024-12-10 06:32:13.493908,2024-12-10 08:32:13.493908,2024-12-10 05:32:13.493908
3,3,http://www.martin-busker.de/administrator/help...,Malicious,153,16,False,1,php,0,False,False,True,20,2024-11-21 17:32:13.493908,2024-11-21 18:32:13.493908,2024-11-21 16:32:13.493908
4,4,http://www.bimabn.com/1-configurazione-support...,Malicious,113,15,False,1,store-contatta,0,False,False,False,14,2024-12-13 19:32:13.493908,2024-12-13 20:32:13.493908,2024-12-13 18:32:13.493908
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5951,5951,http://www.driduct.com/home,Malicious,27,5,False,1,com,0,False,False,False,15,2024-12-16 23:32:13.493908,2024-12-17 01:32:13.493908,2024-12-17 05:32:13.493908
5952,5952,http://greg-miller.com/wp-content/themes/twent...,Malicious,249,19,False,0,php?cmd=login_submit&amp;id=8c4aa22d352c4d9239...,8,False,False,True,15,2024-12-12 07:32:13.493908,2024-12-12 12:32:13.493908,2024-12-12 06:32:13.493908
5953,5953,http://greg-miller.com/wp-content/themes/twent...,Malicious,249,19,False,0,php?cmd=login_submit&amp;id=77700b6afcca759deb...,8,False,False,True,15,2024-12-10 15:32:13.493908,2024-12-10 20:32:13.493908,2024-12-10 21:32:13.493908
5954,5954,http://greg-miller.com/wp-content/themes/twent...,Malicious,249,19,False,0,php?cmd=login_submit&amp;id=291e7d25cb52f8e2e2...,8,False,False,True,15,2024-12-08 06:32:13.493908,2024-12-08 11:32:13.493908,2024-12-08 05:32:13.493908


In [29]:
combined_dataset['Type'].value_counts()

Type
Malicious    3456
Benign       2500
Name: count, dtype: int64

In [30]:
dynamic_dataset['Type'].value_counts()

Type
Malicious    5074
Benign        882
Name: count, dtype: int64

In [31]:
# Identify transactions that were altered from benign to malicious
altered_to_malicious = dynamic_dataset[(dynamic_dataset['Type'] == 'Malicious') & (combined_dataset['Type'] == 'Benign')]

# Adjust the timestamps for these transactions to make them valid (i.e., ensure transaction_at falls in the valid period)
healthy_versions = altered_to_malicious.copy()
healthy_versions['transaction_at'] = healthy_versions['created_at'] + timedelta(minutes=1)  # Make transaction_at just after created_at
healthy_versions['Type'] = 'Benign'  # Reset the label to benign

# Append the healthy versions back to the dynamic dataset
dynamic_dataset_updated = pd.concat([dynamic_dataset, healthy_versions], ignore_index=True)


In [32]:
dynamic_dataset_updated['Type'].value_counts()

Type
Malicious    5074
Benign       2500
Name: count, dtype: int64

In [33]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report

# Preprocessing
# Encode categorical variables
type_encoder = LabelEncoder()
top_level_domain_encoder = LabelEncoder()

# Encode the 'Type' column as the target variable
dynamic_dataset_updated['Type_encoded'] = type_encoder.fit_transform(dynamic_dataset_updated['Type'])

# Encode 'top_level_domain' as a feature
dynamic_dataset_updated['top_level_domain_encoded'] = top_level_domain_encoder.fit_transform(dynamic_dataset_updated['top_level_domain'])

# Select features and target variable
features = ['url_length', 'num_special_chars', 'has_ip', 'num_subdomains', 'top_level_domain_encoded']
# Add new timestamp-based features for the model
dynamic_dataset_updated['transaction_within_valid_period'] = (
    (dynamic_dataset_updated['created_at'] <= dynamic_dataset_updated['transaction_at']) &
    (dynamic_dataset_updated['transaction_at'] <= dynamic_dataset_updated['expires_at'])
).astype(int)

features += ['transaction_within_valid_period']
X = dynamic_dataset_updated[features]
y = dynamic_dataset_updated['Type_encoded']

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train a Random Forest model
rf_model = RandomForestClassifier(random_state=42)
rf_model.fit(X_train, y_train)

# Make predictions
y_pred = rf_model.predict(X_test)

# Evaluate the model
classification_report_output = classification_report(y_test, y_pred, target_names=type_encoder.classes_)

classification_report_output


'              precision    recall  f1-score   support\n\n      Benign       0.99      0.98      0.99       505\n   Malicious       0.99      1.00      0.99      1010\n\n    accuracy                           0.99      1515\n   macro avg       0.99      0.99      0.99      1515\nweighted avg       0.99      0.99      0.99      1515\n'

In [34]:
X

Unnamed: 0,url_length,num_special_chars,has_ip,num_subdomains,top_level_domain_encoded,transaction_within_valid_period
0,71,7,False,1,130,0
1,67,12,True,1,555,0
2,87,8,False,1,130,0
3,153,16,False,1,366,0
4,113,15,False,1,589,0
...,...,...,...,...,...,...
7569,25,4,False,1,130,1
7570,22,4,False,1,130,1
7571,21,4,False,1,130,1
7572,21,4,False,1,332,1


In [35]:
dynamic_dataset_updated.head()

Unnamed: 0.1,Unnamed: 0,URL,Type,url_length,num_special_chars,has_ip,num_subdomains,top_level_domain,num_uncommon_chars,has_at_symbol,uses_https,has_suspicious_hyphens,domain_length,created_at,expires_at,transaction_at,Type_encoded,top_level_domain_encoded,transaction_within_valid_period
0,0,http://www.marketingbyinternet.com/mo/e56508df...,Malicious,71,7,False,1,com,0,False,False,False,27,2024-12-13 18:32:13.493908,2024-12-13 21:32:13.493908,2024-12-14 00:32:13.493908,1,130,0
1,1,http://www.824555.com/app/member/SportOption.p...,Malicious,67,12,True,1,php?uid=guest&langx=gb,4,False,False,False,14,2024-11-21 23:32:13.493908,2024-11-22 01:32:13.493908,2024-11-22 02:32:13.493908,1,555,0
2,2,https://docs.google.com/spreadsheet/viewform?f...,Malicious,87,8,False,1,com,2,False,True,False,15,2024-12-10 06:32:13.493908,2024-12-10 08:32:13.493908,2024-12-10 05:32:13.493908,1,130,0
3,3,http://www.martin-busker.de/administrator/help...,Malicious,153,16,False,1,php,0,False,False,True,20,2024-11-21 17:32:13.493908,2024-11-21 18:32:13.493908,2024-11-21 16:32:13.493908,1,366,0
4,4,http://www.bimabn.com/1-configurazione-support...,Malicious,113,15,False,1,store-contatta,0,False,False,False,14,2024-12-13 19:32:13.493908,2024-12-13 20:32:13.493908,2024-12-13 18:32:13.493908,1,589,0


In [36]:
import joblib

joblib.dump(rf_model, 'rfmodel_dynamic.joblib')
print("Model saved as 'rfmodel_dunamic.joblib'")

Model saved as 'rfmodel_dunamic.joblib'


In [37]:
joblib.dump(type_encoder, 'type_encoder_dynamic.joblib')
print("Type encoder saved as 'type_encoder_dynamic.joblib'")
joblib.dump(top_level_domain_encoder, 'top_level_domain_encoder_dynamic.joblib')
print("Top level domain encoder saved as 'top_level_domain_encoder_dynamic.joblib'")

Type encoder saved as 'type_encoder_dynamic.joblib'
Top level domain encoder saved as 'top_level_domain_encoder_dynamic.joblib'


In [38]:
import pandas as pd
import joblib
from datetime import datetime

def extract_features(df, url_column):
    df['url_length'] = df[url_column].apply(len)
    df['num_special_chars'] = df[url_column].apply(lambda x: sum(1 for char in x if char in ['/', '?', '&', '=', '-', '_', '%', '.']))
    df['has_ip'] = df[url_column].apply(lambda x: any(part.isdigit() for part in x.split('.')))
    df['num_subdomains'] = df[url_column].apply(lambda x: len(x.split('.')) - 2 if 'http' not in x else len(x.split('/')[2].split('.')) - 2)
    df['top_level_domain'] = df[url_column].apply(lambda x: x.split('.')[-1].split('/')[0])
    return df


def extract_lexical_features(df, url_column):
    import re
    
    # Check for uncommon characters
    uncommon_chars = re.compile(r'[^a-zA-Z0-9/:.\-_]')
    df['num_uncommon_chars'] = df[url_column].apply(lambda x: len(uncommon_chars.findall(x)))

    # Check if URL contains "@" symbol
    df['has_at_symbol'] = df[url_column].apply(lambda x: '@' in x)

    # Check if URL contains "http" or "https"
    df['uses_https'] = df[url_column].apply(lambda x: 'https' in x.lower())

    # Check for suspicious patterns like '-' in the domain
    df['has_suspicious_hyphens'] = df[url_column].apply(lambda x: '-' in x.split('/')[2])

    # Check the length of the domain
    df['domain_length'] = df[url_column].apply(lambda x: len(x.split('/')[2]) if 'http' in x else len(x.split('.')[0]))

    return df


def preprocess_url(url):
    """
    Preprocess a single URL to extract features.
    """
    # Define a helper DataFrame for the URL
    df = pd.DataFrame([{'URL': url}])
    
    # Extract features
    df = extract_features(df, 'URL')
    df = extract_lexical_features(df, 'URL')
    
    return df


def preprocess_timestamps(created_at, expires_at, transaction_at):
    """
    Preprocess timestamps and add related features.
    """
    # Create a helper DataFrame for timestamps
    df = pd.DataFrame([{
        'created_at': pd.to_datetime(created_at) if created_at else None,
        'expires_at': pd.to_datetime(expires_at) if expires_at else None,
        'transaction_at': pd.to_datetime(transaction_at) if transaction_at else None,
    }])
    
    # Add new timestamp-based features
    if created_at and expires_at and transaction_at:
        df['transaction_within_valid_period'] = (
            (df['created_at'] <= df['transaction_at']) & (df['transaction_at'] <= df['expires_at'])
        ).astype(int)
    else:
        df['transaction_within_valid_period'] = 0  # Default to invalid if timestamps are missing
    
    return df


def qr_code_pipeline(url, created_at=None, expires_at=None, transaction_at=None):
    """
    Pipeline to classify a URL using pretrained models for static and dynamic QR codes.
    """
    # Preprocess the URL
    url_features = preprocess_url(url)
    
    # Check if timestamps exist (Dynamic Model)
    if created_at and expires_at and transaction_at:
        # Preprocess timestamps
        timestamp_features = preprocess_timestamps(created_at, expires_at, transaction_at)
        
        # Combine URL and timestamp features
        combined_features = pd.concat([url_features, timestamp_features], axis=1)
        
        # Load the dynamic model and encoders
        dynamic_model = joblib.load('rfmodel_dynamic.joblib')
        type_encoder = joblib.load('type_encoder_dynamic.joblib')
        top_level_domain_encoder = joblib.load('top_level_domain_encoder_dynamic.joblib')
        
        # Encode categorical features
        combined_features['top_level_domain_encoded'] = top_level_domain_encoder.transform(
            combined_features['top_level_domain']
        )
        
        # Select features for prediction
        dynamic_features = [
            'url_length', 'num_special_chars', 'has_ip', 'num_subdomains',
            'top_level_domain_encoded', 'transaction_within_valid_period'
        ]
        prediction = dynamic_model.predict(combined_features[dynamic_features])
    
    else:
        # Use Static Model
        # Load the static model and encoders
        static_model = joblib.load('rfmodel_static.joblib')
        type_encoder = joblib.load('type_encoder_static.joblib')
        top_level_domain_encoder = joblib.load('top_level_domain_encoder_static.joblib')
        
        # Encode categorical features
        url_features['top_level_domain_encoded'] = top_level_domain_encoder.transform(
            url_features['top_level_domain']
        )
        
        # Select features for prediction
        static_features = [
            'url_length', 'num_special_chars', 'has_ip', 'num_subdomains',
            'top_level_domain_encoded', 'num_uncommon_chars', 'has_at_symbol',
            'uses_https', 'has_suspicious_hyphens', 'domain_length'
        ]
        prediction = static_model.predict(url_features[static_features])
    
    # Decode the prediction
    prediction_label = type_encoder.inverse_transform(prediction)[0]
    
    # Return the result
    return {
        'URL': url,
        'Prediction': prediction_label,
        'Created_At': created_at,
        'Expires_At': expires_at,
        'Transaction_At': transaction_at
    }


# Example usage:
# Input: URL with optional timestamps
result = qr_code_pipeline(
    url='http://example.com/suspicious',
    created_at='2023-12-15 12:00:00',
    expires_at='2023-12-15 17:00:00',
    transaction_at='2023-12-15 18:00:00'
)
print(result)


{'URL': 'http://example.com/suspicious', 'Prediction': 'Malicious', 'Created_At': '2023-12-15 12:00:00', 'Expires_At': '2023-12-15 17:00:00', 'Transaction_At': '2023-12-15 18:00:00'}


In [None]:
result = qr_code_pipeline(
    url='http://example.com/suspicious',
    created_at='2023-12-15 12:00:00',
    expires_at='2023-12-15 17:00:00',
    transaction_at='2023-12-15 18:00:00'
)
print(result)
