# Smart SSH Key Extractor

In [1]:
import itertools
import os
import time
from timeit import default_timer as timer

import numpy as np

import utils
from utils import log, get_dataset_file_paths, load_models, read_key_files, WrappedClassifier

In [2]:
# prepare cwd

# Make sure there is an "out/" folder in cwd.
# exist_ok=True creates the folder only when it is missing, without a separate
# existence check that could race with another process creating it.
os.makedirs("out/", exist_ok=True)

In [3]:
# program parameters
from dataclasses import dataclass


@dataclass
class ProgramParameters:
    """
    Wrapper class for program parameters.

    Creating an instance asks ``utils.check_path_exists`` about both
    configured paths, prints the outcome followed by the parameter values,
    and stops the program with exit status 1 when the check fails.
    """

    # Sets the execution to deploy mode. Does not require key files.
    # If False: We give it '.json' files with key info to check the code.
    # If True: We give no hint about the keys, and the code should find them.
    DEPLOY: bool = False

    # "~" is resolved with os.path.expanduser, so defining the class does not
    # raise KeyError when the HOME environment variable is not set.
    OUTPUT_DIR_PATH: str = os.path.expanduser(
        "~/Documents/code/phdtrack/Smart-and-Naive-SSH-Key-Extraction/smart_ssh_key_extractor/out"
    )
    DATASET_DIR_PATH: str = os.path.expanduser(
        "~/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/"
    )  # TODO: ???

    def __init__(self, **kwargs):
        """
        Check the configured paths and print the parameters.

        ``kwargs`` is accepted but not used: the values always come from the
        class-level defaults above.

        Raises:
            SystemExit: with status 1 when one of the paths is reported missing.
        """
        paths_ok = (
            utils.check_path_exists(self.OUTPUT_DIR_PATH) and
            utils.check_path_exists(self.DATASET_DIR_PATH)
        )
        if not paths_ok:
            print("Program paths are NOT OK.")
            # SystemExit is what sys.exit() raises; unlike the interactive
            # ``exit`` helper it is always available.
            raise SystemExit(1)

        print("Program paths are OK.")
        self.print_params()

    def print_params(self):
        """Print every program parameter on its own line."""
        print("Program parameters:")
        print("DEPLOY:", self.DEPLOY)
        print("OUTPUT_DIR_PATH:", self.OUTPUT_DIR_PATH)
        print("DATASET_DIR_PATH:", self.DATASET_DIR_PATH)


# Single shared parameter object; constructing it checks the configured paths
# and prints the parameters (see the output below).
PARAMS = ProgramParameters()

Program paths are OK.
Program parameters:
DEPLOY: False
OUTPUT_DIR_PATH: /home/onyr/Documents/code/phdtrack/Smart-and-Naive-SSH-Key-Extraction/smart_ssh_key_extractor/out
DATASET_DIR_PATH: /home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/


## Functions

In [4]:
def get_run_length_encoded(data_block: np.ndarray):
    """
    Run-length encode a one-dimensional block of byte values.

    Consecutive equal values are collapsed into a single value plus the
    length of the run.

    Args:
        data_block: sequence of integer values in range(256). It may be
            empty, in which case both results are empty.

    Returns:
        A tuple (byte_array, counts): byte_array is a bytearray holding the
        value of each run, counts is the list of the matching run lengths.

    Example:
        data_block = [a,a,a,a,b,b,c,a,a]
        byte_array = [a,b,c,a]
        counts = [4,2,1,2]
    """

    byte_array = []
    counts = []
    # groupby yields one (value, run) pair per stretch of consecutive equal
    # values, and simply yields nothing for an empty block.
    for value, run in itertools.groupby(data_block):
        byte_array.append(value)
        counts.append(sum(1 for _ in run))

    return bytearray(byte_array), counts

