# Malicious URL Pipeline

In [0]:
#Install Packages
%pip install tldextract python-whois

from urllib.parse import urlparse
import tldextract
import whois
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, BooleanType
import pandas as pd

In [0]:
# Secrets to access storage
processed_sas_token = dbutils.secrets.get(scope="trecombs", key="ProcessedSASToken")
# NOTE(review): the load cell fetches `sas_url` again under the key "sas_url"
# and overwrites this value before it is used. Confirm which secret key
# ("sasurlvalue" or "sas_url") is the real one and drop the other lookup.
sas_url = dbutils.secrets.get(scope="trecombs", key="sasurlvalue")
# Define storage account + container
storage_account = "formula1dl122334"
container_name = "processed"

# -----------------------------
# Configure Spark storage credentials
# -----------------------------
# SAS token for the container on the account's blob endpoint.
spark.conf.set(
    f"fs.azure.sas.{container_name}.{storage_account}.blob.core.windows.net",
    processed_sas_token,
)
# Account key for the dfs endpoint. The config key is built from
# `storage_account` so both settings always target the same account.
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    dbutils.secrets.get(scope="trecombs", key="azurestoragekey"),
)

In [0]:
# Load CSV into Spark using pandas
sas_url = dbutils.secrets.get(scope="trecombs", key="sas_url")
# Pass the variable holding the secret URL. The literal string "sas_url" would
# be treated by pandas as a local file path and the read would fail.
pdf = pd.read_csv(sas_url)

# Convert Pandas DataFrame to Spark DataFrame
df = spark.createDataFrame(pdf)

# Helpers for URL parsing
def extract_domain(url: str) -> str:
    """Return the lower-cased registered domain (or bare host) of ``url``.

    Args:
        url: A URL with or without a scheme, e.g. ``"http://a.example.com/x"``
            or ``"a.example.com/x"``.

    Returns:
        ``"<domain>.<suffix>"`` when tldextract recognises a public suffix.
        Otherwise the lower-cased host name alone (for example an IP address
        or ``localhost``), without port, credentials or path.
        ``None`` when ``url`` is empty, is not a string, has no host, or
        cannot be parsed.
    """
    if not url or not isinstance(url, str):
        return None
    try:
        parsed = urlparse(url)
        if not parsed.netloc:
            # No authority component was found, so the URL has no usable
            # scheme prefix. Checking for "://" anywhere in the string is not
            # enough: "10.0.0.5/login?next=http://x" contains it in the query.
            parsed = urlparse(f"http://{url}")
        # `hostname` is already lower-cased and has user-info and port removed,
        # so the fallback below never leaks "user:pw@" or ":8080" into the result.
        host = parsed.hostname
        if not host:
            return None
        ext = tldextract.extract(host)
        if ext.domain and ext.suffix:
            return f"{ext.domain}.{ext.suffix}".lower()
        return host
    except Exception:
        # Best-effort parsing: a malformed URL yields None instead of failing the row.
        return None

def extract_tld(url: str) -> str:
    """Return the lower-cased public suffix of ``url``, or ``None``.

    A scheme of ``http://`` is prepended when the input contains no ``"://"``.
    ``None`` is returned for empty input, when tldextract reports no suffix,
    or when anything raises while parsing.
    """
    if not url:
        return None
    try:
        target = url if "://" in url else f"http://{url}"
        pieces = urlparse(target)
        # Fall back to the path when the parser found no network location.
        host = pieces.netloc or pieces.path
        suffix = tldextract.extract(host).suffix
        if suffix:
            return suffix.lower()
        return None
    except Exception:
        return None

def whois_owner(domain: str) -> str:
    """Best-effort lookup of a domain's owner through WHOIS.

    The owner-like fields are tried in a fixed order and the first non-empty
    one is returned as a string (for list values only the first element is
    considered). If none is set, the registrar is returned instead.
    Returns ``None`` for empty input, when nothing usable is found, or when
    the lookup raises.
    """
    if not domain:
        return None
    owner_fields = ("org", "organization", "registrant_name", "name", "registrant_organization")
    try:
        record = whois.whois(domain)
        for field in owner_fields:
            candidate = record.get(field)
            if isinstance(candidate, list):
                # Only the first entry of a multi-valued field is inspected.
                candidate = next(iter(candidate), None)
            if candidate:
                return str(candidate)
        fallback = record.get("registrar")
        return str(fallback) if fallback else None
    except Exception:
        # Lookup failures are deliberately swallowed: the owner is optional.
        return None

