### Libraries

In [None]:
！ pip install -r requirements.txt

In [6]:
# add all used libraries here
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
import hashlib
import json
import time
import random

### 4.1 Transaction Generation

In [7]:
#create a blockchain account
def create_account():
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    public_key = private_key.public_key()
    #generate private key and public key with RAS

    #serialize private key and public key to PEM format
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption()
    )

    public_pem = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )

    return private_pem, public_pem

#generate single-input single-output (SISO) transaction
def create_transaction(sender_private_key_pem, sender_public_key_pem, receiver_public_key_pem, amount):
    sender_private_key = serialization.load_pem_private_key(sender_private_key_pem, password=None)
    #load sender's private key

    #transaction data
    data = {
        'amount': amount,
        'sender': sender_public_key_pem.decode('utf-8'),
        'receiver': receiver_public_key_pem.decode('utf-8')
    }

    #calculate the hash of transaction data as transaction ID
    transaction_id = hashlib.sha256(json.dumps(data).encode('utf-8')).hexdigest()

    #sign
    signature = sender_private_key.sign(
        json.dumps(data).encode('utf-8'),
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )

    #finish create
    transaction = {
        'transaction_id': transaction_id,
        'data': data,
        'signature': signature.hex()
    }

    return transaction

### 4.2 Verifiable Merkle Tree

In [8]:
def sha256_hash(data):
    return hashlib.sha256(data.encode('utf-8')).hexdigest()

def build_merkle_tree(transactions):
    if len(transactions) == 1:
        return transactions[0]

    new_level = []
    for i in range(0, len(transactions) - 1, 2):
        # Use transaction_id for hashing
        combined = transactions[i]['transaction_id'] + transactions[i + 1]['transaction_id']
        new_hash = sha256_hash(combined)
        new_level.append({'transaction_id': new_hash})

    if len(transactions) % 2 != 0:
        new_level.append({'transaction_id': transactions[-1]['transaction_id']})

    return build_merkle_tree(new_level)

def get_merkle_root(transactions):
    if len(transactions) == 0:
        return ""
    return build_merkle_tree(transactions)['transaction_id']

def verify_merkle_proof(transaction, proof, merkle_root):
    current_hash = transaction['transaction_id']
    for p in proof:
        if p['position'] == 'left':
            current_hash = sha256_hash(p['hash'] + current_hash)
        else:
            current_hash = sha256_hash(current_hash + p['hash'])
    return current_hash == merkle_root

### 4.3 Construction of Blockchain

In [9]:
def create_block(previous_hash=None, transactions=[], target=0, nonce=0):

    if previous_hash is None: # If there is no previous hash
        previous_hash = '0' * 64

    merkle_root = '0' * 64 # If there are no trasactions in the current block

    if len(transactions): # If there are trasactions in the current block
        merkle_root = get_merkle_root(transactions)

    timestamp = hex(int(time.time())).replace('0x', '').replace('-', '').zfill(8)

    target = hex(target).replace('0x', '').replace('-', '').zfill(8)

    nonce = hex(nonce).replace('0x', '').replace('-', '').zfill(8)

    header = {
        'previous_hash': previous_hash, # 32 bytes
        'merkle_root': merkle_root, # 32 bytes
        'timestamp': timestamp, # 4 bytes
        'target': target, # 4 bytes
        'nonce': nonce # 4 bytes
    }

    number_of_transactions = hex(len(transactions)).replace('0x', '').replace('-', '').zfill(2)

    block = {
        'header': header,
        'number_of_transactions': number_of_transactions, # 1 byte
        'transactions': transactions
    }

    block['header']['current_hash'] = sha256_hash(str(header))

    return block

In [10]:
def print_block(block):

    print("Block Header:\n")
    for k, v in block['header'].items():
        print(f"{k}: {v}")
    print()

    print("\nNumber of Transactions:\n")
    print(f"number_of_transactions: {block['number_of_transactions']}")
    print()

    print("\nTransactions in the Block:\n")
    for transaction in block['transactions']:
        for k, v in transaction.items():
            print(f"{k}: {v}")
        print()

### 4.4 Mining a block and 4.5 Integrity Verification

In [11]:
class BlockChain:

    def __init__(self):
        self.chain = []
        self.target = 5

    def mine_block(self, transactions):
        nonce = 0
        start = time.time()
        block = create_block(self.chain[-1]['header']['current_hash'] if self.chain else None, transactions=transactions, target=self.target, nonce=nonce)
        while block['header']['current_hash'][0:self.target] != '0' * self.target:
            nonce += 1
            block = create_block(self.chain[-1]['header']['current_hash'] if self.chain else None, transactions=transactions, target=self.target, nonce=nonce)
        used_time = time.time() - start
        return block, used_time

    def change_target(self, time):
      # arrage target difficulties according to the time used in the mining block
        if time >= 15:
            self.target = random.randint(1, self.target)
        elif time < 5:
            self.target = random.randint(self.target, 5)

    def add_block2chain(self, transactions, dynamic=False):
        mined_block, time = self.mine_block(transactions)
        self.chain.append(mined_block)
        if dynamic:
          self.change_target(time)

    def print_chain(self):
        print(f'There are {len(self.chain)} blocks in the chain. ')
        print(f'Here is the details : ')
        for i in range (len(self.chain)):
            print(f'Block {i}')
            print_block(self.chain[i])

