# RSA fingerprinting

In [None]:
#!python -m venv venv
#!source /home/tjaros/Development/python/310/git/algtest-pyprocess/pyprocess-venv/bin/activate.fish
#!python -m pip install -r ./requirements.txt
#!pip install pandas

In [None]:
#!git clone git@github.com:crocs-muni/RSABias.git
#!cp ./RSABias/roca.py .
#!cp ./RSABias/rsa_fingerprint.py .

In [None]:
from algtestprocess.modules.utilities.rsa_fingerprint import *

### Creation of the metadata.json

If you already have the `metadata.json` file you want to use, please skip this step

In [None]:
#!python ../process.py tpm metadata-update /home/tjaros/storage/research/tpm/

### Load metadata

In [None]:
import json

def load_metadata(metadata_path):
    """
    Load a metadata JSON file and group its entries by vendor.

    The file is expected to hold an object with an "entries" mapping whose
    values are dictionaries; each of them is grouped under its "vendor" value.

    :param metadata_path: path to the metadata JSON file
    :return: dict mapping vendor -> list of entries (in file order), or an
             empty dict when the file cannot be read, cannot be parsed, or
             contains no entries
    """
    try:
        with open(metadata_path, "r", encoding="utf-8") as f:
            metadata = json.load(f)
        entries = list(metadata["entries"].values())
    except (OSError, ValueError, KeyError, TypeError, AttributeError) as err:
        # OSError: unreadable file; ValueError: invalid JSON or encoding;
        # the rest: the document does not have the {"entries": {...}} shape.
        print("report_create: retrieving metadata was unsuccessful")
        print(f"report_create: cause: {err!r}")
        return {}

    if not entries:
        print("report_create: retrieving metadata was unsuccessful")
        return {}

    # We now group entries by vendor
    grouped = {}
    for entry in entries:
        vendor = entry.get("vendor")
        if vendor is None:
            print(
                f"report_create: entry {entry} does not contain vendor")
            continue

        grouped.setdefault(vendor, []).append(entry)
    return grouped

# Vendor -> list of metadata entries; an empty dict means loading failed
metadata = load_metadata('./metadata.json')

### Load profiles from vendor
Choose the vendor you want to look at below


In [None]:
import logging, sys
# Silence log records of every severity (sys.maxsize is above logging.CRITICAL)
logging.disable(sys.maxsize)

In [None]:
from algtestprocess.modules.data.tpm.manager import TPMProfileManager
from algtestprocess.modules.data.tpm.enums import CryptoPropResultCategory as cat

# Vendor whose profiles are analysed; it has to be one of the keys of `metadata`
VENDOR = 'INTC'

# (TPM name, cryptoprops) pairs, one for each measurement path of the vendor.
# An unknown vendor (or metadata that failed to load) yields no entries
# instead of a TypeError from iterating over None.
cryptoprops = []
for entry in metadata.get(VENDOR, []):
    tpm_name = entry['TPM name']
    for measurement_path in entry['measurement paths']:
        man = TPMProfileManager(measurement_path)
        cryptoprops.append((tpm_name, man.cryptoprops))

if not cryptoprops:
    print(f"No measurements found for vendor {VENDOR!r}, known vendors: {sorted(metadata)}")

### Merge the dataframes for same devices

In [None]:
# Combine the cryptoprops of all measurements that share a TPM name;
# measurements without cryptoprops are reported and left out.
tpms = {}
for tpm_name, cpp in cryptoprops:
    if cpp is not None:
        tpms[tpm_name] = tpms[tpm_name] + cpp if tpm_name in tpms else cpp
    else:
        print(f"Cannot merge {tpm_name}")

### Utility functions

In [None]:
import re
from math import inf

