In [None]:
!pip install python-whois
!pip install tld

In [None]:
import os
from glob import glob

import pyspark.sql.functions as F
import tld
from pyspark import SparkConf
from pyspark.ml.feature import Bucketizer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Configurations

In [None]:
conf = SparkConf().setAppName("App").setMaster("local[*]")

# S3-compatible object storage connection.
# Credentials are read from the environment so they never have to be written
# into the notebook; when the variables are unset the previous values
# (empty strings) are used.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
AWS_ENDPOINT_URL = "https://s3.bhs.io.cloud.ovh.net"
AWS_REGION = "bhs"  # NOTE(review): not passed to any Spark/Hadoop setting below

# Every setting is listed exactly once (the Arrow and eager-eval keys used to be
# set twice with identical values).
spark_settings = {
    # Enable optimisations and additional REPL display settings
    "spark.sql.execution.arrow.pyspark.enabled": "true",
    "spark.sql.execution.arrow.pyspark.ignore_timezone": "true",
    "spark.sql.repl.eagerEval.enabled": "true",
    "spark.sql.repl.eagerEval.truncate": 100,
    "spark.sql.parquet.enableVectorizedReader": "false",
    # Memory
    "spark.driver.memory": "30g",
    "spark.executor.memory": "30g",
    "spark.executor.pyspark.memory": "30g",
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "30g",
    # S3A filesystem
    "spark.hadoop.fs.s3a.access.key": AWS_ACCESS_KEY,
    "spark.hadoop.fs.s3a.secret.key": AWS_SECRET_KEY,
    "spark.hadoop.fs.s3a.endpoint": AWS_ENDPOINT_URL,
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.2.2",
}
for setting_name, setting_value in spark_settings.items():
    conf.set(setting_name, setting_value)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Show which jars the session ended up with (useful to confirm hadoop-aws was resolved).
print(spark.sparkContext.getConf().get("spark.jars"))
print(spark._jsc.sc().listJars())

print("Spark session configurada com sucesso!")

# Functions

In [None]:
def validate(site: str) -> bool:
    """Return whether ``tld.get_tld`` can resolve a top-level domain for ``site``.

    Args:
        site: URL or bare host name. A missing scheme is tolerated because
            ``fix_protocol=True`` is passed to ``tld.get_tld``.

    Returns:
        ``True`` when ``tld.get_tld`` returns a value, ``False`` when it
        returns ``None`` (``fail_silently=True``) or when ``site`` is ``None``.
    """
    # A null value (e.g. a null column reaching a Spark UDF) can never be a
    # valid site; answer directly instead of handing a non-string to the parser.
    if site is None:
        return False
    return tld.get_tld(site, fix_protocol=True, fail_silently=True) is not None

def extract_domain(site: str, raise_if_invalid=True) -> str:
    """Return the ``fld`` (first-level domain) that ``tld`` parses out of ``site``.

    Args:
        site: URL or bare host name; a missing scheme is tolerated
            (``fix_protocol=True``).
        raise_if_invalid: When true (default) an unparsable or ``None`` site
            raises; when false ``None`` is returned instead.

    Returns:
        The ``fld`` attribute of the parsed result, or ``None`` for an invalid
        site when ``raise_if_invalid`` is false.

    Raises:
        ValueError: If the site is invalid and ``raise_if_invalid`` is true.
    """
    # Parse once: with fail_silently=True an invalid site yields None, so no
    # separate validation pass over the same string is needed.
    parsed = None
    if site is not None:
        parsed = tld.get_tld(site, fix_protocol=True, as_object=True, fail_silently=True)
    if parsed is None:
        if raise_if_invalid:
            raise ValueError("Invalid site")
        return None
    return parsed.fld

def partition_number(x):
    """Return the integer after the last underscore of a ``*.parquet`` path.

    Used as a sort key so that partition files are ordered numerically rather
    than lexicographically.
    """
    trailing_piece = x.rsplit("_", 1)[-1]
    digits = trailing_piece.replace(".parquet", "")
    return int(digits)