In [5]:
def generate_rle_representation(raw_data: bytearray):
    """
    Flag the 8-byte rows of a data block that may hold key material and
    run-length encode the resulting row mask.

    (NOTE: byte == octet != bit)

    The data is viewed as rows of 8 bytes. A row is flagged (1) when at least
    4 of its byte positions differ both from the next byte in the same row and
    from the byte at the same position in the next row. Flags are then
    adjusted for 12-byte keys and filtered so that only flagged rows with a
    flagged neighbour survive.

    Args:
        raw_data: block of bytes whose length is a multiple of 8.

    Returns:
        A tuple (byte_array, counts, offsets):
        byte_array is a bytearray with the value (0 or 1) of each run of rows,
        counts is the list of run lengths, in rows,
        offsets is the list of byte offsets in raw_data where each run starts.
        All three are empty when raw_data is empty.

    Raises:
        ValueError: if the length of raw_data is not a multiple of 8.
    """

    if len(raw_data) % 8 != 0:
        raise ValueError(
            "raw_data length must be a multiple of 8 bytes, got %d" % len(raw_data)
        )

    num_row = len(raw_data) // 8
    if num_row == 0:
        # No rows to inspect: there are no runs and no offsets.
        return bytearray(), [], []

    # The shape is passed positionally: the `newshape` keyword is deprecated
    # in recent NumPy releases.
    reshaped = np.reshape(raw_data, (num_row, 8))

    # Row-wise and column-wise numerical gradients, reduced to "differs / does
    # not differ". The appended zeros mean the last column (resp. last row) is
    # compared against 0.
    values = reshaped.astype(int)
    x_grad = np.diff(values, axis=1, append=np.zeros((num_row, 1))).astype(bool)
    y_grad = np.diff(values, axis=0, append=np.zeros((1, 8))).astype(bool)

    # A row with fewer than 4 positions changing in both directions is treated
    # as a repeating pattern and not a key. This is a very conservative
    # estimate for better recall.
    poss_key_locs = (np.count_nonzero(x_grad & y_grad, axis=1) >= 4).astype(int)

    # This part addresses the issue of 12 byte keys. There could be two
    # identical bytes next to each other in the last 4 bytes, which would make
    # the row fail the test above. Such a row is flagged anyway when the
    # previous row is flagged and its first four bytes hold at least 3 unique
    # values.
    # NOTE(review): the original comment says the last 4 bytes "must be zeros",
    # but the condition (originally `all(...) == 0`) only requires that at
    # least one of them is zero. Behaviour is kept as-is; confirm the intent
    # before tightening it to `not any(...)`.
    for idx in range(1, num_row):
        if (
            poss_key_locs[idx] == 0
            and poss_key_locs[idx - 1] == 1
            and not all(reshaped[idx][4:])
            and len(set(reshaped[idx][:4])) > 2
        ):
            poss_key_locs[idx] = 1

    # Keep a flagged row only when the following row is flagged too. np.roll
    # wraps around, so the value rolled in for the last row is cleared.
    rolled = np.roll(poss_key_locs, -1)
    rolled[-1] = False
    poss_key_locs = (poss_key_locs & rolled).astype(int)

    # Roll right and OR it, which also flags the row after each kept row. The
    # whole operation is similar to the opening morphological operation.
    rolled = np.roll(poss_key_locs, 1)
    rolled[0] = False
    poss_key_locs = poss_key_locs | rolled

    byte_array, counts = get_run_length_encoded(poss_key_locs)

    # Start of each run: counts are in rows of 8 bytes, so the number of rows
    # seen so far is multiplied by 8 to get a byte offset into raw_data.
    offsets = []
    rows_seen = 0
    for count in counts:
        offsets.append(rows_seen * 8)
        rows_seen += count

    return byte_array, counts, offsets

In [6]:
def get_slices(data, offsets, keys, max_key_size=128, deploy=False):
    """
    Cut a data block into fixed-size candidate blocks and label them.

    One block of max_key_size bytes is taken at each offset. When a block
    would run past the end of the data, the last max_key_size bytes of the
    data are taken instead and the remaining offsets are ignored, so this
    tail block is added only once.

    Args:
        data: the data block to cut.
        offsets: start offsets of the candidate blocks.
        keys: the known keys, searched for in every block when deploy is False.
        max_key_size: size in bytes of each block.
        deploy: if True, no key information is used and every label is 0.

    Returns:
        A tuple (data_blocks, labels): labels[i] is 1 when data_blocks[i]
        contains at least one of the keys, and 0 otherwise.

    Raises:
        AssertionError: when deploy is False and the sanity checks on the
            labels fail (for instance a key is found in no block).
    """

    data_blocks = []
    labels = []
    key_count = [0] * len(keys)
    data_len = len(data)

    for offset in offsets:
        is_last_frame = offset + max_key_size > data_len
        if is_last_frame:
            curr_data = data[-max_key_size:]
        else:
            curr_data = data[offset:offset + max_key_size]
        data_blocks.append(curr_data)

        if deploy:
            labels.append(0)
        else:
            found = [key_idx for key_idx, key in enumerate(keys) if key in curr_data]
            labels.append(1 if found else 0)
            for key_idx in found:
                key_count[key_idx] += 1

        # Stop after the tail block in both modes; otherwise every further
        # offset past the end would append the same tail block again.
        if is_last_frame:
            break

    if not deploy:
        assert len(data_blocks) == len(labels), "one label is expected per data block"
        assert sum(labels) > 0 and sum(labels) >= len(keys), \
            "fewer blocks labelled as containing a key than there are keys"
        assert min(key_count) != 0, "at least one key was not found in any block"

    return data_blocks, labels

