In [2]:
# Bootstrap Spark. This notebook uses `spark` and `clean_spark` without defining
# them; presumably both come from this %run (its output reports a started
# SparkSession) — verify in spark-instance.ipynb.
%run ./spark-instance.ipynb

SparkConf created
Started SparkSession
Spark version 3.3.1


In [3]:
import ast
from datetime import datetime

import pyspark.sql.functions as psf
from pyspark.sql.types import ArrayType, IntegerType

## Generate input for X.509 validator tool

# Spot-check of the rows behind the example input (scan date 2023-09-12).
# NOTE(review): hosts_df is not defined anywhere in this notebook, and
# certs_df / cert_chain_df are only assigned by the scan loop further down
# (which leaves them holding the *last* scan, not 2023-09-12). This cell
# therefore fails on Restart & Run All — TODO load the 2023-09-12 tables
# explicitly before this cell.
EXAMPLE_HOST_IDS = [830, 159, 31]
EXAMPLE_CERT_ID = 166

hosts_df.select("id", "cert_id", "peer_certificates", "cert_valid") \
        .filter(psf.col("id").isin(EXAMPLE_HOST_IDS)) \
        .show(truncate=False)
certs_df.filter(psf.col("id") == EXAMPLE_CERT_ID).show()
# First two chains flagged with chain_complete == 0.
cert_chain_df.filter(psf.col("chain_complete") == 0).show(2)

In [7]:
def eval_list(my_list):
    """Parse a stringified list of certificate ids from the ``cert_chain`` column.

    Used as the body of ``eval_udf``, which is declared to return
    ``array<int>``; the input is presumably text like ``"[3, 17, 42]"``.

    Args:
        my_list: the raw cell value. Anything that is not a ``str``
            (e.g. ``None`` for a missing value) yields ``[]``.

    Returns:
        The parsed elements as a ``list``, or ``[]`` when the value is not a
        string, is not a valid Python literal, or is not a list/tuple.
    """
    if not isinstance(my_list, str):
        return []
    try:
        # literal_eval accepts only Python literals, so a crafted value in the
        # scan data can no longer execute code the way eval() allowed.
        parsed = ast.literal_eval(my_list)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # Malformed value: keep the best-effort behaviour of returning an
        # empty chain instead of failing the whole Spark job.
        return []
    # The UDF's return type is ArrayType(IntegerType()); a non-sequence result
    # (e.g. a bare int) cannot be represented, so treat it as empty.
    if isinstance(parsed, (list, tuple)):
        return list(parsed)
    return []

    
# Spark UDF applying eval_list row by row; declared to return array<int>
# (the certificate ids of one chain).
eval_udf = psf.udf(
    eval_list,
    returnType=ArrayType(IntegerType()),
)


def explode_cert_chain(cert_chain_df):
    """Turn each chain row into one row per certificate id it references.

    The string ``cert_chain`` column is parsed with ``eval_udf`` into
    ``cert_chain_list``, which is then exploded.

    Args:
        cert_chain_df: DataFrame with at least ``id`` and ``cert_chain`` columns.

    Returns:
        DataFrame with columns ``id``, ``cert_chain_list`` and ``col`` (one
        element of ``cert_chain_list`` per row). Because ``explode_outer`` is
        used, chains whose list is empty or null still produce one row, with
        ``col`` set to null.
    """
    parsed_df = cert_chain_df.withColumn("cert_chain_list", eval_udf(psf.col("cert_chain")))
    # "col" is the name explode_outer assigns by default; spelled out because
    # group_cert_chain joins on it.
    return parsed_df.select(
        "id",
        "cert_chain_list",
        psf.explode_outer(psf.col("cert_chain_list")).alias("col"),
    )


def group_cert_chain(exploded_chain_df, certs_df):
    """Re-assemble each certificate chain from its exploded certificate ids.

    Joins the exploded ids with ``certs_df`` and collects the matching
    ``cert`` values back into one array per chain. The array follows the
    order in which the ids appear in ``cert_chain_list``.

    Args:
        exploded_chain_df: output of ``explode_cert_chain`` (columns ``id``,
            ``cert_chain_list``, ``col``).
        certs_df: DataFrame with at least ``id`` and ``cert`` columns.

    Returns:
        DataFrame with columns ``id`` (chain id) and ``chain`` (array of
        ``cert`` values in chain order).

    Note:
        The join is inner. Ids with no matching row in ``certs_df``, and
        chains whose list was empty (null ``col``), are therefore dropped
        rather than kept as nulls.
    """
    # collect_list alone does not preserve row order after the join/groupBy
    # shuffle, so record each id's 1-based index inside its chain and restore
    # the order explicitly. A repeated id shares its first occurrence's index.
    positioned_df = (
        exploded_chain_df
        .withColumn("chain_pos", psf.expr("array_position(cert_chain_list, `col`)"))
        .withColumnRenamed("id", "chain_id")
    )
    joined_certs_df = positioned_df.join(certs_df, certs_df.id == positioned_df["col"], "inner")

    return (
        joined_certs_df
        .groupBy("chain_id")
        # sort_array orders the structs by their first field, chain_pos.
        .agg(psf.sort_array(psf.collect_list(psf.struct("chain_pos", "cert"))).alias("ordered_certs"))
        .select(
            psf.col("chain_id").alias("id"),
            psf.col("ordered_certs.cert").alias("chain"),
        )
    )


