In [58]:
from pynq import Overlay, allocate
import numpy as np
import math
import time

class DESEncryptor:
    """
    A class for performing DES encryption using a PYNQ overlay.
    """
    SIZE_OF_ALGO = 8  # 8 bytes = 64 bits
    MAX_PARALLEL_SEND = (2 ** 16)
    INPUT_TYPE = np.dtype([("message", np.bytes_, SIZE_OF_ALGO), ("key", np.bytes_, SIZE_OF_ALGO)])
    OUTPUT_TYPE = np.dtype([("result", np.bytes_, SIZE_OF_ALGO)])

    def __init__(self, bitstream_path='./DES_axi_wrapper.bit', logging=True):
        """
        Initializes the DESEncryptor with the provided bitstream.

        Args:
            bitstream_path (str, optional): Path to the bitstream file.
                Defaults to './DES_axi_wrapper.bit'.
        """
        # Program bitstream to FPGA
        self.overlay = Overlay(bitstream_path)
        self.logging = logging

        # Access to AXI DMA
        self.dma = self.overlay.axi_dma_0
        self.dma_send = self.dma.sendchannel
        self.dma_recv = self.dma.recvchannel

        # Allocate physical memory
        self.input_buffer = allocate(shape=(self.MAX_PARALLEL_SEND,), dtype=self.INPUT_TYPE)
        self.output_buffer = allocate(shape=(self.MAX_PARALLEL_SEND,), dtype=self.OUTPUT_TYPE)
        print(f"Using {self.MAX_PARALLEL_SEND}x type with input: {self.INPUT_TYPE.itemsize * 8} bits, and output: {self.OUTPUT_TYPE.itemsize * 8} bits")

    def encrypt(self, message, key):
        """
        Encrypts the given message using the given key.  Pads the message with
        null bytes if it is not a multiple of 8 bytes.

        Args:
            message (str): The message to encrypt.
            key (str): The encryption key (must be 8 bytes).

        Returns:
            list: A list of hexadecimal strings representing the encrypted output.

        Raises:
            AssertionError: If the key length is not 8 bytes.
        """
        key_bytes = bytes(key, "ascii")
        assert len(key_bytes) == 8, f"Key must be 8 bytes, got {len(key_bytes)}"

        message_bytes = bytes(message, "ascii")
        original_message_length = len(message_bytes)  # Store original length for later use

        # Pad the message with null bytes if it's not a multiple of 8
        padding_needed = 8 - (len(message_bytes) % 8) if len(message_bytes) % 8 != 0 else 0
        message_bytes += b'\0' * padding_needed

        iterations = math.ceil(len(message_bytes) / self.SIZE_OF_ALGO)
        assert iterations <= self.MAX_PARALLEL_SEND, f"Message size exceeds maximum parallel send size of {self.MAX_PARALLEL_SEND}"

        # Write input
        for i in range(iterations):
            part = message_bytes[self.SIZE_OF_ALGO * i : self.SIZE_OF_ALGO * (i + 1)]
            if self.logging:
                print(f"Now sending part({i}, {self.SIZE_OF_ALGO * i}): {part}, {part.hex()}, with key: {key_bytes}, {key_bytes.hex()}")
            self.input_buffer[i] = (part[::-1], key[::-1])

        # Do AXI DMA MM2S transfer
        self.dma_send.transfer(self.input_buffer)
        # Do AXI DMA S2MM transfer
        self.dma_recv.transfer(self.output_buffer)

        # Get the output
        output_hex = "".join([self.output_buffer[i].tobytes()[::-1].hex() for i in range(iterations)])
        return output_hex

    def __del__(self):
        """
        Destructor to deallocate buffers and prevent memory leaks.
        """
        if hasattr(self, 'input_buffer'):
            del self.input_buffer
        if hasattr(self, 'output_buffer'):
            del self.output_buffer

In [24]:
encryptor = DESEncryptor()

Using 8x type with input: 128 bits, and output: 64 bits


In [25]:
# Example
message = "AAAAAAA" # 7 bytes, will be padded
key = "BBBBBBBB"     # 8 bytes
encrypted_result = encryptor.encrypt(message, key)
print(f"Encrypted result: {encrypted_result}")

message_long = "AAAAAAAAAAAAAAA" # 15 bytes
encrypted_result_long = encryptor.encrypt(message_long, key)
print(f"Encrypted result (long): {encrypted_result_long}")


Now sending part: b'AAAAAAA\x00', 4141414141414100, with key: b'BBBBBBBB', 4242424242424242
Encrypted result: 5d641f98ad79404e
Now sending part: b'AAAAAAAA', 4141414141414141, with key: b'BBBBBBBB', 4242424242424242
Now sending part: b'AAAAAAA\x00', 4141414141414100, with key: b'BBBBBBBB', 4242424242424242
Encrypted result (long): 57fa2ec1bb39ca185d641f98ad79404e


In [None]:
from des import DesKey


def encrypt_with_pyDes(message, key):
    """
    Encrypts the given message using the pyDes library.

    Args:
        message (str): The message to encrypt.
        key (str): The encryption key (must be 8 bytes).

    Returns:
        bytes: The encrypted message in bytes.
    """
    key_bytes = bytes(key, "ascii")
    des_key = DesKey(key_bytes)
    # Pad message with null bytes to be a multiple of 8
    padding_needed = 8 - (len(message) % 8) if len(message) % 8 != 0 else 0
    message += '\0' * padding_needed
    message_bytes = message.encode('ascii')
    cipher_text = des_key.encrypt(message_bytes, padding=False)
    return cipher_text