def get_interval_range(probability):
    """Map a probability to the label of the 0.1-wide interval containing it.

    Args:
        probability: Anything ``float()`` accepts (number or numeric string),
            or ``None``.

    Returns:
        One of the labels ``'(0.0, 0.1)'`` ... ``'(0.9, 1.0)'``. Each interval
        includes its lower bound and excludes its upper bound; values below 0.1
        (including negatives) get the first label and values of 0.9 or more
        (including values above 1) get the last one. ``None`` is returned for
        ``None`` and NaN, which belong to no interval.
    """
    if probability is None:
        return None

    probability = float(probability)
    # NaN compares false against every bound and would otherwise fall through
    # to the last interval.
    if probability != probability:
        return None

    upper_bounds_and_labels = (
        (0.1, '(0.0, 0.1)'),
        (0.2, '(0.1, 0.2)'),
        (0.3, '(0.2, 0.3)'),
        (0.4, '(0.3, 0.4)'),
        (0.5, '(0.4, 0.5)'),
        (0.6, '(0.5, 0.6)'),
        (0.7, '(0.6, 0.7)'),
        (0.8, '(0.7, 0.8)'),
        (0.9, '(0.8, 0.9)'),
    )
    # Bounds are ascending, so the first bound above the value identifies it.
    for upper_bound, label in upper_bounds_and_labels:
        if probability < upper_bound:
            return label
    return '(0.9, 1.0)'

# Analysis

## Domains

In [None]:
# Load the hosts dataset and preview the first rows.
df = spark.read.format("parquet").load('/media/greca/HD/Driva/hosts.parquet')
df.show()

In [None]:
# Total number of rows in the hosts dataset.
df.count()

In [None]:
# Number of distinct values in the "url" column.
df.select("url").distinct().count()

In [None]:
# URLs that occur on more than one row, most frequent first; the cell shows
# how many such URLs exist.
repetitive_urls = (
    df.groupBy("url")
    .count()
    .orderBy(F.desc("count"))
    .where(F.col("count") > 1)
)
repetitive_urls.count()

In [None]:
# URLs that occur on exactly one row; the cell shows how many such URLs exist.
non_repetitive_urls = (
    df.groupBy("url")
    .count()
    .orderBy(F.desc("count"))
    .where(F.col("count") == 1)
)
non_repetitive_urls.count()

In [None]:
# Number of distinct values in the "host" column.
df.select("host").distinct().count()

In [None]:
# Hosts that occur on more than one row, most frequent first; the cell shows
# how many such hosts exist.
repetitive_domains = (
    df.groupBy("host")
    .count()
    .orderBy(F.desc("count"))
    .where(F.col("count") > 1)
)
repetitive_domains.count()

In [None]:
# Hosts that occur on exactly one row; the cell shows how many such hosts exist.
non_repetitive_domains = (
    df.groupBy("host")
    .count()
    .orderBy(F.desc("count"))
    .where(F.col("count") == 1)
)
non_repetitive_domains.count()

## Probabilities

In [None]:
# Decision threshold applied to the model's "probability" column.
THRESHOLD = 0.6

# Read the partition files in numeric order of their trailing partition number.
files = glob("/media/greca/HD/Driva/smaller_ecommerce_whois_without_html/*.parquet")
files = sorted(files, key=partition_number)
df = spark.read.parquet(*files)

# Spark UDF that labels a probability with its 0.1-wide interval.
udf_mapping = udf(get_interval_range, StringType())

higher_than_threshold = df.filter(col("probability") >= THRESHOLD).count()
lower_than_threshold = df.filter(col("probability") < THRESHOLD).count()

df = df.withColumn("interval", udf_mapping(col("probability")))

# Every row carries exactly one "interval" value, so the sum of the group
# sizes equals the row count; counting rows avoids an extra group-and-sum job.
total = df.count()
df = df.groupBy("interval").count()
df = df.withColumn("percent", F.round(col("count") / total, 4))
df = df.sort(col("percent").desc())
df.show()

In [None]:
# Row counts at/above and below THRESHOLD, as computed in the previous cell.
higher_than_threshold, lower_than_threshold

In [None]:
# Share of rows at/above vs. below THRESHOLD among the rows counted by the two filters.
# NOTE(review): this rebinds `total`, and the division fails if both counts are 0.
total = higher_than_threshold + lower_than_threshold

higher_than_threshold/total, lower_than_threshold/total

In [None]:
# Reload the row-level dataset (the previous section rebound `df` to an aggregate).
files = sorted(
    glob("/media/greca/HD/Driva/smaller_ecommerce_whois_without_html/*.parquet"),
    key=partition_number,
)
df = spark.read.parquet(*files)