def count_e_in_domain(domain: str) -> int:
    """Count lower-case ``"e"`` characters in ``domain``; 0 for empty/None input."""
    if not domain:
        return 0
    # Case-sensitive on purpose: only the exact character "e" is counted.
    return sum(1 for ch in domain if ch == "e")

def has_A_and_T(domain: str) -> bool:
    """Return True when ``domain`` contains both an A and a T, ignoring case.

    Empty or ``None`` input yields False.
    """
    if domain:
        letters = set(domain.upper())
        return "A" in letters and "T" in letters
    return False

# UDFs
# Wrap each Python helper as a Spark UDF with an explicit return type.

# String-valued helpers
extract_domain_udf = F.udf(extract_domain, returnType=StringType())
extract_tld_udf = F.udf(extract_tld, returnType=StringType())
owner_udf = F.udf(whois_owner, returnType=StringType())

# Numeric / boolean helpers
count_e_udf = F.udf(count_e_in_domain, returnType=IntegerType())
has_AT_udf = F.udf(has_A_and_T, returnType=BooleanType())


# Transform Data
# `domain` and `tld` are derived from the raw `url` column; `owner`, `e_count`
# and `has_A_and_T` are derived from the freshly added `domain` column, so the
# order of the steps below matters.
enriched = (
    df
    .withColumn("domain", extract_domain_udf("url"))
    .withColumn("tld", extract_tld_udf("url"))
    .withColumn("owner", owner_udf("domain"))
    .withColumn("e_count", count_e_udf("domain"))
    .withColumn("has_A_and_T", has_AT_udf("domain"))
)

# Preview the first ten enriched rows.
display(enriched.limit(10))

# Count malware/phishing with A and T
# `type` is upper-cased before comparison so the match ignores case; the
# boolean flag column is used directly as a predicate.
malware_phishing_AT = enriched.where(
    F.col("has_A_and_T")
    & F.upper(F.col("type")).isin(["MALWARE", "PHISHING"])
).count()

print(f"Records with A and T in domain that are malware or phishing: {malware_phishing_AT}")

# Three-part name of the destination table that the statement below (re)creates.
table_name = "malicious_urls_project_catalog.enriched_data.malicious_urls_enriched"

# Drop/create table using CREATE OR REPLACE TABLE
# The DataFrame is first exposed as a temp view so the SQL statement can select from it.
enriched.createOrReplaceTempView("temp_enriched")  # create a temp view

# CREATE OR REPLACE ... AS SELECT rebuilds the Delta table from the view on every run.
spark.sql(f"""
CREATE OR REPLACE TABLE {table_name}
USING DELTA
AS
SELECT * FROM temp_enriched
""")


The following query displays all records in the `malicious_urls_enriched` table.

In [0]:
%sql
-- Show every row of the enriched table.
SELECT *
FROM malicious_urls_project_catalog.enriched_data.malicious_urls_enriched


The following query calculates how many distinct domains exist per type and each type's share, as a percentage, of the total of those per-type domain counts.

In [0]:
%sql
-- Distinct domains per type, plus each type's share of the summed per-type counts.
WITH domains_per_type AS (
    SELECT
        type,
        COUNT(DISTINCT domain) AS domain_count
    FROM malicious_urls_project_catalog.enriched_data.malicious_urls_enriched
    GROUP BY type
)
SELECT
    type,
    domain_count,
    ROUND(domain_count * 100.0 / SUM(domain_count) OVER (), 2) AS percentage_of_total
FROM domains_per_type
ORDER BY domain_count DESC;


The following query assigns a severity rank to each URL based on its type and orders the results by severity.

In [0]:
%sql
-- Label each URL with a severity derived from its type and list the most severe first.
-- The type -> (label, sort position) mapping is kept in one inline table so the
-- label and the ordering cannot drift apart.
-- Types are matched case-insensitively via LOWER(type).
-- Types missing from the mapping keep a NULL severity_rank and a NULL sort key.
WITH severity_map AS (
    SELECT *
    FROM VALUES
        ('malware',    'High',        1),
        ('defacement', 'Medium High', 2),
        ('phishing',   'Medium',      3),
        ('benign',     'Low',         4)
    AS t(type_key, severity_rank, severity_order)
)
SELECT
    u.url,
    u.type,
    u.domain,
    s.severity_rank
FROM malicious_urls_project_catalog.enriched_data.malicious_urls_enriched AS u
LEFT JOIN severity_map AS s
    ON LOWER(u.type) = s.type_key
WHERE u.type IS NOT NULL
  AND u.domain IS NOT NULL
ORDER BY s.severity_order;