In [7]:
def build_encoded_dataset(heap_paths, key_paths, max_key_size=128, deploy=False):
    """
    Build the candidate blocks, and their labels, for a list of heap files.

    Each heap file is read, its possible key locations are computed with
    generate_rle_representation, and one block of max_key_size bytes is cut
    at the start of every run of possible key locations (see get_slices).

    Args:
        heap_paths: paths of the raw heap files.
        key_paths: paths of the '.json' key files, in the same order as
            heap_paths. Not used when deploy is True.
        max_key_size: size in bytes of each block.
        deploy: if False, the key files are read and each block is labelled
            1 when it contains a key. If True, no annotation is used and every
            label is 0.

    Returns:
        A tuple (dataset, labels) with one label per data block.

    Raises:
        AssertionError: when deploy is False and a key path does not
            correspond to its heap path.
    """

    dataset = []
    labels = []

    if deploy:
        # Key files are not needed, so key_paths is left out of the pairing:
        # an empty or shorter key_paths must not silently drop heap files.
        path_pairs = [(heap_path, None) for heap_path in heap_paths]
    else:
        path_pairs = zip(heap_paths, key_paths)

    for heap_path, key_path in path_pairs:

        # Check if the key path corresponds to the heap path, then read the required data
        curr_keys = ['EMPTY KEYS']
        if not deploy:
            assert (key_path.replace(".json", "-heap.raw") == heap_path)
            curr_keys = read_key_files(key_path)
            # Remove repeated keys. This is an issue for some older versions of OpenSSH
            curr_keys = [bytearray(key) for key in {tuple(x) for x in curr_keys}]

        with open(heap_path, "rb") as fp:
            data = bytearray(fp.read())

        byte_array, _counts, cum_sum = generate_rle_representation(data)

        # byte_array holds the value of each run of rows (1 = possible key
        # location) and cum_sum the byte offset where that run starts, so only
        # the starts of the runs of value 1 are kept.
        viable_offsets = [
            offset for value, offset in zip(byte_array, cum_sum) if value == 1
        ]
        slices, curr_labels = get_slices(
            data=data,
            offsets=viable_offsets,
            max_key_size=max_key_size,
            keys=curr_keys,
            deploy=deploy
        )
        dataset.extend(slices)
        labels.extend(curr_labels)

    assert len(labels) == len(dataset)

    return dataset, labels

In [8]:
def generate_probable_slices(clf, heap_paths, key_paths, params: "ProgramParameters"):
    """
    Classify the candidate blocks of every heap file and save the blocks
    predicted as containing a key.

    For each heap file, the blocks whose prediction is not 0 are written, one
    per line as two-digit hexadecimal, to
    "<params.OUTPUT_DIR_PATH>/<last directory of the heap path>/<heap file name>"
    with ".raw" replaced by ".txt". The time spent on each file is logged.

    Args:
        clf: classifier whose predict method is called on the blocks.
        heap_paths: paths of the raw heap files. Sorted in place.
        key_paths: paths of the key files. Sorted in place; only read when
            params.DEPLOY is False.
        params: program parameters (DEPLOY and OUTPUT_DIR_PATH are used).
    """

    # Sort the heap paths and key paths, so it easier to group them by version and key length.
    # NOTE: this sorts the caller's lists in place.
    heap_paths.sort()
    key_paths.sort()

    # For each of the heap files
    for idx, heap_path in enumerate(heap_paths):
        start = timer()

        key_path: str = ""
        if not params.DEPLOY:
            key_path = key_paths[idx]

        dataset, _curr_labels = build_encoded_dataset(
            heap_paths=[heap_path],
            key_paths=[key_path],
            deploy=params.DEPLOY
        )

        if dataset:
            x_test = np.array(dataset).astype(int)
            curr_pred = clf.predict(x_test)  # Make predictions
        else:
            # No candidate block for this file: there is nothing to classify,
            # so the classifier is not called and the output file stays empty.
            curr_pred = []

        # path manipulation to transform input file paths to output file paths
        # example: transform "$INPUT/lastdir/heap.raw" to "$OUTPUT/lastdir/heap.txt"
        file_name = os.path.basename(heap_path).replace(".raw", ".txt")
        sub_dir = os.path.basename(os.path.dirname(heap_path))
        dir_path = os.path.join(params.OUTPUT_DIR_PATH, sub_dir)
        output_path = os.path.join(dir_path, file_name)

        os.makedirs(dir_path, exist_ok=True)

        with open(output_path, 'w') as file:
            # save the block if its prediction is not 0, as two-digit hex per byte
            file.writelines(
                bytes(block).hex() + "\n"
                for block, pred in zip(dataset, curr_pred)
                if pred != 0
            )

        end = timer()
        log('Total time taken for file %s: %f' % (heap_path, (end - start)))