def tpm_sorted(profiles, device_name):
    """
    Sorts the profiles according to manufacturer id alphabetically, then
    firmware version numerically, then by the optional "[N]" index
    (names without an index come after indexed ones).

    Device names are expected in the form "MANUFACTURER X.Y.Z" or
    "MANUFACTURER X.Y.Z [N]". Names which do not match are reported and
    sorted after all well-formed names, ordered by the name itself.

    :param profiles: iterable of profile objects
    :param device_name: callable returning the device name of a profile
    :return: new sorted list of profiles
    """
    # Same language as the original pattern, without the nested quantifiers
    # that could backtrack exponentially on non-matching names
    name_rgx = re.compile(r".+\s+\d+(\.\d+)*(\s\[\d+\])?")
    index_rgx = re.compile(r"\[\d+\]")

    profiles = list(profiles)
    mismatched = [
        name for name in map(device_name, profiles) if name_rgx.match(name) is None
    ]
    if mismatched:
        print("These device names do not match format")
        print(mismatched)

    def key_f(profile):
        name = device_name(profile)
        try:
            manufacturer, firmware = name.rsplit(maxsplit=1)
            idx = inf
            if index_rgx.fullmatch(firmware):
                # The last token is the "[N]" index, the version precedes it
                idx = int(firmware[1:-1])
                manufacturer, firmware = manufacturer.rsplit(maxsplit=1)
            numbers = tuple(int(x) for x in firmware.split(".") if x)
        except ValueError:
            # Too few tokens or a non-numeric version: sort after valid names
            return (1, name, (), inf)

        # The version is kept as one tuple so that the index is never
        # compared against a version component of a longer version
        return (0, manufacturer, numbers, idx)

    return sorted(profiles, key=key_f)

def _table(l, cols, header):
    # header repeat col times
    out = ""
    out += "| " + (" | ".join(header) + " | ") * cols + "\n"
    out += "| " + ("|".join(["---"] * (cols * len(header))) + " | ") + "\n"

    i = 0
    while i < len(l):
        out += "| "
        for _ in range(cols):
            if i < len(l):
                entries = l[i]
            else:
                entries = [" " for _ in range(len(header))]

            assert len(entries) == len(header)

            out += " | ".join(map(str, entries)) + " | "
            i += 1
        out += "\n"
    return out


### Basic information
- The table below shows whether each profile was parsed successfully, which makes bad or missing measurements easy to find

In [None]:
from IPython.display import display, Markdown
# A False in the second column means the cryptoprops of that TPM could not be
# loaded; in that case check the metadata for the corresponding files.
display(Markdown(_table(
    [[name, profile is not None] for name, profile in tpms.items()],
    2,
    ['TPM Name', 'Profile object'],
)))

In [None]:
# Sorted order of the TPMs. The sort key is only "MANUFACTURER FIRMWARE_VERSION",
# so to polish the order according to the actual versions you need to adjust
# it manually.
tpms_sorted = tpm_sorted(
    [cpps for cpps in tpms.values() if cpps],
    lambda cpps: f"{cpps.manufacturer} {cpps.firmware_version}",
)
display(Markdown(_table(
    [
        [
            index,
            cpps.device_name,
            cpps.results.get(cat.RSA_1024) is not None,
            cpps.results.get(cat.RSA_2048) is not None,
        ]
        for index, cpps in enumerate(tpms_sorted)
    ],
    1,
    ['Index', 'TPM Name', 'RSA 1024', 'RSA 2048'],
)))

In [None]:
# From here on `tpms` is the sorted list of profiles, no longer the
# name -> profile dict; the index-based slices below rely on this order.
tpms = tpms_sorted

### Visualising the RSA primes

In [None]:
import matplotlib.pyplot as plt
from algtestprocess.modules.visualization.heatmap import Heatmap
import matplotlib.lines as mlines
import pandas as pd
import matplotlib.gridspec as gridspec

