In [None]:
%matplotlib inline
import os
from dateutil import rrule
from datetime import datetime, timedelta, date
import time
import random
import itertools
import collections
import IPy
import base64
import numpy as np
import pandas as pd
import subprocess
import re
import cryptography
import pprint
from cryptography.hazmat._oid import ObjectIdentifier
import pyarrow as pa
os.environ["ARROW_LIBHDFS_DIR"] = "/usr/local/hadoop/lib/native"

# Find Spark
import findspark
findspark.init()

# PySpark imports
import pyspark
import pyspark.sql.functions as psf
import pyspark.sql.types as pst


# A helper to kinit, keytab-based
def kinit_helper(principal, keytab):
    """Obtain a Kerberos ticket for ``principal`` from ``keytab`` by running ``kinit``.

    The ticket is requested with ``-l 7d -r 7d``. Whatever kinit writes to stdout
    is logged at INFO level and whatever it writes to stderr at ERROR level.

    Args:
        principal: Kerberos principal passed to kinit.
        keytab: Path of the keytab file passed to kinit via ``-t``.

    Returns:
        None. Failures are reported through the log, not raised.
    """
    # Local import: the notebook's import cell does not import logging.
    import logging

    log = logging.getLogger(__name__)

    # Pass the arguments as a list (no shell) so that spaces or shell
    # metacharacters in the principal or keytab path cannot alter the command.
    kinit_cmd = ["kinit", "-p", principal, "-k", "-t", keytab, "-l", "7d", "-r", "7d"]

    try:
        process = subprocess.Popen(kinit_cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as exc:
        # e.g. kinit is not installed / not on PATH
        log.error("Could not run kinit: %s", exc)
        return
    output, error = process.communicate()

    # stdout must come from `output`; the original decoded `error` twice.
    stdout_str = output.decode("utf-8", errors="replace").strip()
    if stdout_str:
        log.info(stdout_str)

    stderr_str = error.decode("utf-8", errors="replace").strip()
    if stderr_str:
        log.error(stderr_str)
        
# A helper to get a (pyarrow) hdfs client
def hdfs_fs_helper(principal, host="default"):
    """Return a pyarrow HDFS client connected as ``principal``.

    Args:
        principal: User name handed to the HDFS connection.
        host: HDFS host to connect to; defaults to ``"default"``.

    Returns:
        Whatever ``pa.hdfs.connect`` returns for these settings.
    """
    connect_kwargs = {
        "host": u"{}".format(host),
        "port": 0,
        "user": u"{}".format(principal),
        # Ticket cache path is built from the uid of the current process
        "kerb_ticket": u"/tmp/krb5cc_{}".format(os.getuid()),
        "driver": u"libhdfs",
    }
    return pa.hdfs.connect(**connect_kwargs)

# Helper to filter out LE certs
def filter_LE(obj):
    """Tell whether an issuer record names the Let's Encrypt Authority X3 CA.

    Args:
        obj: Issuer object exposing ``asDict()``; may be ``None`` or otherwise falsy.

    Returns:
        True when the record's ``"CN"`` entry equals "Let's Encrypt Authority X3",
        False otherwise (including for a falsy ``obj``).
    """
    issuer_cn = obj.asDict().get("CN") if obj else None
    return issuer_cn == "Let's Encrypt Authority X3"

# UDF returning the number of elements in an array column
slen = psf.udf(lambda items: len(items), returnType=pst.IntegerType())

# Column-level wrapper around filter_LE, producing one boolean per row
filter_LE_udf = psf.udf(filter_LE, returnType=pst.BooleanType())

# setup
# Set up some variables to set up the connection. Some information is redacted for security and privacy.
APP_NAME = "LE-analysis"

# Your external IP address, for callback connections to the Spark driver
EXTERNAL_DRIVER_IP = ""
# Your assigned username, i.e., Kerberos principal
OI_KRB_PRINCIPAL = ""
# Your Kerberos keytab: "<principal>.keytab" in the home directory
OI_KRB_KEYTAB = os.path.join(os.path.expanduser("~"), "{}.keytab".format(OI_KRB_PRINCIPAL))

## DO NOT CHANGE THE CONFIGURATION BELOW UNLESS YOU KNOW WHAT YOU ARE DOING
spark_conf = (
    pyspark.SparkConf()
    .setAppName(APP_NAME)
    .setMaster("yarn")
    .set("spark.scheduler.pool", "root.users.{}".format(OI_KRB_PRINCIPAL))
    .set("spark.submit.deployMode", "client")
    .set("spark.authenticate", "true")
    .set("spark.sql.parquet.binaryAsString", "true")
    .set("spark.network.crypto.enabled", "true")
    # Driver networking
    .set("spark.driver.host", EXTERNAL_DRIVER_IP)
    .set("spark.driver.bindAddress", "0.0.0.0")
    .set("spark.driver.port", "33007")
    .set("spark.blockManager.port", "33023")
    .set("spark.ui.port", "33039")
    # Driver and executor resources
    .set("spark.driver.cores", "2")
    .set("spark.driver.memory", "4G")
    .set("spark.executor.cores", "5")
    .set("spark.executor.memory", "12G")
    .set("spark.executor.memoryOverhead", "4G")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
)

# HDFS locations of the raw parquet data; loaded in a later cell
OI_CTLOGS_BASE_19 = "/user/openintel/ct-logs/type=warehouse/name=LetsEncryptOak-2019"
OI_CTLOGS_BASE_20 = "/user/openintel/ct-logs/type=warehouse/name=LetsEncryptOak-2020"


# Stop/clear context

In [None]:
# Stop the Spark context, if one has been created in this kernel.
# This cell sits above the cell that creates `sc`, so on a fresh kernel
# (Restart & Run All) there is nothing to stop yet; skip instead of raising NameError.
if "sc" in globals():
    sc.stop()

In [None]:
# Clear the SQL cache, if a SQLContext has been created in this kernel.
# This cell sits above the cell that creates `sqlc`, so on a fresh kernel
# there is no cache to clear yet; skip instead of raising NameError.
if "sqlc" in globals():
    sqlc.clearCache()

# Start context

In [None]:
# Get a Kerberos ticket before the Spark context is created
kinit_helper(principal=OI_KRB_PRINCIPAL, keytab=OI_KRB_KEYTAB)

# Spark entry points: the context built from spark_conf, and a SQLContext on top of it
sc = pyspark.SparkContext(conf=spark_conf)
sqlc = pyspark.SQLContext(sparkContext=sc)

In [None]:
# Read the raw parquet data of both log datasets
parquet_reader = sqlc.read.format("parquet")
ct_logs19_df = parquet_reader.load(OI_CTLOGS_BASE_19)
ct_logs20_df = sqlc.read.format("parquet").load(OI_CTLOGS_BASE_20)

# Combine both datasets with a union
ct_logs_df = ct_logs19_df.union(ct_logs20_df)

# Dataframe filtered to only contain certificates issued by Let's Encrypt
ct_logs_LE_df = ct_logs_df.where(
    filter_LE_udf(psf.col("leaf_cert.issuer"))
)

# Analysis

## Duplicate certificates
Further research required.
The logs contain a large number of duplicate certificates, i.e. certificates that cover the same domains and have the same not_before timestamp.
Some of them occur up to 66 times.
Further research must show why this happens and what other characteristics this data has. Noteworthy is that the number of duplicates is often an even number.

In [None]:
# Total number of certificates that remain after the Let's Encrypt issuer filter.
# The value is displayed as the cell output.
ct_logs_LE_df.count()

In [None]:
# How many of the certs are duplicates, i.e. cover the same domains (all_domains)
# and have the same not_before timestamp?
#
# Output columns:
#   copies - how many certificates share one (all_domains, not_before) combination
#   count  - how many combinations have that number of copies
#
# The first count is renamed to "copies": grouping by a column called "count" and
# counting again would give two columns named "count", which makes the sort ambiguous.
(
    ct_logs_LE_df
    .withColumn("not_before", psf.col("leaf_cert.not_before").cast(pst.DecimalType(12, 0)))
    .groupBy("all_domains", "not_before")
    .count()
    .withColumnRenamed("count", "copies")
    .groupBy("copies")
    .count()
    .sort(psf.desc("count"), psf.desc("copies"))
    .show(1000, truncate=100)
)

## What proportion of certificates are being renewed

This looks into how often a certificate is being renewed. Since many certificates seem to be renewed after very short timeframes (minutes to hours), only renewals that are at least a minimum interval apart are counted. The description and pseudo-code here use 7 days; note that the code cell below sets `interval` to 30 days.
The filtering works as follows. Take the first certificate for the domains and remember its timestamp. For every certificate after it, check whether more than the minimum interval has passed since the remembered timestamp. When such a certificate is found, keep it, reset the remembered timestamp to its timestamp, and repeat this process until all certificates have been checked.

Pseudo-code
```python
function filter(certificates):
    filtered_certificates = [certificates[0]] # list containing the first certificate which will get appended with all following certificates
    timestamp = certificates[0].timestamp
    for certificate in certificates[1:]: # for each certificate after the first one
        if certificate.timestamp > timestamp + 7_days:
            timestamp = certificate.timestamp
            filtered_certificates.append(certificate)
    return filtered_certificates
```

In [None]:
# Set up the udf to filter out certificates that do not have enough time between them

# Default minimum gap between two kept timestamps: 60*60*24*30, i.e. 30 days
# (roughly a month) when the timestamps are expressed in seconds.
interval = 60*60*24*30

def filter_duration(timestamps, min_gap=None):
    """Thin out timestamps so that consecutive kept values are more than a gap apart.

    The values are processed in ascending order. The smallest one is always kept;
    each later value is kept only if it is strictly greater than the previously
    kept value plus ``min_gap``.

    Args:
        timestamps: List of numeric timestamps; may be empty or ``None``.
            The list is not modified.
        min_gap: Minimum distance between kept values, in the same unit as the
            timestamps. Defaults to the module-level ``interval``.

    Returns:
        A new ascending list with the kept timestamps; ``[]`` for empty or
        ``None`` input.
    """
    if not timestamps:
        # Nothing to filter; the original indexed timestamps[0] and raised here.
        return []
    if min_gap is None:
        min_gap = interval

    ordered = sorted(timestamps)  # sorted() leaves the caller's list untouched
    kept = [ordered[0]]
    for candidate in ordered[1:]:
        if kept[-1] + min_gap < candidate:
            kept.append(candidate)
    return kept

# Return doubles rather than floats: Spark's FloatType is single precision, which
# cannot hold values around 1.5e9 exactly (adjacent values are about two minutes
# apart). Presumably not_before holds epoch seconds; confirm against the schema.
filter_duration_udf = psf.udf(lambda y: filter_duration(y), pst.ArrayType(pst.DoubleType()))

In [None]:
# Dataframe with the filtered certificates: per all_domains group, collect every
# not_before value and thin the list out with filter_duration_udf.
# Persisted since this dataframe is re-used often.
filtered_df = (
    ct_logs_LE_df
    .groupBy("all_domains")
    .agg(psf.collect_list("leaf_cert.not_before").alias("not_befores"))
    .select(
        "all_domains",
        filter_duration_udf(psf.col("not_befores")).alias("befores"),
    )
    .persist()
)

In [None]:
# Table to see over which period certificates are being renewed, based on the first
# and last date. Accuracy is limited because the observed timeframe is short compared
# to a domain renewal period of at least 1 year.

def _year_month(timestamp):
    """Encode a timestamp (seconds since the epoch) as the integer YYYYMM in UTC."""
    utc = time.gmtime(timestamp)
    return utc.tm_year*100 + utc.tm_mon

def first_tup(obj):
    """Return YYYYMM (as int) of the smallest timestamp in ``obj``.

    Args:
        obj: List of timestamps in seconds since the epoch; may be empty or ``None``.

    Returns:
        ``year*100 + month`` of the earliest timestamp, or ``None`` when ``obj``
        is empty or ``None``.
    """
    if not obj:
        return None
    return _year_month(min(obj))

def last_tup(obj):
    """Return YYYYMM (as int) of the largest timestamp in ``obj``.

    Args:
        obj: List of timestamps in seconds since the epoch; may be empty or ``None``.

    Returns:
        ``year*100 + month`` of the latest timestamp, or ``None`` when ``obj``
        is empty or ``None``.
    """
    if not obj:
        return None
    return _year_month(max(obj))

# Spark wrappers around first_tup / last_tup; both yield an integer column
first_tup_udf = psf.udf(lambda befores: first_tup(befores), returnType=pst.IntegerType())
last_tup_udf = psf.udf(lambda befores: last_tup(befores), returnType=pst.IntegerType())

# Output columns:
#   counts           - number of kept timestamps for the domains (length of "befores")
#   first_year_month - YYYYMM of the first kept timestamp
#   last_year_month  - YYYYMM of the last kept timestamp
#   count            - total number of occurrences of this combination
#
# Sorted by count, then first_year_month, then last_year_month (all descending).
# This is one sort call with three keys: chaining several .sort() calls does not
# combine them, the last call simply replaces the ordering of the earlier ones.
(
    filtered_df
    .withColumn("counts", slen(psf.col("befores")))
    .withColumn("first_year_month", first_tup_udf(psf.col("befores")))
    .withColumn("last_year_month", last_tup_udf(psf.col("befores")))
    .groupBy("counts", "first_year_month", "last_year_month")
    .count()
    .sort(psf.desc("count"), psf.desc("first_year_month"), psf.desc("last_year_month"))
    .show(1000, truncate=False)
)

In [None]:
# Set up udf to calculate the duration/interval between certificates.

def duration(obj):
    """Return the gaps between consecutive timestamps.

    Args:
        obj: List of numeric timestamps; may be ``None``. The list is not modified.

    Returns:
        A list of floats holding the absolute differences between neighbouring
        values of the ascending-sorted input, or ``None`` when there are fewer
        than two values.
    """
    if obj is None or len(obj) < 2:
        return None
    ordered = sorted(obj)
    # float() so the values always match the FloatType elements declared for the udf
    return [float(abs(later - earlier)) for earlier, later in zip(ordered, ordered[1:])]

duration_udf = psf.udf(duration, pst.ArrayType(pst.FloatType()))

# Dataframe with the intervals between certificates. Only rows with at least two
# kept timestamps can have an interval. Persisted since this is re-used often.
interval_df = (
    filtered_df
    .where(psf.size(psf.col("befores")) >= 2)
    .withColumn("intervals", duration_udf(psf.col("befores")))
    .persist()
)

In [None]:
# One row per individual interval, across all certificates
exploded_df = interval_df.select(
    "all_domains",
    psf.explode(psf.col("intervals")).alias("duration"),
)

In [None]:
# Table of all durations, rounded to whole days (3600 seconds per hour * 24 hours),
# sorted by number of occurrences
(
    exploded_df
    .withColumn("duration", psf.round(psf.col("duration")/(3600*24)))
    .groupBy("duration")
    .count()
    .sort(psf.desc("count"))
    .show(1000, truncate=False)
)

In [None]:
# Table of all durations, rounded to whole days (3600 seconds per hour * 24 hours),
# sorted by number of days
(
    exploded_df
    .withColumn("duration", psf.round(psf.col("duration")/(3600*24)))
    .groupBy("duration")
    .count()
    .sort(psf.desc("duration"))
    .show(1000, truncate=False)
)

# Generating graphs

In [None]:
# Aggregate in Spark (interval in whole days -> count, intervals below 400 days only),
# then convert to a pandas dataframe for local processing (generating the graph)
exploded_pdf = (
    exploded_df
    .withColumn("Interval", psf.round(psf.col("duration")/(3600*24)))
    .groupBy("Interval")
    .count()
    .sort(psf.asc("Interval"))
    .where(psf.col("Interval") < 400)
    .toPandas()
)

In [None]:
# Plot count per interval (days) on a logarithmic y-axis
a = exploded_pdf.plot(
    x="Interval",
    y="count",
    logy=True,
    x_compat=True,
    title="All certificates, log",
)

In [None]:
# udf returning the median of the intervals for each group of Common Names
array_median = psf.udf(lambda values: float(np.median(values)), returnType=pst.FloatType())

# Aggregate in Spark (median in whole days -> count, medians below 110 days only),
# then convert to a pandas dataframe for local processing
median_pdf = (
    interval_df
    .withColumn("Median", psf.round(array_median(psf.col("intervals"))/(3600*24)))
    .groupBy("Median")
    .count()
    .sort(psf.desc("Median"))
    .where(psf.col("Median") < 110)
    .toPandas()
)


In [None]:
# Plot count per median interval (days) on a logarithmic y-axis
median_plot = median_pdf.plot(
    x="Median",
    y="count",
    x_compat=True,
    logy=True,
    title="Grouped median, log",
)

In [None]:
# udf returning the average of the intervals for each group of Common Names
array_average = psf.udf(lambda values: float(np.average(values)), returnType=pst.FloatType())

# Aggregate in Spark (average in whole days -> count, averages below 110 days only),
# then convert to a pandas dataframe for local processing
avg_pdf = (
    interval_df
    .withColumn("Average", psf.round(array_average(psf.col("intervals"))/(3600*24)))
    .groupBy("Average")
    .count()
    .sort(psf.desc("Average"))
    .where(psf.col("Average") < 110)
    .toPandas()
)


In [None]:
# Plot count per average interval (days) on a logarithmic y-axis
avg_plot = avg_pdf.plot(
    x="Average",
    y="count",
    x_compat=True,
    logy=True,
    title="Grouped average, log",
)