In [1]:
import pandas
import os
import datetime
import json
import binascii
import hashlib

In [4]:
def find_active_scan_files(active_scan_path):
    """Collect the ``certs.txt`` file of every ``b_*`` folder of an active scan.

    Only direct sub-directories of ``active_scan_path`` whose name starts with
    ``b_`` are inspected. The number of entries found in ``active_scan_path``
    is printed as a quick sanity check.

    Args:
        active_scan_path: Directory containing the ``b_*`` folders. It may be
            given with or without a trailing path separator.

    Returns:
        A list with one ``<active_scan_path>/<folder>/certs.txt`` path per
        matching folder, in ``os.listdir`` order.
    """
    files_names = []
    file_dirs = os.listdir(active_scan_path)
    print(len(file_dirs))
    for folder in file_dirs:
        if not folder.startswith('b_'):
            continue
        # os.path.join is used for both the listing and the returned path so
        # they always agree, whether or not the caller passed a trailing "/".
        folder_path = os.path.join(active_scan_path, folder)
        # A regular file that happens to be named b_* would make listdir raise.
        if not os.path.isdir(folder_path):
            continue
        if 'certs.txt' in os.listdir(folder_path):
            files_names.append(os.path.join(folder_path, 'certs.txt'))
    return files_names


def get_ccadb_hashes(filePath="../datasets/tls_scans/ccadb/cert_hashes_ccadb.txt"):
    """Load a file of hashes (one per line) into a dict used as a lookup set.

    Args:
        filePath: Text file to read; trailing whitespace, including the
            newline, is stripped from every line.

    Returns:
        A dict whose keys are the stripped lines and whose values are all
        ``None``. Duplicate lines collapse into a single key.
    """
    with open(filePath, 'rt') as hash_file:
        return dict.fromkeys(entry.rstrip() for entry in hash_file)


def hash_cert(pem):
    """Hash the base64 payload of a PEM-formatted certificate with SHA-1.

    Newlines and the ``BEGIN``/``END CERTIFICATE`` marker lines are removed,
    the remaining text is padded with ``=`` to a multiple of four characters
    and base64-decoded, and the decoded bytes are hashed.

    Args:
        pem: Certificate text, with or without the PEM marker lines.

    Returns:
        A ``(sha1_hex_digest, padded_base64_text)`` tuple.
    """
    payload = pem
    # Same removal order as before: newlines first, then the two markers.
    for token in ('\n', '-----BEGIN CERTIFICATE-----', '-----END CERTIFICATE-----'):
        payload = payload.replace(token, '')

    # Number of "=" characters needed to reach a multiple of four.
    missing_padding = -len(payload) % 4
    payload = payload + "=" * missing_padding

    raw_bytes = binascii.a2b_base64(payload)
    return hashlib.sha1(raw_bytes).hexdigest(), payload