def firmwarelist2id(firmware_versions):
    """
    Build a short, possibly multi-line label for a list of firmware versions.

    Versions are sorted numerically and grouped by major version. A major
    with a single version is shown as that version; a major with several
    versions is condensed to "MAJOR.MINOR.X" when the first and last minor
    agree, to "MAJOR.FIRST-LAST.X" otherwise, or to "MAJOR.X" when one of
    the two has no minor component. A new line is started once the current
    line is longer than 12 characters.

    :param firmware_versions: iterable of dot-separated numeric version strings
    :return: the label, an empty string for empty input
    """
    max_line_length = 12

    ordered = sorted(
        firmware_versions, key=lambda fv: [int(part) for part in fv.split('.')]
    )

    # First we group the same major version TPMs
    versions = {}
    for fv in ordered:
        versions.setdefault(fv.split('.')[0], []).append(fv)

    # Then we start building the result, line by line
    lines = [""]
    for major, entries in versions.items():
        if len(entries) == 1:
            label = entries[0]
        else:
            # Slices instead of [1] so a version without minor cannot raise
            fst = entries[0].split('.')[1:2]
            lst = entries[-1].split('.')[1:2]
            if not fst or not lst:
                label = f"{major}.X"
            elif fst == lst:
                label = f"{major}.{fst[0]}.X"
            else:
                label = f"{major}.{fst[0]}-{lst[0]}.X"

        # Wrap on the length of the current (last) line, not of the first one
        if len(lines[-1]) > max_line_length:
            lines.append(label)
        elif lines[-1]:
            lines[-1] += " " + label
        else:
            lines[-1] = label

    return "\n".join(lines)
    
    

def cpps2fig(cpps, fig, alg, add_fingerprint):
    """
    Draw the heatmap of the RSA data of one profile into `fig`.

    :param cpps: profile with `results`, `manufacturer` and `firmware_version`
                 attributes, or None
    :param fig: figure (or subfigure) handed over to Heatmap
    :param alg: result category to plot; None merges the RSA 1024 and
                RSA 2048 data into one heatmap
    :param add_fingerprint: when True, the RSAFingerprintSet fingerprint of
                            the p, q, n columns is computed, printed and
                            passed to the heatmap as additional text
    :return: the value of Heatmap(...).build(), or None when `cpps` is None
             or holds no data for the requested algorithm(s)
    """
    if cpps is None:
        return None

    if alg is not None:
        result = cpps.results.get(alg)
        if result is None:
            return None
        df = result.data
    else:
        # Special case when alg is None means we should merge the rsa dataframes
        dfs = [
            result.data
            for result in map(cpps.results.get, (cat.RSA_1024, cat.RSA_2048))
            if result is not None
        ]
        if not dfs:
            # No RSA data at all: report it like any other missing result
            # instead of failing on an assertion
            return None
        df = pd.concat(dfs)

    tpm_name = f"{cpps.manufacturer}"
    if isinstance(cpps.firmware_version, list):
        tpm_name += f" {firmwarelist2id(cpps.firmware_version)}"
    else:
        tpm_name += f" {cpps.firmware_version}"

    # add the fingerprint analysis results
    fp = None
    if add_fingerprint:
        # We drop the nan rows
        pqn = df[['p', 'q', 'n']].dropna(subset=["p", "q", "n"]).values.tolist()

        fp = RSAFingerprintSet(pqn, recompute_q=True)
        fp.compute_fingerprint()
        fp = str(fp)
        print(fp)

    return Heatmap(df, tpm_name, additional_text=fp, fig=fig, ticks=False, legend=False, parts=['title', 'heatmap', 'text'], part_height_ratios=[1, 0.1], text_font_size = 9, label_values=False).build()

def create_multiplot(tpms, nrows, ncols, alg, figsize=(8.3, 11.7), add_fingerprint=False, add_legend=True):
    """
    Plot the heatmaps of several profiles into one figure with a grid layout.

    Profiles are placed row by row. A profile for which cpps2fig returns None
    is reported and does not take up a grid cell; profiles beyond
    nrows * ncols successful plots are ignored.

    :param tpms: iterable of profiles
    :param nrows: number of grid rows
    :param ncols: number of grid columns
    :param alg: result category passed to cpps2fig (None merges RSA 1024/2048)
    :param figsize: figure size passed to matplotlib
    :param add_fingerprint: passed to cpps2fig
    :param add_legend: whether to add the legend to the lower right corner
    """
    # Creating the figure with a constrained layout to avoid axes overlapping
    fig = plt.figure(layout='constrained', figsize=figsize, dpi=800)
    grid = gridspec.GridSpec(ncols=ncols, nrows=nrows, figure=fig)
    capacity = ncols * nrows
    plotted = 0
    for cpps in tpms:
        if plotted >= capacity:
            break

        # The cell follows from the number of plots drawn so far
        row, col = divmod(plotted, ncols)
        subfig = fig.add_subfigure(grid[row, col])
        subfig.set_facecolor("none")

        if cpps2fig(cpps, subfig, alg, add_fingerprint) is not None:
            plotted += 1
        else:
            # cpps may be None (cpps2fig accepts it), so do not assume it
            # has a device_name attribute
            print(f"Plot for {getattr(cpps, 'device_name', cpps)} failed")

    if add_legend:
        # defining legend style and data
        green_line = mlines.Line2D([], [], color='green', label='$P_{min}$', linestyle='dashed')
        blue_line = mlines.Line2D([], [], color='blue', label='$P_{max}$', linestyle='dashed')
        orange_line = mlines.Line2D([], [], color='orange', label='$Q_{min}$', linestyle='dashed')
        purple_line = mlines.Line2D([], [], color='purple', label='$Q_{max}$', linestyle='dashed')
        black_line = mlines.Line2D([], [], color='black', label='$P=Q$', linestyle='dashed')

        fig.legend(handles=[green_line, blue_line, orange_line, purple_line, black_line], loc='lower right', bbox_to_anchor=(1,0), bbox_transform=fig.transFigure, fontsize='x-large')