In [None]:
def validate(site: str) -> bool:
    """Return whether ``tld.get_tld`` can resolve a top-level domain for ``site``.

    Re-definition of ``validate`` from the Functions section; this later
    definition is the one in effect for the cells that follow.

    Args:
        site: URL or bare host name. A missing scheme is tolerated because
            ``fix_protocol=True`` is passed to ``tld.get_tld``.

    Returns:
        ``True`` when ``tld.get_tld`` returns a value, ``False`` when it
        returns ``None`` (``fail_silently=True``) or when ``site`` is ``None``.
    """
    # A null value (e.g. a null "host" reaching the UDF) can never be a valid
    # site; answer directly instead of handing a non-string to the parser.
    if site is None:
        return False
    return tld.get_tld(site, fix_protocol=True, fail_silently=True) is not None

def extract_tld(site: str, raise_if_invalid=True) -> str:
    """Return the ``tld`` (top-level domain) that ``tld`` parses out of ``site``.

    Args:
        site: URL or bare host name; a missing scheme is tolerated
            (``fix_protocol=True``).
        raise_if_invalid: When true (default) an unparsable or ``None`` site
            raises; when false ``None`` is returned instead.

    Returns:
        The ``tld`` attribute of the parsed result, or ``None`` for an invalid
        site when ``raise_if_invalid`` is false.

    Raises:
        ValueError: If the site is invalid and ``raise_if_invalid`` is true.
    """
    # Parse once: with fail_silently=True an invalid site yields None, so no
    # separate validation pass over the same string is needed.
    parsed = None
    if site is not None:
        parsed = tld.get_tld(site, fix_protocol=True, as_object=True, fail_silently=True)
    if parsed is None:
        if raise_if_invalid:
            raise ValueError("Invalid site")
        return None
    return parsed.tld

def extract_domain(site: str, raise_if_invalid=True) -> str:
    """Return the ``fld`` (first-level domain) that ``tld`` parses out of ``site``.

    Re-definition of ``extract_domain`` from the Functions section; this later
    definition is the one in effect for the cells that follow.

    Args:
        site: URL or bare host name; a missing scheme is tolerated
            (``fix_protocol=True``).
        raise_if_invalid: When true (default) an unparsable or ``None`` site
            raises; when false ``None`` is returned instead.

    Returns:
        The ``fld`` attribute of the parsed result, or ``None`` for an invalid
        site when ``raise_if_invalid`` is false.

    Raises:
        ValueError: If the site is invalid and ``raise_if_invalid`` is true.
    """
    # Parse once: with fail_silently=True an invalid site yields None, so no
    # separate validation pass over the same string is needed.
    parsed = None
    if site is not None:
        parsed = tld.get_tld(site, fix_protocol=True, as_object=True, fail_silently=True)
    if parsed is None:
        if raise_if_invalid:
            raise ValueError("Invalid site")
        return None
    return parsed.fld

# Spark UDF around extract_domain. It is called with its default
# raise_if_invalid=True, so an invalid host raises inside the UDF.
fld_domain = udf(extract_domain, StringType())
# tld_domain = udf(lambda z: extract_tld(z), StringType())

df = df.withColumn("fld_domain", fld_domain(col("host")))
# df = df.withColumn("tld_domain", tld_domain(df.host))

# Number of rows whose host differs from its extracted first-level domain.
df.where(col("host") != col("fld_domain")).count()

## E-commerce Table vs Model Probabilities Comparison

In [None]:
# Load the e-commerce table export (CSV with a header row) and count its rows.
ecommerce_table_df = spark.read.option("header", "true").csv(
    "/media/greca/HD/Driva/ecommerces_202505050918.csv"
)
ecommerce_table_df.count()

In [None]:
# Column names and types of the e-commerce table (no schema inference is requested on load).
ecommerce_table_df.printSchema()

In [None]:
# Number of distinct values in the "dominio" column.
ecommerce_table_df.select("dominio").distinct().count()

In [None]:
# Number of distinct values in the "host" column.
ecommerce_table_df.select("host").distinct().count()

In [None]:
# Number of rows of the e-commerce table that have no "probabilidade" value.
ecommerce_table_df.filter(F.col("probabilidade").isNull()).count()