def parse_date(value):
    # Convert date to Unix timestamp
    if 'T' in value:
        return datetime.datetime.strptime(value.split('T')[0], '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc).timestamp()
    else:
        return datetime.datetime.strptime(value.split(' ')[0], '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc).timestamp()


def is_expired(not_after, not_before, snapshot_date_timestamp):
    """Tell whether a snapshot time lies outside a validity window.

    The window ``[not_before, not_after]`` is widened by one week on both
    sides before the comparison; both widened bounds are inclusive.

    Args:
        not_after: End of the validity period (Unix timestamp, seconds).
        not_before: Start of the validity period (Unix timestamp, seconds).
        snapshot_date_timestamp: Moment to check (Unix timestamp, seconds).

    Returns:
        ``False`` when the snapshot falls inside the widened window,
        ``True`` otherwise.
    """
    one_week = 7 * 24 * 60 * 60
    earliest_allowed = not_before - one_week
    latest_allowed = not_after + one_week
    return not (earliest_allowed <= snapshot_date_timestamp <= latest_allowed)


def validate_certificate(cert_list, ccadb_hashes, snapshot_timestamp):
    """Pick the end-entity certificate out of a chain, or reject the chain.

    The chain is rejected (``None`` is returned) when:

    * ``cert_list`` is empty;
    * the last certificate has a ``'pem'`` field whose SHA-1 hash (see
      ``hash_cert``) is not a key of ``ccadb_hashes``;
    * any certificate carrying both ``'not_before'`` and ``'not_after'`` is
      reported as expired by ``is_expired`` at ``snapshot_timestamp``.

    Otherwise the returned certificate is the last one in ``cert_list`` that
    is not a CA (``basic_constraints.is_ca`` is false), lists at least one
    entry in ``dns_names`` and is explicitly marked as not self-signed.

    Args:
        cert_list: List of certificate dicts of one host.
            NOTE(review): the last element is presumably the chain's
            root-most certificate; confirm against the scan format.
        ccadb_hashes: Container of accepted certificate hashes.
        snapshot_timestamp: Scan time as a Unix timestamp.

    Returns:
        The selected certificate dict, or ``None`` if the chain is rejected
        or holds no qualifying certificate.
    """
    # An empty "certificates" list used to raise IndexError on cert_list[-1].
    if not cert_list:
        return None

    # Check if the last certificate of the chain is present in CCADB.
    last_cert = cert_list[-1]
    if 'pem' in last_cert:  # Privacy Enhanced Mail format
        cert_hash, _ = hash_cert(last_cert['pem'])
        if cert_hash not in ccadb_hashes:
            return None

    ee_cert = None
    for cert in cert_list:
        # Reject the whole chain if any certificate falls outside its validity
        # period (with the one-week margin applied by is_expired).
        if 'not_before' in cert and 'not_after' in cert:
            if is_expired(parse_date(cert['not_after']), parse_date(cert['not_before']), snapshot_timestamp):
                return None

        # Only keep non-CA certificates that name at least one DNS host and
        # are not self-signed. The checks run in the same order as before so
        # a missing key short-circuits instead of raising.
        is_end_entity = (
            'basic_constraints' in cert
            and 'is_ca' in cert['basic_constraints']
            and cert['basic_constraints']['is_ca'] == False
            and 'dns_names' in cert
            and len(cert['dns_names']) > 0
            and 'is_self_signed' in cert
            and cert['is_self_signed'] == False
        )
        if is_end_entity:
            ee_cert = cert
    return ee_cert


def active_scan_preprocessing(file_names, ccadb_hashes, snapshot_timestamp, result_path):
    """Filter active-scan result files down to their valid end-entity certificates.

    Every input file is read line by line; each line is expected to be a JSON
    object mapping a host key to a dict that may contain a ``"certificates"``
    list. For every host whose chain passes ``validate_certificate`` one line
    ``{"<host key>": <certificate>}`` is appended to ``ee_certs.txt``.

    Lines that are not valid JSON, are not JSON objects, or whose host value
    is not a dict are skipped.

    Args:
        file_names: Paths of the scan files to process.
        ccadb_hashes: Accepted certificate hashes, passed to
            ``validate_certificate``.
        snapshot_timestamp: Scan time as a Unix timestamp, passed to
            ``validate_certificate``.
        result_path: Directory in which ``ee_certs.txt`` is written (with or
            without a trailing separator). It is created if missing; an empty
            string means the current directory.
    """
    if result_path:
        os.makedirs(result_path, exist_ok=True)
    output_path = os.path.join(result_path, "ee_certs.txt")

    total_files = len(file_names)
    with open(output_path, 'wt') as write_file:
        for i, file_name in enumerate(file_names, start=1):
            # Keep track of processing files
            print(f"Processing file {i} out of {total_files}:\t {file_name}")

            with open(file_name, "rt") as current_file:
                for line in current_file:
                    try:
                        data = json.loads(line.strip())
                    except json.JSONDecodeError:
                        # Only malformed lines are skipped; anything else
                        # (e.g. KeyboardInterrupt) must still propagate.
                        continue

                    # Valid JSON that is not an object has no hosts to visit.
                    if not isinstance(data, dict):
                        continue

                    for ip, host_data in data.items():
                        # Check if certificate is present
                        if not isinstance(host_data, dict) or "certificates" not in host_data:
                            continue
                        # Validate certificate
                        ee_cert = validate_certificate(host_data['certificates'], ccadb_hashes, snapshot_timestamp)
                        if ee_cert is not None:
                            json_format = json.dumps({ip: ee_cert})
                            write_file.write(f"{json_format}\n")
                        
                    
            
            
    

In [5]:
def main(datatype="active",
         datestring="21-11-2019",
         result_path="./results/",
         ccadb_path="../datasets/tls_scans/ccadb/pem_cert_hashes_ccadb.txt",
         active_path="../datasets/tls_scans/active/"):
    """Run the certificate preprocessing for one scan snapshot.

    Args:
        datatype: Kind of scan to process. Only ``"active"`` is implemented;
            any other value prints a notice and returns without doing work.
        datestring: Date of the scan in ``DD-MM-YYYY`` form; it is interpreted
            as midnight UTC of that day.
        result_path: Directory handed to ``active_scan_preprocessing`` for the
            output file.
        ccadb_path: File with the accepted CCADB certificate hashes.
        active_path: Directory containing the active-scan ``b_*`` folders.

    Raises:
        ValueError: If ``datestring`` does not match ``%d-%m-%Y``.
    """
    # Fail fast on an unsupported datatype instead of loading the hash file
    # and then silently doing nothing.
    if datatype != "active":
        print(f"Unsupported datatype {datatype!r}; nothing to do.")
        return

    ccadb_hashes = get_ccadb_hashes(ccadb_path)

    # Convert the scan date to a UTC Unix timestamp
    snapshot_timestamp = datetime.datetime.strptime(datestring, '%d-%m-%Y').replace(tzinfo=datetime.timezone.utc).timestamp()

    # Find all active file names
    files_names = find_active_scan_files(active_path)

    # Validate certificates present in the active scan
    active_scan_preprocessing(files_names, ccadb_hashes, snapshot_timestamp, result_path)
    
    
# Run the preprocessing with main's default arguments (datatype="active").
main()

4000
Processing file 1 out of 4000:	 ../datasets/tls_scans/active/b_1/certs.txt
Processing file 2 out of 4000:	 ../datasets/tls_scans/active/b_10/certs.txt
Processing file 3 out of 4000:	 ../datasets/tls_scans/active/b_11/certs.txt
Processing file 4 out of 4000:	 ../datasets/tls_scans/active/b_12/certs.txt
Processing file 5 out of 4000:	 ../datasets/tls_scans/active/b_13/certs.txt
Processing file 6 out of 4000:	 ../datasets/tls_scans/active/b_100/certs.txt
Processing file 7 out of 4000:	 ../datasets/tls_scans/active/b_101/certs.txt
Processing file 8 out of 4000:	 ../datasets/tls_scans/active/b_102/certs.txt
Processing file 9 out of 4000:	 ../datasets/tls_scans/active/b_103/certs.txt
Processing file 10 out of 4000:	 ../datasets/tls_scans/active/b_104/certs.txt
Processing file 11 out of 4000:	 ../datasets/tls_scans/active/b_105/certs.txt
Processing file 12 out of 4000:	 ../datasets/tls_scans/active/b_106/certs.txt
Processing file 13 out of 4000:	 ../datasets/tls_scans/active/b_107/certs.