## Create the multiplots

In [None]:
# One RSA 2048 heatmap with fingerprint per TPM, on a 6 x 4 grid
create_multiplot(
    tpms,
    nrows=6,
    ncols=4,
    alg=cat.RSA_2048,
    figsize=(8.3, 14),
    add_fingerprint=True,
    add_legend=True,
)

In [None]:
# Aggregate the profiles into groups which are plotted as one heatmap each.
# The index ranges are positions in the sorted `tpms` list and are hard-coded
# per vendor. NOTE(review): presumably they follow the index table displayed
# above for the original dataset - verify them against your own data.
# sum() starts from the integer 0, so this relies on the profile objects
# supporting `0 + profile`; an empty slice yields plain 0.
if VENDOR == 'INTC':
    agg_rsa_2048 = [sum(tpms[0:11] ), sum(tpms[11:13]), tpms[13], sum(tpms[14:])] # INTC only 
elif VENDOR == 'IFX':
    agg_rsa_2048 = [tpms[0], sum(tpms[1:3]), sum(tpms[3:5]), sum(tpms[5:])]   # IFX
else:
    # Any other vendor: all profiles are merged into a single group
    agg_rsa_2048 = [sum(tpms)]
create_multiplot(agg_rsa_2048, 1, 4, cat.RSA_2048, figsize=(8.3, 2), add_legend=False)

In [None]:
# One RSA 1024 heatmap with fingerprint per TPM, on a 5 x 4 grid
create_multiplot(
    tpms,
    nrows=5,
    ncols=4,
    alg=cat.RSA_1024,
    figsize=(8.3, 13),
    add_fingerprint=True,
    add_legend=True,
)

In [None]:
# Aggregate the profiles into groups which are plotted as one heatmap each.
# The index ranges are positions in the sorted `tpms` list and are hard-coded
# per vendor. NOTE(review): presumably they follow the index table displayed
# above for the original dataset - verify them against your own data.
# sum() starts from the integer 0, so this relies on the profile objects
# supporting `0 + profile`; an empty slice yields plain 0.
if VENDOR == 'INTC':
    agg_rsa_1024 = [sum(tpms[0:11] ), sum(tpms[11:13]), tpms[13], sum(tpms[14:])] # INTC
elif VENDOR == 'IFX':
    agg_rsa_1024 = [tpms[0], sum(tpms[1:3]), sum(tpms[3:5]), sum(tpms[5:])]   # IFX
else:
    # Any other vendor: all profiles are merged into a single group
    agg_rsa_1024 = [sum(tpms)] # STM

create_multiplot(agg_rsa_1024, 1, 4 , cat.RSA_1024, figsize=(8.3, 2), add_legend=False)

Merge both RSA 1024 and 2048

In [None]:
# The groups are the ones aggregated for RSA 1024; passing alg=None makes
# cpps2fig merge the RSA 1024 and RSA 2048 data of each group.
agg_rsa_both = agg_rsa_1024
create_multiplot(
    agg_rsa_both,
    nrows=1,
    ncols=4,
    alg=None,
    figsize=(8.3, 2),
    add_legend=False,
)