## Execution

In [9]:
# Load the models
# Only the high recall classifier is requested (load_high_recall_only=True);
# its type is printed as a quick check of what was loaded.
clf = load_models(load_high_recall_only=True)
print(type(clf))

2023-01-12 11:27:53.286661:	Time taken for loading high recall classifier: 0.013232
<class 'sklearn.ensemble._forest.RandomForestClassifier'>


In [10]:
# Collect the heap files (and key files) found under the dataset directory,
# and log how long the search took.
start = timer()
heap_paths, key_paths = get_dataset_file_paths(
    PARAMS.DATASET_DIR_PATH,
    deploy=PARAMS.DEPLOY,
)
end = timer()
log('Time taken for finding all files: %f' % (end - start))

# Show the number of paths found and the first 5 values of each list
print("heap_paths (nbe: %i):" % len(heap_paths), heap_paths[:5])
print("key_paths: (nbe: %i):" % len(key_paths), key_paths[:5])

# Having no key path is only a problem when the keys are needed (DEPLOY is False)
if not key_paths:
    if PARAMS.DEPLOY is False:
        print("No key paths found while deploy is False. Please check the dataset directory path.")
    elif PARAMS.DEPLOY is True:
        print("No key paths found while deploy is True. This is expected.")

2023-01-12 11:27:53.306242:	Time taken for finding all files: 0.001965
heap_paths (nbe: 300): ['/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10484-1650982250-heap.raw', '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10517-1650982250-heap.raw', '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10324-1650982250-heap.raw', '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10333-1650982250-heap.raw', '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10351-1650982250-heap.raw']
key_paths: (nbe: 300): ['/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10484-1650982250.json', '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10517-1650982250.json', '/home/onyr/Documents

In [11]:
# Classify the candidate blocks of every heap file and write the predicted key
# blocks under PARAMS.OUTPUT_DIR_PATH. Note: heap_paths and key_paths are
# sorted in place by this call.
generate_probable_slices(clf=clf, heap_paths=heap_paths, key_paths=key_paths, params=PARAMS)

2023-01-12 11:27:53.333881:	Total time taken for file /home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10237-1650982250-heap.raw: 0.010874
2023-01-12 11:27:53.343014:	Total time taken for file /home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10240-1650982250-heap.raw: 0.009085
2023-01-12 11:27:53.351357:	Total time taken for file /home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10243-1650982250-heap.raw: 0.008318
2023-01-12 11:27:53.359449:	Total time taken for file /home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10246-1650982250-heap.raw: 0.008070
2023-01-12 11:27:53.367676:	Total time taken for file /home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10249-1650982250-heap.raw: 0.008173
2023-01-12 11:27:53.375779:	Total time taken for file /home/onyr/Documents/

In [12]:
# Display key_paths; the list is now sorted, since generate_probable_slices
# sorts it in place.
key_paths

['/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10237-1650982250.json',
 '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10240-1650982250.json',
 '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10243-1650982250.json',
 '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10246-1650982250.json',
 '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10249-1650982250.json',
 '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10252-1650982250.json',
 '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10255-1650982250.json',
 '/home/onyr/Documents/code/phdtrack/phdtrack_data/Performance_Test/Performance_Test/V_7_1_P1/16/10258-1650982250.json',
 '/home/onyr/Documents/code/phdt