In [48]:
# Test Cases
def test_compare_des_implementations():
    """
    Test to compare the output of the PYNQ DES implementation with the pyDes library.
    """
    encryptor = DESEncryptor()  # Create an instance of DESEncryptor

    test_vectors = [
        {"message": "ABCDEFGH", "key": "12345678"},
        {"message": "ABCDEFGHabcdefgh", "key": "12345678"},
        {"message": "ABCDEFG", "key": "12345678"},
        {"message": "", "key": "12345678"},
        {"message": "ABCDEFGHIJKLMNOPQRSTUVWXYZ123456", "key": "12345678"},
    ]

    for test_vector in test_vectors:
        message = test_vector["message"]
        key = test_vector["key"]
        print(f"Message: {message}, Key: {key}")

        # Get results from both implementations
        pynq_result_hex = encryptor.encrypt(message, key)
        pynq_result_bytes = bytes.fromhex(pynq_result_hex)
        print(f"PYNQ Result (bytes): {pynq_result_bytes.hex()}")

        pyDes_result_bytes = encrypt_with_pyDes(message, key)
        print(f"pyDes Result (bytes): {pyDes_result_bytes.hex()}")

        # Compare the results.  Note that the hex representation should be the same.
        assert pynq_result_bytes == pyDes_result_bytes, f"Results do not match for message: {message}, key: {key}"
test_compare_des_implementations()


Using 16x type with input: 128 bits, and output: 64 bits
Message: ABCDEFGH, Key: 12345678
Now sending part(0, 0): b'ABCDEFGH', 4142434445464748, with key: b'12345678', 3132333435363738
PYNQ Result (bytes): 96de603eaed6256f
pyDes Result (bytes): 96de603eaed6256f
Message: ABCDEFGHabcdefgh, Key: 12345678
Now sending part(0, 0): b'ABCDEFGH', 4142434445464748, with key: b'12345678', 3132333435363738
Now sending part(1, 8): b'abcdefgh', 6162636465666768, with key: b'12345678', 3132333435363738
PYNQ Result (bytes): 96de603eaed6256f94d4436bc3b5b693
pyDes Result (bytes): 96de603eaed6256f94d4436bc3b5b693
Message: ABCDEFG, Key: 12345678
Now sending part(0, 0): b'ABCDEFG\x00', 4142434445464700, with key: b'12345678', 3132333435363738
PYNQ Result (bytes): 10565b3a6fd1aac7
pyDes Result (bytes): 10565b3a6fd1aac7
Message: , Key: 12345678
PYNQ Result (bytes): 
pyDes Result (bytes): 
Message: ABCDEFGHIJKLMNOPQRSTUVWXYZ123456, Key: 12345678
Now sending part(0, 0): b'ABCDEFGH', 4142434445464748, with key:

In [59]:
# Performance


def compare_des_timing():
    """
    Compares the execution time of the PYNQ DES implementation with the pyDes library.
    """
    encryptor = DESEncryptor(logging=False)  # Create an instance of DESEncryptor

    test_vectors = [
        {"message": "ABCDEFGH", "key": "12345678"},
        {"message": "ABCDEFGHabcdefgh", "key": "12345678"},
        {"message": "ABCDEFG", "key": "12345678"},
        {"message": "", "key": "12345678"},
        {"message": "ABCDEFGHIJKLMNOPQRSTUVWXYZ123456", "key": "12345678"},
        {"message": "ABCDEFGHIJKLMNOPQRSTUVWXYZ123456ABCDEFGHIJKLMNOPQRSTUVWXYZ123456", "key": "12345678"},
        {"message": "A" * 128, "key": "12345678"},
        {"message": "A" * 128 * 4, "key": "12345678"},
        {"message": "A" * 2 ** 12, "key": "12345678"},
    ]

    num_iterations = 10  # Number of times to repeat the encryption for timing

    print("Timing Comparison:")
    print(f"Number of iterations: {num_iterations}")

    for test_vector in test_vectors:
        message = test_vector["message"]
        key = test_vector["key"]

        # Time PYNQ encryption
        start_time_pynq = time.time()
        for _ in range(num_iterations):
            pynq_result_hex = encryptor.encrypt(message, key)
        end_time_pynq = time.time()
        pynq_time = end_time_pynq - start_time_pynq

        # Time pyDes encryption
        start_time_pydes = time.time()
        for _ in range(num_iterations):
            pyDes_result_bytes = encrypt_with_pyDes(message, key)
        end_time_pydes = time.time()
        pydes_time = end_time_pydes - start_time_pydes

        print(f"\nMessage: {message}, Key: {key}")
        print(f"PYNQ Time: {pynq_time:.6f} seconds")
        print(f"pyDes Time: {pydes_time:.6f} seconds")
        print(f"Speed up of: {pydes_time / pynq_time:.6f}")

        # Compare the results (optional, for functional verification)
        pynq_result_bytes = bytes.fromhex(pynq_result_hex)
        pyDes_result_bytes = encrypt_with_pyDes(message, key)
        assert pynq_result_bytes == pyDes_result_bytes, f"Results do not match for message: {message}, key: {key}"

compare_des_timing()

Using 65536x type with input: 128 bits, and output: 64 bits
Timing Comparison:
Number of iterations: 10


RuntimeError: DMA channel not idle