### 4.5 Integrity Verification ###

    def is_chain_valid(self):
        for i in range(1, len(self.chain)):
            current_block = self.chain[i]
            prev_block = self.chain[i - 1]

            # Verify block hash link
            if current_block['header']['previous_hash'] != prev_block['header']['current_hash']:
                print(f"Block {i} has invalid previous hash linkage")
                return False

            # Verify the current hash
            raw_header = {
                'previous_hash': current_block['header']['previous_hash'],
                'merkle_root': current_block['header']['merkle_root'],
                'timestamp': current_block['header']['timestamp'],
                'target': current_block['header']['target'],
                'nonce': current_block['header']['nonce']
            }
            computed_hash = sha256_hash(str(raw_header))
            if computed_hash != current_block['header']['current_hash']:
                print(f"Block {i} has invalid current hash")
                return False
            target = int(current_block['header']['target'], base=16)
            if target and current_block['header']['current_hash'][0:target] != '0' * target:
                print(f"Block {i} has invalid current hash")
                return False

            # Verify Merkle tree root
            current_merkle = get_merkle_root(current_block['transactions'])
            if current_merkle != current_block['header']['merkle_root']:
                print(f"Block {i} has invalid Merkle root")
                return False

            # Verify the number of transactions
            if len(current_block['transactions']) != int(current_block['number_of_transactions'], base=16):
                print(f"Block {i} has invalid number of transactions")
                return False

            # Verify transaction signature
            for tx in current_block['transactions']:
                try:
                    pub_key = serialization.load_pem_public_key(
                        tx['data']['sender'].encode('utf-8')
                    )

                    # Construct signature (must be exactly the same as when signing)
                    data_str = json.dumps(tx['data']).encode('utf-8')  # Maintain consistency in sorting and encoding
                    signature = bytes.fromhex(tx['signature'])

                    pub_key.verify(
                        signature,
                        data_str,
                        padding.PSS(
                            mgf=padding.MGF1(hashes.SHA256()),
                            salt_length=padding.PSS.MAX_LENGTH
                        ),
                        hashes.SHA256()
                    )
                except Exception as e:
                    print(f"Transaction {tx['transaction_id']} verification failed: {str(e)}")
                    return False
        return True


### 4.5.1 Integrity Verification


In [12]:
# test codes

if __name__ == "__main__":
    # Initialize blockchain
    bc = BlockChain()

    # ================ Generate 3 different blocks ================
    for _ in range(3):
        # Generate new transaction every time
        new_transactions = []
        for __ in range(random.randint(1,4)):  # Random 2 - 4 transactions
            sk, pk = create_account()
            rk, rp = create_account()
            tx = create_transaction(sk, pk, rp, random.randint(1,100))  # Random amount
            new_transactions.append(tx)
        bc.add_block2chain(new_transactions)  # Add new block

    # ================ Test 1: Original chain verification ================
    print("=== Test 1 - Original chain verification ===")
    print("The validity of blockchain:", bc.is_chain_valid())  # Should return True

    # ================ Test 2: Hash value tampering test ================
    print("\n=== Test 2 - Current Hash tampering test ===")
    original_hash = bc.chain[1]['header']['current_hash']
    bc.chain[1]['header']['current_hash'] = "0000fake_hash"  # tampering hash
    print("Validity after tampering:", bc.is_chain_valid())  # Should print "invalid current hash"
    bc.chain[1]['header']['current_hash'] = original_hash  # Recover the current hash

    # ================ Test 3: Merkle tree root tampering test ================
    print("\n=== Test 3 - Merkle tree root tampering test ===")
    target_block_idx = 1
    # Verify if there are transactions in the target block
    assert len(bc.chain[target_block_idx]['transactions']) > 0, "Must select a block with transactions"
    original_merkle = bc.chain[target_block_idx]['header']['merkle_root']
    bc.chain[target_block_idx]['header']['merkle_root'] = "f"*64
    print("Validity after tampering:", bc.is_chain_valid())  # Should print "invalid Merkle root"
    bc.chain[target_block_idx]['header']['merkle_root'] = original_merkle

    # ================ Test 4: Transaction Signature Tampering Test ================
    print("\n=== Test 4 - Transaction Signature Tampering Test ===")
    tampered_block = bc.chain[2]
    tampered_tx = tampered_block['transactions'][0]
    original_amount = tampered_tx['data']['amount']
    tampered_tx['data']['amount'] = 999  # Modifying the amount and damaging the signature
    print("Validity after tampering:", bc.is_chain_valid())  # Should print "Verification failed"
    tampered_tx['data']['amount'] = original_amount  # Recover the amount

    # ================ Test 5: Recovery verification ================
    print("\n=== Test 5 - Recovery verification ===")
    print("Validity after recovery:", bc.is_chain_valid())  # Should return True

=== Test 1 - Original chain verification ===
The validity of blockchain: True

=== Test 2 - Current Hash tampering test ===
Block 1 has invalid current hash
Validity after tampering: False

=== Test 3 - Merkle tree root tampering test ===
Block 1 has invalid current hash
Validity after tampering: False

=== Test 4 - Transaction Signature Tampering Test ===
Transaction 397e9710a152e4b20a22d12aaa634a0901b80216bed8f1815162cdef95d07ada verification failed: 
Validity after tampering: False

=== Test 5 - Recovery verification ===
Validity after recovery: True


###4.5.2 System Verification