def write_to_tmp(grouped_chain_df, output, mode="errorifexists"):
    """Write the grouped chains to S3 as parquet and read them back.

    Args:
        grouped_chain_df: DataFrame to persist (e.g. from ``group_cert_chain``).
        output: bucket-relative path; written to ``s3a://{output}``.
        mode: Spark save mode. The default, ``"errorifexists"``, is Spark's own
            default and makes a re-run fail if ``output`` already exists. Pass
            ``"overwrite"`` to regenerate an existing output.

    Returns:
        The number of rows read back from the written parquet. This is a
        sanity check that the write produced readable data.
    """
    path = f"s3a://{output}"
    # coalesce(1): write from a single partition, i.e. one part file per output.
    grouped_chain_df.coalesce(1).write.mode(mode).parquet(path)
    return spark.read.parquet(path).count()

In [9]:
# Scan dates (one week apart) whose goscanner TLS results are converted, per port.
PORT_SCANDATE_MAP = {
    636: [
        datetime(2023, 9, 5),
        datetime(2023, 9, 12),
        datetime(2023, 9, 19),
        datetime(2023, 9, 26),
    ],
    389: [
        datetime(2023, 8, 30),
        datetime(2023, 9, 6),
        datetime(2023, 9, 13),
        datetime(2023, 9, 20),
        datetime(2023, 9, 27),
        datetime(2023, 10, 4),
    ]
}

# Bucket-relative S3 prefix of one goscanner TLS result table (prefixed with s3a:// when read).
S3_PATH_FMT = "catrin/measurements/tool=goscanner/format=raw/port={port}/scan=tls/result={result}/year={year}/month={month:02d}/day={day:02d}"
OUTPUT_DIR = "luvizottocesarg-tmp/cert-validator-input"

# For each (port, date): read the certs and cert_chain tables, rebuild every
# chain as an array of certs, and write one parquet output per scan.
# Note: after the loop, certs_df / cert_chain_df hold the last scan read.
for port, scan_dates in PORT_SCANDATE_MAP.items():
    for scan_date in scan_dates:

        timestamp = scan_date.strftime("%Y%m%d")
        output = f"{OUTPUT_DIR}/{timestamp}_{port}_cert-validator-input"

        date_parts = {"year": scan_date.year, "month": scan_date.month, "day": scan_date.day}
        certs_base_path = S3_PATH_FMT.format(port=port, result="certs", **date_parts)
        cert_chain_base_path = S3_PATH_FMT.format(port=port, result="cert_chain", **date_parts)

        # multiline: cert fields presumably contain embedded newlines (PEM text).
        # The former "wholeFile" option is not a Spark CSV option (it was
        # silently ignored) and has been dropped.
        certs_df = spark.read.option("header", "true") \
                             .option("multiline", "true") \
                             .option("inferSchema", "true") \
                             .option("basePath", f"s3a://{certs_base_path}") \
                             .csv(f"s3a://{certs_base_path}")

        cert_chain_df = spark.read.option("header", "true") \
                                  .option("inferSchema", "true") \
                                  .option("basePath", f"s3a://{cert_chain_base_path}") \
                                  .csv(f"s3a://{cert_chain_base_path}")

        exploded_chain_df = explode_cert_chain(cert_chain_df)
        grouped_chain_df = group_cert_chain(exploded_chain_df, certs_df)
        write_to_tmp(grouped_chain_df, output)

In [10]:
# Tear down the Spark instance once all outputs are written (prints
# "CLEANING SPARK INSTANCE..."). clean_spark is not defined in this notebook;
# presumably it comes from ./spark-instance.ipynb via the %run at the top.
clean_spark()  # run by EOB (presumably "end of business") — TODO confirm meaning

CLEANING SPARK INSTANCE...