In [None]:
# Keep only the rows of the e-commerce table that have a "probabilidade" value.
not_null_ecommerce_table_df = ecommerce_table_df.filter(F.col("probabilidade").isNotNull())

In [None]:
# Spark UDF that labels a probability with its 0.1-wide interval.
udf_mapping = udf(get_interval_range, StringType())

not_null_ecommerce_table_df = not_null_ecommerce_table_df.withColumn(
    "interval",
    udf_mapping(col("probabilidade"))
)

# Every row carries exactly one "interval" value, so the sum of the group
# sizes equals the row count; counting rows avoids an extra group-and-sum job.
total = not_null_ecommerce_table_df.count()
not_null_ecommerce_table_df = not_null_ecommerce_table_df.groupBy("interval").count()
not_null_ecommerce_table_df = not_null_ecommerce_table_df.withColumn("percent", F.round(col("count") / total, 4))
not_null_ecommerce_table_df = not_null_ecommerce_table_df.sort(col("percent").desc())
not_null_ecommerce_table_df.show()

### Domains

In [None]:
df = spark.read.parquet('/media/greca/HD/Driva/hosts.parquet')

# Distinct values are collected to the driver. Nulls are dropped so that a
# missing host/domain is not counted as if it were a domain name.
unique_domains_whois = [
    row.host for row in df.select("host").distinct().collect() if row.host is not None
]

unique_domains_ecomm_table = [
    row.dominio
    for row in ecommerce_table_df.select("dominio").distinct().collect()
    if row.dominio is not None
]

unique_hosts_ecomm_table = [
    row.host
    for row in ecommerce_table_df.select("host").distinct().collect()
    if row.host is not None
]

# The table side is the union of its "host" and "dominio" values.
unique_hosts_ecomm_table.extend(unique_domains_ecomm_table)

# Build each set once and reuse it for all four comparisons.
whois_names = set(unique_domains_whois)
table_names = set(unique_hosts_ecomm_table)

union = list(whois_names | table_names)
intersection = list(whois_names & table_names)
diff_whois_table = list(whois_names - table_names)
diff_table_whois = list(table_names - whois_names)

print(f"Domains available in both sets: {len(intersection)}")
print(f"Domains available only in whois set: {len(diff_whois_table)}")
print(f"Domains available only in ecomm table set: {len(diff_table_whois)}")
print(f"All domains in both sets: {len(union)}")

In [None]:
files = glob("/media/greca/HD/Driva/smaller_ecommerce_whois_without_html/*.parquet")
files = sorted(files, key=partition_number)
df = spark.read.parquet(*files)

# Rows whose host matches NEITHER the table's "host" NOR its "dominio".
# The two anti-joins are chained: taking the union of two independent
# anti-joins would keep every row that misses at least one of the two keys,
# i.e. it would also keep rows that do match the table on the other key.
df_not_in_table_hosts = df.join(
    ecommerce_table_df,
    df.host == ecommerce_table_df.host,
    "leftanti"
)

df_only_whois = df_not_in_table_hosts.join(
    ecommerce_table_df,
    df_not_in_table_hosts.host == ecommerce_table_df.dominio,
    "leftanti"
)

df_only_whois = df_only_whois.dropDuplicates(subset=["host"])
df_only_whois.count()

In [None]:
# Label each whois-only row with the 0.1-wide interval of its probability.
df_only_whois = df_only_whois.withColumn(
    "interval",
    udf_mapping(col("probability"))
)

# Every row carries exactly one "interval" value, so the sum of the group
# sizes equals the row count; counting rows avoids an extra group-and-sum job.
total = df_only_whois.count()
df_only_whois = df_only_whois.groupBy("interval").count()
df_only_whois = df_only_whois.withColumn("percent", F.round(col("count") / total, 4))
df_only_whois = df_only_whois.sort(col("percent").desc())
df_only_whois.show()

In [None]:
# The table's "host" column is renamed so the joined result has a single
# "host" column. The rename is bound to a new name: rebinding
# `ecommerce_table_df` itself would break re-running the earlier cells that
# read `ecommerce_table_df.host`.
ecommerce_renamed_df = ecommerce_table_df.withColumnRenamed("host", "host_table_ecomm")

# Rows matching the table by host ...
all_df = df.join(
    ecommerce_renamed_df,
    df.host == ecommerce_renamed_df.host_table_ecomm,
    "inner"
)

