<a href="https://colab.research.google.com/github/carlos-alves-one/-Crypto-Electronic-Medical-Records/blob/main/crypto_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Goldsmiths University of London
### MSc. Data Science and Artificial Intelligence
### Module: Blockchain Programming
### Author: Carlos Manuel De Oliveira Alves
### Student: cdeol003
### Programming Assignment

## Import Cryptography Libraries

In [43]:
# Import the 'hashes' module from the cryptography library's hazmat primitives, used for cryptographic hashing
from cryptography.hazmat.primitives import hashes

# Import 'Encoding', 'PublicFormat', and 'load_der_public_key' from the serialization module for encoding formats,
# public key formats, and loading DER-encoded public keys, respectively
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, load_der_public_key

# Import the 'ec' (Elliptic Curve cryptography) and 'utils' modules from the asymmetric part of the hazmat primitives
# for performing operations with elliptic curves and various utility functions
from cryptography.hazmat.primitives.asymmetric import ec, utils


# Part 1: Records and Verification

The first phase of this project builds a framework for Electronic Medical Records (EMR) secured with cryptographic techniques. It covers four areas: class design, the application of cryptography, error handling and validation, and testing. The classes are designed so that each EMR record carries everything needed to verify it. Cryptographic primitives from the pyca/cryptography library protect records against tampering and forgery. Error handling and validation detect discrepancies such as invalid signatures or tampered records, and the test cases confirm that verification accepts valid records and rejects altered ones.

## EMR Class

The EMR class represents a single electronic medical record together with the logic for checking its integrity and authenticity. Its constructor stores the record's fields. Its verify method performs hash-length validation, public key validation, prescription validation, nonce verification, transaction ID (txid) recalculation, and digital signature verification, raising an exception as soon as one of these checks fails.

In [44]:
class EMR:

    # Declare the constructor for this class
    def __init__(self, Dr_hash, Patient_hash, Dr_public_key, prescription, nonce, signature, txid):

        # Initialize the EMR instance with doctor's and patient's hashes, doctor's public key,
        # prescription details, nonce, digital signature, and transaction ID
        self.Dr_hash = Dr_hash
        self.Patient_hash = Patient_hash
        self.Dr_public_key = Dr_public_key
        self.prescription = prescription
        self.nonce = nonce
        self.signature = signature
        self.txid = txid

    # Declare function to perform several checks to verify the integrity and authenticity of the EMR data
    def verify(self, Dr_previous_nonce):

        # Check the length of doctor's and patient's hashes
        if len(self.Dr_hash) != 20 or len(self.Patient_hash) != 20:
            raise Exception("Hash is wrong length")

        # Validate the doctor's public key by comparing its hash against the provided doctor's hash
        if calculate_sha1_hash(self.Dr_public_key) != self.Dr_hash:
            raise Exception("Invalid doctor public key")

        # Ensure the prescription text is valid and within the allowed byte size
        if not isinstance(self.prescription, str) or len(self.prescription.encode('utf-8')) > 200:
            raise Exception("Invalid prescription")

        # Check if the nonce is sequentially correct
        if self.nonce != Dr_previous_nonce + 1:
            raise Exception("Invalid nonce")

        # Validate the transaction ID by recalculating it and comparing with the provided value
        expected_txid = calculate_txid(self.Dr_hash, self.Patient_hash, self.Dr_public_key, self.prescription, self.nonce, self.signature)
        if self.txid != expected_txid:
            raise Exception("Invalid txid")

        # Verify the digital signature to ensure the data's integrity and authenticity
        signature_hash = calculate_signature_hash(self.Patient_hash, self.prescription, self.nonce)
        key = load_der_public_key(self.Dr_public_key)
        try:
            key.verify(self.signature, signature_hash, ec.ECDSA(utils.Prehashed(hashes.SHA256())))
        except Exception as error:
            raise Exception("Invalid signature") from error


### Supporting Functions

> The function calculate_sha1_hash computes the SHA-1 hash of the provided data. SHA-1 produces a 160-bit (20-byte) digest, which is why verify requires the doctor's and patient's hashes to be exactly 20 bytes long.

In [45]:
def calculate_sha1_hash(data):
    digest = hashes.Hash(hashes.SHA1())
    digest.update(data)
    return digest.finalize()
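
As a quick check of the 20-byte digest length, the sketch below hashes an arbitrary byte string. It uses the standard library's `hashlib` instead of pyca/cryptography purely to stay self-contained, and `sample_key` is a made-up placeholder rather than a real DER-encoded key.

```python
import hashlib

# Placeholder bytes standing in for a DER-encoded public key (hypothetical value)
sample_key = b"example public key bytes"

# SHA-1 always yields a 160-bit digest, regardless of the input length
key_hash = hashlib.sha1(sample_key).digest()

print(len(key_hash))  # 20
```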


> The function calculate_txid generates a transaction ID using SHA-256 by hashing, in order, the doctor's hash, patient's hash, doctor's public key, prescription text (UTF-8 encoded), nonce (as 8 little-endian bytes), and digital signature.

In [46]:
def calculate_txid(Dr_hash, Patient_hash, Dr_public_key, prescription, nonce, signature):
    digest = hashes.Hash(hashes.SHA256())
    digest.update(Dr_hash)
    digest.update(Patient_hash)
    digest.update(Dr_public_key)
    digest.update(prescription.encode('utf-8'))
    digest.update(nonce.to_bytes(8, byteorder='little', signed=False))
    digest.update(signature)
    return digest.finalize()


> The function calculate_signature_hash produces the hash that the doctor signs and that verify later checks, combining the patient's hash, prescription, and nonce using SHA-256.

In [47]:
def calculate_signature_hash(Patient_hash, prescription, nonce):
    digest = hashes.Hash(hashes.SHA256())
    digest.update(Patient_hash)
    digest.update(prescription.encode('utf-8'))
    digest.update(nonce.to_bytes(8, byteorder='little', signed=False))
    return digest.finalize()
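
The fixed-width nonce encoding is easiest to see with concrete bytes. The sketch below mirrors the field order of calculate_signature_hash using the standard library's `hashlib` (chosen for self-containment; the notebook uses pyca/cryptography). The function name `signature_hash_sketch` and the all-zero patient hash are placeholders for illustration.

```python
import hashlib

def signature_hash_sketch(patient_hash, prescription, nonce):
    # Same field order as calculate_signature_hash:
    # patient hash, UTF-8 prescription, 8-byte little-endian nonce
    data = (
        patient_hash
        + prescription.encode("utf-8")
        + nonce.to_bytes(8, byteorder="little", signed=False)
    )
    return hashlib.sha256(data).digest()

# Placeholder 20-byte patient hash (all zeros)
patient_hash = bytes(20)

# The nonce 1 becomes eight bytes with the least significant byte first
print((1).to_bytes(8, byteorder="little", signed=False).hex())  # 0100000000000000

# Changing only the nonce produces a different 32-byte hash to sign
hash_nonce_1 = signature_hash_sketch(patient_hash, "Medication A: 10mg", 1)
hash_nonce_2 = signature_hash_sketch(patient_hash, "Medication A: 10mg", 2)
print(len(hash_nonce_1), hash_nonce_1 != hash_nonce_2)  # 32 True
```

Because the nonce is part of the signed data, a signature made for one nonce does not verify for another, which is what stops a record from being replayed.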


These components together form a system for managing and verifying electronic medical records, ensuring that they are authentic, have not been tampered with, and come from a verified source. Verification checks the hash lengths, the match between the doctor's public key and the doctor's hash, the prescription size, the correctness of the nonce, the validity of the transaction ID, and the authenticity of the signature using the Elliptic Curve Digital Signature Algorithm (ECDSA) with SHA-256 as the hash function.

## Create Signed Record Function

This function generates a signed EMR from the doctor's private key, the patient's public key hash, the prescription, and a nonce. It derives the doctor's DER-encoded public key and its SHA-1 hash, signs the SHA-256 hash of the patient hash, prescription, and nonce, calculates the transaction ID (txid), and assembles the record.

In [48]:
def create_signed_record(Dr_private_key, patient_hash, prescription, nonce):

    # Convert the doctor's public key to DER format
    Dr_public_key = Dr_private_key.public_key().public_bytes(encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo)

    # Calculate the doctor's public key hash
    Dr_hash = calculate_sha1_hash(Dr_public_key)

    # Prepare the data for signing
    signature_hash = calculate_signature_hash(patient_hash, prescription, nonce)

    # Sign the data
    signature = Dr_private_key.sign(signature_hash, ec.ECDSA(utils.Prehashed(hashes.SHA256())))

    # Calculate the transaction ID
    txid = calculate_txid(Dr_hash, patient_hash, Dr_public_key, prescription, nonce, signature)

    # Create and return the EMR record
    return EMR(Dr_hash, patient_hash, Dr_public_key, prescription, nonce, signature, txid)


## Test Cases

In [49]:
# Import the unittest module for creating and running unit tests in Python
import unittest


This test suite checks the EMR class and the create_signed_record function, focusing on how verification behaves under different conditions. It uses Python's unittest framework. Because the code runs in a notebook rather than from the command line, the tests are loaded into a suite and executed explicitly with a test runner.

In [50]:
class TestEMR(unittest.TestCase):

    # Define a setup method to initialize variables before each test case runs
    def setUp(self):

        # Generate a doctor's private key using the SECP256K1 elliptic curve
        self.Dr_private_key = ec.generate_private_key(ec.SECP256K1())

        # Calculate and store a SHA-1 hash of a simulated patient's public key
        self.patient_hash = calculate_sha1_hash(b"Patient's Public Key")

        # Define a sample prescription text for testing
        self.prescription = "Medication A: 10mg"

        # Initialize a nonce to 1, to be used in the creation of a test EMR record
        self.nonce = 1

    # Define a test case method for verifying a successfully signed record
    def test_record_verification_success(self):

        # Create a signed EMR record using the doctor's private key, patient hash, prescription data, and nonce
        record = create_signed_record(self.Dr_private_key, self.patient_hash, self.prescription, self.nonce)

        # Set the previous nonce value to 0, assuming this is the first transaction record
        Dr_previous_nonce = 0

        # Attempt to verify the record; set result to True if successful, False if an exception occurs
        try:
            record.verify(Dr_previous_nonce)
            result = True
        except Exception:
            result = False

        # Assert that the result is True, indicating the record verification was successful
        self.assertTrue(result)

    # Define a test case to verify that an altered transaction ID causes record verification to fail
    def test_invalid_txid_raises_exception(self):

        # Create a signed EMR record with valid parameters
        record = create_signed_record(self.Dr_private_key, self.patient_hash, self.prescription, self.nonce)

        # Manually set the record's transaction ID (txid) to an invalid value
        record.txid = b"Invalid TXID"

        # Use a context manager to assert that an exception is raised during record verification, indicating failure
        with self.assertRaises(Exception):

            record.verify(0)  # Attempt to verify the record with a modified txid, expecting it to raise an exception

    # Define a test case to ensure that tampering with the signature causes record verification to fail
    def test_invalid_signature_raises_exception(self):

        # Create a new EMR record with a valid signature initially
        record = create_signed_record(self.Dr_private_key, self.patient_hash, self.prescription, self.nonce)

        # Manually change the record's signature to an invalid value to simulate tampering
        record.signature = b"Invalid Signature"

        # Use a context manager to assert that an exception is raised, indicating that the record's verification fails
        with self.assertRaises(Exception):

            record.verify(0)  # Attempt to verify the tampered record, expecting failure due to the invalid signature


In [51]:
# Load all test cases from the TestEMR class into a test suite
suite = unittest.TestLoader().loadTestsFromTestCase(TestEMR)


In [52]:
# Execute the test suite with a higher verbosity level for detailed output
unittest.TextTestRunner(verbosity=2).run(suite)


test_invalid_signature_raises_exception (__main__.TestEMR) ... ok
test_invalid_txid_raises_exception (__main__.TestEMR) ... ok
test_record_verification_success (__main__.TestEMR) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.020s

OK


<unittest.runner.TextTestResult run=3 errors=0 failures=0>
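
The load-and-run pattern used above can be tried in isolation. The sketch below applies it to a throwaway test case. `TestNonceRule` and `next_nonce_is_valid` are hypothetical names used only for illustration, and the runner writes to an in-memory stream so that the example prints a single summary line.

```python
import io
import unittest

def next_nonce_is_valid(nonce, previous_nonce):
    # Hypothetical helper mirroring the sequential-nonce rule in EMR.verify
    return nonce == previous_nonce + 1

class TestNonceRule(unittest.TestCase):

    def test_sequential_nonce_is_accepted(self):
        self.assertTrue(next_nonce_is_valid(1, 0))

    def test_replayed_nonce_is_rejected(self):
        self.assertFalse(next_nonce_is_valid(1, 1))

# Build the suite explicitly and run it without exiting the interpreter
nonce_suite = unittest.TestLoader().loadTestsFromTestCase(TestNonceRule)
nonce_result = unittest.TextTestRunner(stream=io.StringIO(), verbosity=2).run(nonce_suite)

print(nonce_result.testsRun, nonce_result.wasSuccessful())  # 2 True
```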

# Part 2: Blocks

The second phase of this project validates blocks in a blockchain (a distributed ledger).

It implements the classes and methods that check a block's integrity and validity: the block ID must match the hash of the block's contents, the difficulty and proof of work must meet the expected values, and the constraints on the block's size and fields must hold.

As in Part 1, hashes are computed with the SHA-256 algorithm from the pyca/cryptography library.

## UserState Class

The UserState class is straightforward: it records the current state of a single user. Tracking the most recently used nonce for each user is particularly important in blockchain systems, because it prevents replay attacks and ensures that records are processed in order.

The UserState class has a constructor that takes two parameters:

- balance: The balance of the user

- nonce: The most recently used nonce of the user

Both parameters are exposed as fields of the class.

In [53]:
# Declare the class UserState
class UserState:

    # Constructor method that initializes a UserState object with balance and nonce parameters
    def __init__(self, balance, nonce):

        # Assigns the provided balance value of the user
        self.balance = balance

        # Assigns the most recently used nonce of the user
        self.nonce = nonce
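
To illustrate how a stored nonce blocks replays, here is a minimal sketch. `UserStateSketch` is a hypothetical stand-in with the same two fields as UserState, and `accept_nonce` is an illustrative helper, not part of the assignment code.

```python
from dataclasses import dataclass

@dataclass
class UserStateSketch:
    # Hypothetical stand-in for UserState with the same two fields
    balance: int
    nonce: int

def accept_nonce(states, user_hash, nonce):
    # Accept a record only if its nonce is exactly one more than the user's last nonce
    state = states[user_hash]
    if nonce != state.nonce + 1:
        return False
    state.nonce = nonce
    return True

states = {b"doctor": UserStateSketch(balance=0, nonce=0)}

print(accept_nonce(states, b"doctor", 1))  # True: first record
print(accept_nonce(states, b"doctor", 1))  # False: replay of the same nonce
print(accept_nonce(states, b"doctor", 3))  # False: nonce 2 was skipped
```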


## Block Class

The Block class has a constructor that takes the following parameters:

- previous: The block id of the block before this one in the blockchain (zero for the first block)

- height: The number of blocks before this one in the blockchain (0 for the first block)

- miner: The public key hash of the user responsible for mining this block

- records: A list containing the records within this block

- timestamp: An integer representing the number of seconds since 1st January 1970 (Unix Time)

- difficulty: An integer indicating the difficulty of the proof of work needed to mine this block

- block_id: A 32-byte hash of the block

- nonce: An integer between 0 and 2^64 - 1

In [54]:
# Declare the class named Block
class Block:

    # Constructor method that initializes a Block object with various parameters
    def __init__(self, previous, height, miner, records, timestamp, difficulty, block_id, nonce):

        # Assigns the provided previous value to the previous attribute of the Block object, representing the hash of the previous block
        self.previous = previous

        # Assigns the provided height value to the height attribute of the Block object, representing the block's height in the blockchain
        self.height = height

        # Assigns the provided miner value to the miner attribute of the Block object, representing the miner who mined the block
        self.miner = miner

        # Assigns the provided records value to the records attribute of the Block object, representing the transactions or data records included in the block
        self.records = records

        # Assigns the provided timestamp value to the timestamp attribute of the Block object, representing the creation timestamp of the block
        self.timestamp = timestamp

        # Assigns the provided difficulty value to the difficulty attribute of the Block object, representing the mining difficulty of the block
        self.difficulty = difficulty

        # Assigns the provided block_id value to the block_id attribute of the Block object, representing a unique identifier for the block
        self.block_id = block_id

        # Assigns the provided nonce value to the nonce attribute of the Block object, representing the nonce used in the mining process
        self.nonce = nonce

    # Declare the verify_and_get_changes function
    def verify_and_get_changes(self, difficulty, previous_user_states):

      # Check if the block difficulty matches the expected difficulty
      if self.difficulty != difficulty:
          raise Exception("Block difficulty does not match expected difficulty")

      # Calculate the expected block ID
      expected_block_id = self.calculate_block_id()

      # Check if the block ID matches the expected block ID
      if self.block_id != expected_block_id:
          raise Exception("Block ID is incorrect")

      # Check if the number of records in the block exceeds 25
      if len(self.records) > 25:
          raise Exception("Block contains more than 25 records")

      # Check if the miner field is 20 bytes long
      if len(self.miner) != 20:
          raise Exception("Miner field is not 20 bytes long")

      # Calculate the target value based on the difficulty
      target = 2 ** 256 // difficulty

      # Check if the block ID meets the difficulty requirement
      if int.from_bytes(self.block_id, byteorder='big') > target:
          raise Exception("Block ID does not meet the difficulty requirement")

      # Initialize an empty dictionary to store updated user states
      updated_user_states = {}

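      # Iterate over each record in the block
      # Each record must expose sender_hash, recipient_hash, amount, fee and a verify(balance, nonce) method
      # (the EMR class from Part 1 does not provide these)
      for record in self.records:

          # Get the sender's latest state, preferring changes made by earlier records in this block
          sender_state = updated_user_states.get(record.sender_hash)
          if sender_state is None:
              sender_state = previous_user_states[record.sender_hash]

          # Verify the record using the sender's latest state
          record.verify(sender_state.balance, sender_state.nonce)

          # Update the sender's state with new balance and nonce
          updated_user_states[record.sender_hash] = UserState(sender_state.balance - record.amount - record.fee, sender_state.nonce + 1)

          # Get the recipient's latest state, falling back to the state before this block
          recipient_state = updated_user_states.get(record.recipient_hash)
          if recipient_state is None:
              recipient_state = previous_user_states.get(record.recipient_hash)

          # Check if the recipient has no state yet
          if recipient_state is None:

              # Create a new state for the recipient with the received amount and nonce 0
              updated_user_states[record.recipient_hash] = UserState(record.amount, 0)

          else:

              # Add the received amount to the recipient's existing balance, keeping the nonce unchanged
              updated_user_states[record.recipient_hash] = UserState(recipient_state.balance + record.amount, recipient_state.nonce)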

      # Return the updated user states
      return updated_user_states

    # Declare the function to calculate the block ID
    def calculate_block_id(self):

      # Create a new SHA-256 hash object
      digest = hashes.Hash(hashes.SHA256())

      # Update the hash digest with the previous block's hash
      digest.update(self.previous)

      # Update the hash digest with the miner's information
      digest.update(self.miner)

      # Iterate over each record in the block
      for record in self.records:

          # Update the hash digest with the transaction ID of each record
          digest.update(record.txid)

      # Update the hash digest with the timestamp converted to bytes
      digest.update(self.timestamp.to_bytes(8, byteorder='little', signed=False))

      # Update the hash digest with the difficulty converted to bytes
      digest.update(self.difficulty.to_bytes(16, byteorder='little', signed=False))

      # Update the hash digest with the nonce converted to bytes
      digest.update(self.nonce.to_bytes(8, byteorder='little', signed=False))

      # Finalize the hash digest and return the resulting block ID
      return digest.finalize()
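
The byte layout that calculate_block_id hashes can be made explicit by building the input in one piece. The sketch below does this with the standard library's `hashlib` (an assumption made for self-containment; feeding the same bytes to a digest in several updates gives the same SHA-256 result). `block_id_sketch` is an illustrative name, and the argument values are placeholders.

```python
import hashlib

def block_id_sketch(previous, miner, txids, timestamp, difficulty, nonce):
    # Concatenate the fields in the same order calculate_block_id feeds them to the digest
    preimage = (
        previous
        + miner
        + b"".join(txids)
        + timestamp.to_bytes(8, byteorder="little", signed=False)
        + difficulty.to_bytes(16, byteorder="little", signed=False)
        + nonce.to_bytes(8, byteorder="little", signed=False)
    )
    return preimage, hashlib.sha256(preimage).digest()

# A block with no records: 32 + 20 + 0 + 8 + 16 + 8 bytes are hashed
preimage, sketch_block_id = block_id_sketch(bytes(32), bytes(20), [], 1711213303, 100_000, 0)

print(len(preimage), len(sketch_block_id))  # 84 32
```

Since only the nonce changes between mining attempts, each attempt differs from the previous one in the last 8 bytes of this input.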



# Part 3: Mining


## Simple Implementation of Mining

The function mine_block takes the block's fields as arguments and searches for a nonce that produces a block_id meeting the proof of work criterion, returning the resulting block.

In [55]:
# Define the mine_block function with block parameters
def mine_block(previous, height, miner, transactions, timestamp, difficulty):

    # Calculate the target value based on the difficulty (it stays the same for every attempt)
    target = 2 ** 256 // difficulty

    # Initialize the nonce value to 0
    nonce = 0

    # Start an infinite loop to find a valid block
    while True:

        # Create a new Block object with the current block parameters and nonce value
        block = Block(previous, height, miner, transactions, timestamp, difficulty, bytes(32), nonce)

        # Calculate the block ID using the calculate_block_id method of the Block object
        block_id = block.calculate_block_id()

        # Check if the block ID is less than or equal to the target value
        if int.from_bytes(block_id, byteorder='big') <= target:

            # If a valid block is found, return a new Block object with the found block ID and nonce value
            return Block(previous, height, miner, transactions, timestamp, difficulty, block_id, nonce)

        # Increment the nonce value for the next iteration of the loop
        nonce += 1


## Mine a Block

In [56]:
# Import the time function from the time module
from time import time

# Example usage of the mine_block function:

# Set the previous block ID as a 32-byte value (all zeros)
previous_block_id = bytes.fromhex('0000000000000000000000000000000000000000000000000000000000000000')

# Set the block height to 0
height = 0

# Set the miner's address as a 20-byte value
miner_address = bytes.fromhex('0123456789abcdef0123456789abcdef01234567')

# Initialize an empty list for transactions
transactions = []

# Get the current timestamp as an integer
timestamp = int(time())

# Set the mining difficulty
difficulty = 100_000

# Call the mine_block function to mine a new block
start_time = time()
mined_block = mine_block(previous_block_id, height, miner_address, transactions, timestamp, difficulty)
end_time = time()

print("\n-> Mined Block Details..:")
print("\nPrevious block ID.......:", mined_block.previous.hex())
print("Height..................:", mined_block.height)
print("Miner address...........:", mined_block.miner.hex())
print("Number of transactions..:", len(mined_block.records))
print("Timestamp...............:", mined_block.timestamp)
print("Difficulty..............:", mined_block.difficulty)
print("Block ID................:", mined_block.block_id.hex())
print("Nonce...................:", mined_block.nonce)
print(f"Mining time.............: {end_time - start_time:.2f} seconds")



-> Mined Block Details..:

Previous block ID.......: 0000000000000000000000000000000000000000000000000000000000000000
Height..................: 0
Miner address...........: 0123456789abcdef0123456789abcdef01234567
Number of transactions..: 0
Timestamp...............: 1711213303
Difficulty..............: 100000
Block ID................: 000045add63c6a12ba9e386df44c00d4c291d0ac07b3c8a6ff57e30a21232800
Nonce...................: 5687
Mining time.............: 0.11 seconds
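
The proof of work check can be reproduced by hand for the block ID printed above. In the sketch below, `meets_target` is an illustrative helper that applies the same comparison as the mining loop.

```python
def meets_target(block_id, difficulty):
    # A block ID is valid when, read as a big-endian integer,
    # it does not exceed 2**256 // difficulty
    target = 2 ** 256 // difficulty
    return int.from_bytes(block_id, byteorder="big") <= target

# Block ID taken from the mining output above
mined_id = bytes.fromhex("000045add63c6a12ba9e386df44c00d4c291d0ac07b3c8a6ff57e30a21232800")

print(meets_target(mined_id, 100_000))           # True
print(meets_target(bytes([0xFF]) * 32, 100_000))  # False
```

If SHA-256 outputs are treated as uniformly distributed, each attempt passes with a probability of roughly 1/difficulty, so about 100,000 attempts are expected on average at this difficulty. Finding a valid block at nonce 5687 was therefore quicker than average.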


## Optimized Implementation of Mining


The optimized implementation mines in parallel using Python's multiprocessing module, distributing the search for a valid nonce among several processes. The primary function, mine_block, spawns one worker process per available CPU core. Each worker runs `mine_block_worker`, which loops until a block is found, testing nonce values and checking whether the resulting block ID meets the difficulty target. Shared variables coordinate the processes and carry the result back to the parent. Parallel processing aims to speed up mining compared with the single-process approach above.

In [57]:
# Import the multiprocessing module for parallel processing
import multiprocessing

# Import Value and Array classes for shared variables
from multiprocessing import Value, Array

# Import c_bool and c_uint64 data types for shared variables (the nonce can be as large as 2^64 - 1)
from ctypes import c_bool, c_uint64

# Define the worker function for mining a block
# Each worker tests the nonces start, start + step, start + 2 * step, ... so that no two workers repeat the same work
def mine_block_worker(previous, height, miner, transactions, timestamp, difficulty, block_found, nonce, block_id, start, step):

    # Calculate the target value based on the difficulty
    target = 2 ** 256 // difficulty

    # Begin with this worker's first nonce
    candidate_nonce = start

    # Continue mining until a block is found
    while not block_found.value:

        # Create a new Block object with the candidate nonce value
        block = Block(previous, height, miner, transactions, timestamp, difficulty, bytes(32), candidate_nonce)

        # Calculate the block ID candidate
        block_id_candidate = block.calculate_block_id()

        # Check if the block ID candidate meets the target
        if int.from_bytes(block_id_candidate, byteorder='big') <= target:

            # Acquire a lock on the block_found variable
            with block_found.get_lock():

                # Double-check that a block has not been found by another process
                if not block_found.value:

                    # Set block_found to True to indicate a block has been found
                    block_found.value = True

                    # Store the winning nonce together with its block ID so that the two always match
                    nonce.value = candidate_nonce
                    block_id[:] = block_id_candidate

            # Exit the worker function
            return

        # Move on to this worker's next nonce
        candidate_nonce += step

# Define the main function for mining a block
def mine_block(previous, height, miner, transactions, timestamp, difficulty):

    # Create a shared boolean variable to indicate if a block has been found
    block_found = Value(c_bool, False)

    # Create a shared unsigned 64-bit variable to store the winning nonce value
    nonce = Value(c_uint64, 0)

    # Create a shared array to store the found block ID
    block_id = Array('B', 32)

    # Get the number of available CPU cores
    num_processes = multiprocessing.cpu_count()

    # Create a list to store the worker processes
    processes = []

    # Spawn worker processes based on the number of CPU cores
    for index in range(num_processes):

        # Create a worker process that starts at nonce `index` and advances in steps of `num_processes`
        process = multiprocessing.Process(target=mine_block_worker, args=(previous, height, miner, transactions, timestamp, difficulty, block_found, nonce, block_id, index, num_processes))

        # Start the worker process
        process.start()

        # Add the process to the list of processes
        processes.append(process)

    # Wait for all worker processes to complete
    for process in processes:

        # Wait for the process to finish
        process.join()

    # Create and return a new Block object with the found block ID and nonce value
    return Block(previous, height, miner, transactions, timestamp, difficulty, bytes(block_id[:]), nonce.value)
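
One way to keep parallel workers from testing the same nonce twice is to give each worker its own interleaved sequence. The sketch below only illustrates that partitioning and does no hashing. `nonce_stream` is a hypothetical helper, and the worker count of 4 is an arbitrary choice for the example.

```python
from itertools import islice

def nonce_stream(worker_index, num_workers):
    # Worker i tests nonces i, i + n, i + 2n, ... so that no two workers overlap
    nonce = worker_index
    while True:
        yield nonce
        nonce += num_workers

num_workers = 4

# Take the first three nonces each worker would try
first_nonces = [list(islice(nonce_stream(i, num_workers), 3)) for i in range(num_workers)]

print(first_nonces)  # [[0, 4, 8], [1, 5, 9], [2, 6, 10], [3, 7, 11]]
```

Taken together, the streams cover every nonce exactly once, so adding workers divides the search space instead of duplicating it.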