# ... and rows matching the table by domain.
all_df2 = df.join(
    ecommerce_renamed_df,
    df.host == ecommerce_renamed_df.dominio,
    "inner"
)

all_df = all_df.union(all_df2)
# A host matched through both keys appears twice; keep one row per host and
# then one row per "dominio".
# NOTE(review): the second dropDuplicates keeps a single, arbitrary host per
# "dominio" - confirm that this is intended.
all_df = all_df.dropDuplicates(subset=["host"])
all_df = all_df.dropDuplicates(subset=["dominio"])
all_df = all_df.select("host", "probabilidade", "probability")
all_df = all_df.withColumnRenamed("probabilidade", "old_model_probability")
all_df = all_df.withColumnRenamed("probability", "new_model_probability")
# Relative change of the new probability with respect to the old one, in percent.
all_df = all_df.withColumn("difference (%)", F.round((col("new_model_probability") - col("old_model_probability"))/col("old_model_probability"), 3) * 100)
all_df.count()

In [None]:
# Bucket boundaries for the relative difference, in percent.
BINS = [-float('inf'), -100, -75, -50, -25, 0, 25, 50, 75, 100, float('inf')]

# Adds a "bin" column holding the index of the bucket each difference falls in.
df_final = Bucketizer(
    splits=BINS,
    inputCol="difference (%)",
    outputCol="bin"
).transform(all_df)

# Bucketizer buckets include their lower split and exclude the upper one
# (only the last bucket also includes its upper split), so the labels are
# written as [lower, upper) to describe where a boundary value is counted.
intervals = [f"[{lower}, {upper})" for lower, upper in zip(BINS, BINS[1:])]

mapping = spark.sparkContext.broadcast(intervals)

def get_bins(values):
    """Build a Spark UDF that turns a bucket index into its label.

    Args:
        values: Sequence of labels, indexed by bucket number.

    Returns:
        A UDF (default string return type) mapping a bucket index to
        ``values[index]`` and a null index to null.
    """
    def f(x):
        # A null bucket means the row had no usable value to bucketize; keep it
        # null rather than counting it under the first interval.
        if x is None:
            return None
        return values[int(x)]
    return udf(f)

# Replace the numeric bucket index with its human-readable interval label.
bin_to_label = get_bins(mapping.value)
df_final = df_final.withColumn("interval", bin_to_label(col("bin"))).drop("bin")
df_final.show()

In [None]:
# Share of rows per difference interval, largest share first.
# NOTE(review): this rebinds `df_final` to the aggregate, so the cell is not
# meant to be run twice without re-running the previous one.
df_final = df_final.groupBy("interval").count()
total = df_final.agg(F.sum("count")).first()[0]
df_final = (
    df_final
    .withColumn("percent", F.round(F.col("count") / total, 4))
    .orderBy(F.desc("percent"))
)
df_final.show()

In [None]:
# Turn both probabilities into boolean predictions for the rows that have an
# old-model probability. The old model is cut at 0.5 and the new one at 0.6
# (the same value as THRESHOLD in the Probabilities section).
all_df_not_null = (
    all_df
    .filter(F.col("old_model_probability").isNotNull())
    .withColumn(
        "old_model_prediction",
        F.when(F.col("old_model_probability") >= 0.5, True).otherwise(False),
    )
    .withColumn(
        "new_model_prediction",
        F.when(F.col("new_model_probability") >= 0.6, True).otherwise(False),
    )
)
all_df_not_null.show()

In [None]:
# Number of matched rows that have an old-model probability.
all_df_not_null.count()

In [None]:
# Compare the two boolean predictions row by row.
old_prediction = F.col("old_model_prediction")
new_prediction = F.col("new_model_prediction")

flipped_to_false = (old_prediction == True) & (new_prediction == False)
flipped_to_true = (old_prediction == False) & (new_prediction == True)

predicted_as_true_before = all_df_not_null.where(flipped_to_false).count()
predicted_as_false_before = all_df_not_null.where(flipped_to_true).count()
same_prediction = all_df_not_null.where(old_prediction == new_prediction).count()

print(f"Predicted as True before and now it's False: {predicted_as_true_before}")
print(f"Predicted as False before and now it's True: {predicted_as_false_before}")
print(f"Prediction did not change: {same_prediction}")

In [None]:
# Shut down the Spark session created in the Configurations section.
spark.stop()