# MSc Data Science and Artificial Intelligence
# DSM070 Blockchain Programming Coursework
# Zimcoin 3: The Blockchain

## Program: 50% - Essay: 50%

In [2]:
# import libraries
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import utils, ec
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding

import random
import unittest
from copy import deepcopy
from typing import Dict, List
from time import time

# 1. Transactions CW2

In [3]:
def hashSHA1(obj: bytes):
    """Return the raw SHA-1 digest of ``obj``.

    Elsewhere in this file the result is used as an address: the SHA-1 hash
    of a DER-encoded public key.
    """
    sha1 = hashes.Hash(hashes.SHA1())
    sha1.update(obj)
    return sha1.finalize()

In [4]:
# 1.1 constructor
class Transaction:
    """A signed Zimcoin transfer from one address to another.

    The recipient is credited ``amount - fee``; the fee is therefore paid out
    of ``amount``. ``signature`` covers the recipient, amount, fee and nonce,
    and ``txid`` is the SHA-256 hash of every field including the signature.
    """

    def __init__(self, sender_hash: bytes, recipient_hash: bytes, sender_public_key: bytes, amount: int, fee: int, nonce: int, signature: bytes, txid: bytes):
        # Parties to the transfer and the key used to check the signature.
        self.sender_hash = sender_hash
        self.recipient_hash = recipient_hash
        self.sender_public_key = sender_public_key
        # Value moved, the part of it kept as a fee, and the sender's counter.
        self.amount = amount
        self.fee = fee
        self.nonce = nonce
        # Proof of authorisation and the transaction's identifier.
        self.signature = signature
        self.txid = txid
        # Flipped to True by verify() once every check has passed.
        self.__verified: bool = False

    @staticmethod
    def _uint64_le(value):
        """Encode ``value`` as 8 little-endian unsigned bytes."""
        return value.to_bytes(8, byteorder='little', signed=False)

    # 1.2 verify
    def verify(self, sender_balance: int, sender_previous_nonce: int):
        """Check this transaction against the sender's current account state.

        Args:
            sender_balance: balance of the sender before this transaction.
            sender_previous_nonce: nonce of the sender's previous transaction.

        Returns:
            True when every check passes.

        Raises:
            Exception: with a message naming the first failed check.
            Any error raised by the cryptography library while loading the
            public key or checking the signature propagates unchanged.
        """
        # Both addresses must be 20 bytes long.
        length_checks = (
            (self.sender_hash, 'Sender hash is not 20 bytes long'),
            (self.recipient_hash, 'Recipient hash is not 20 bytes long'),
        )
        for address, message in length_checks:
            if len(address) != 20:
                raise Exception(message)

        # The sender address must be derived from the supplied public key.
        expected_sender_hash = hashSHA1(self.sender_public_key)
        if self.sender_hash != expected_sender_hash:
            raise Exception("The sender hash is not equal to 'SHA1' hash of sender_public_key")

        # Amount: at least 1, affordable, and a whole number.
        if self.amount < 1:
            raise Exception('Amount is small')
        if self.amount > sender_balance:
            raise Exception('Balance too small')
        if not float(self.amount).is_integer():
            raise Exception('The amount is not a whole number')

        # Fee: non-negative, no larger than the amount, and a whole number.
        if self.fee < 0:
            raise Exception('Fee is small')
        if self.fee > self.amount:
            raise Exception('The fee is invalid')
        if not float(self.fee).is_integer():
            raise Exception('The fee is not a whole number')

        # The nonce must follow directly after the sender's previous one.
        expected_nonce = sender_previous_nonce + 1
        if self.nonce != expected_nonce:
            raise Exception("Invalid nonce")

        # The stored id must match the hash of the transaction contents.
        if self.txid != self.calculate_txid():
            raise Exception('Incorrect txid')

        # Signature check: the library call raises if the signature is invalid.
        public_key = serialization.load_der_public_key(self.sender_public_key)
        public_key.verify(
            self.signature,
            self.calculate_signature_hash(),
            ec.ECDSA(utils.Prehashed(hashes.SHA256())),
        )

        self.__verified = True
        return True

    # 1.2.1 calculation of transaction ID
    def calculate_txid(self):
        """Return the SHA-256 hash of all fields, signature included."""
        hasher = hashes.Hash(hashes.SHA256(), default_backend())
        for raw_field in (self.sender_hash, self.recipient_hash, self.sender_public_key):
            hasher.update(raw_field)
        for number in (self.amount, self.fee, self.nonce):
            hasher.update(self._uint64_le(number))
        hasher.update(self.signature)
        return hasher.finalize()

    # 1.2.2 hash that the signature is made over
    def calculate_signature_hash(self):
        """Return the SHA-256 hash of recipient, amount, fee and nonce."""
        hasher = hashes.Hash(hashes.SHA256(), default_backend())
        hasher.update(self.recipient_hash)
        for number in (self.amount, self.fee, self.nonce):
            hasher.update(self._uint64_le(number))
        return hasher.finalize()
    
       
# 2. The create_signed_transaction function
def create_signed_transaction(sender_private_key: ec.EllipticCurvePrivateKey, recipient_hash: bytes, amount: int, fee: int, nonce: int):
    """Build a transaction from the key's owner to ``recipient_hash`` and sign it.

    The sender address is derived from the private key's public half. The
    returned Transaction has both ``signature`` and ``txid`` filled in.

    Raises:
        Exception: if ``amount`` is below 1 or not whole, or ``fee`` is
            negative or not whole.
    """
    # Validate the amount before touching any key material.
    if amount < 1:
        raise Exception('Amount is small')
    if not float(amount).is_integer():
        raise Exception('The amount is not a whole number')

    # Validate the fee.
    if fee < 0:
        raise Exception('Fee is small')
    if not float(fee).is_integer():
        raise Exception('The fee is not a whole number')

    # DER-encode the public key; the sender address is its SHA-1 hash.
    public_key_der = sender_private_key.public_key().public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    sender_address = hashSHA1(public_key_der)

    # Signature and id depend on the other fields, so they start out empty.
    transaction = Transaction(
        sender_hash=sender_address,
        recipient_hash=recipient_hash,
        sender_public_key=public_key_der,
        amount=amount,
        fee=fee,
        nonce=nonce,
        signature=None,
        txid=None,
    )

    # Sign the pre-hashed payload, then derive the id (which covers the signature).
    transaction.signature = sender_private_key.sign(
        transaction.calculate_signature_hash(),
        ec.ECDSA(utils.Prehashed(hashes.SHA256())),
    )
    transaction.txid = transaction.calculate_txid()

    return transaction

# 2. Blocks CW3

In [5]:
class UserState():
    """Mutable account record for one address: its balance and latest nonce."""

    def __init__(self, balance, nonce):
        self.nonce, self.balance = nonce, balance

# Coins credited to a block's miner in addition to the transaction fees.
mining_reward = 10000


class Block:
    """One block of the chain: a list of transactions plus proof-of-work data.

    ``block_id`` is the SHA-256 hash of the previous block id, the miner
    address, every transaction id, the timestamp, the difficulty and the nonce.
    ``height`` is stored on the block but is not part of that hash.
    """

    def __init__(self, previous: bytes, height: int, miner: bytes, transactions: List[Transaction], timestamp: int, difficulty: int, block_id: bytes, nonce: int):
        # Position in the chain.
        self.previous = previous
        self.height = height
        # Payload and the address that receives the reward.
        self.miner = miner
        self.transactions = transactions
        # Proof-of-work inputs and result.
        self.timestamp = timestamp
        self.difficulty = difficulty
        self.nonce = nonce
        self.block_id = block_id

    @staticmethod
    def _open_missing_account(states, transaction):
        """Create an empty account for one party of ``transaction`` if needed.

        At most one account is created per call: the sender's when it is
        missing, otherwise the recipient's when that one is missing.
        """
        if transaction.sender_hash not in states:
            states[transaction.sender_hash] = UserState(0, -1)
        elif transaction.recipient_hash not in states:
            states[transaction.recipient_hash] = UserState(0, -1)

    def verify_and_get_changes(self, difficulty: int, previous_user_states: Dict[str, UserState]):
        """Validate the block and return the user states that result from it.

        Args:
            difficulty: the difficulty this block is required to have.
            previous_user_states: account states before this block; the
                mapping is deep-copied and never modified.

        Returns:
            A new mapping with every transaction applied and the miner
            credited with the fees plus ``mining_reward``.

        Raises:
            Exception: on invalid proof of work, wrong difficulty, wrong block
                id, a miner address that is not 20 bytes, more than 25
                transactions, or any failing transaction.
        """
        self.verify_PoW()

        if self.difficulty != difficulty:
            raise Exception('Incorrect difficulty')

        self.block_id_checker()

        if len(self.miner) != 20:
            raise Exception('Miner length is incorrect')

        if len(self.transactions) > 25:
            raise Exception('transactions length is incorrect')

        states = deepcopy(previous_user_states)

        collected_fees = 0
        for tx in self.transactions:
            self._open_missing_account(states, tx)

            # Each transaction is checked against the state left by the ones before it.
            sender = states[tx.sender_hash]
            tx.verify(sender.balance, sender.nonce)

            # The sender pays the full amount; the recipient gets it minus the fee.
            sender.balance = sender.balance - tx.amount
            recipient = states[tx.recipient_hash]
            recipient.balance = recipient.balance + tx.amount - tx.fee

            sender.nonce = sender.nonce + 1

            collected_fees = collected_fees + tx.fee

        # Pay the miner, opening an account first if this is a new address.
        if self.miner not in states:
            states[self.miner] = UserState(0, -1)
        miner_account = states[self.miner]
        miner_account.balance = miner_account.balance + collected_fees + mining_reward

        return states

    def verify_PoW(self):
        """Raise unless the block id, read as a big-endian integer, is at most 2**256 // difficulty."""
        threshold = 2 ** 256 // self.difficulty
        if int.from_bytes(self.block_id, byteorder='big', signed=False) > threshold:
            raise Exception('Invalid proof of work')

    def calculate_block_id(self):
        """Return the SHA-256 hash that this block's id is expected to equal."""
        hasher = hashes.Hash(hashes.SHA256(), default_backend())
        hasher.update(self.previous)
        hasher.update(self.miner)

        for tx in self.transactions:
            hasher.update(tx.txid)

        # Timestamp and nonce take 8 bytes each, the difficulty 16; all little-endian.
        for value, width in ((self.timestamp, 8), (self.difficulty, 16), (self.nonce, 8)):
            hasher.update(value.to_bytes(width, byteorder='little', signed=False))

        return hasher.finalize()

    def block_id_checker(self):
        """Raise if the stored block id differs from the recomputed one."""
        expected_id = self.calculate_block_id()
        if self.block_id != expected_id:
            raise Exception('Block id does not match')

    def get_changes_for_undo(self, user_states_after: Dict[str, UserState]):
        """Return the user states as they were before this block was applied.

        ``user_states_after`` is deep-copied and never modified. Every
        transaction is reversed and the miner's fees and reward are taken back.
        No verification is performed.
        """
        states = deepcopy(user_states_after)

        collected_fees = 0
        for tx in self.transactions:
            self._open_missing_account(states, tx)

            # Give the amount back to the sender and take the net credit from the recipient.
            sender = states[tx.sender_hash]
            sender.balance = sender.balance + tx.amount
            recipient = states[tx.recipient_hash]
            recipient.balance = recipient.balance - tx.amount + tx.fee

            sender.nonce = sender.nonce - 1

            collected_fees = collected_fees + tx.fee

        # Take back what the miner earned from this block.
        miner_account = states[self.miner]
        miner_account.balance = miner_account.balance - (collected_fees + mining_reward)

        return states

In [6]:
# mining the block
def mine_block(previous, height, miner, transactions, timestamp, difficulty, cutofftime):
    """Search for a nonce that gives the block a valid proof of work.

    The block id is the SHA-256 hash of ``previous``, ``miner``, every
    transaction id, ``timestamp`` (8 bytes), ``difficulty`` (16 bytes) and the
    nonce (8 bytes), integers little-endian. It is valid when, read as a
    big-endian integer, it is at most ``2**256 // difficulty``. ``height`` is
    stored on the block but is not hashed.

    Args:
        previous: id of the block this one extends.
        height: position of the new block in the chain.
        miner: address credited with the reward.
        transactions: transactions to include; each must expose ``txid``.
        timestamp: block timestamp.
        difficulty: required difficulty.
        cutofftime: ``time()`` value after which no new nonce is tried.

    Returns:
        A Block whose id satisfies the target.

    Raises:
        Exception: if ``cutofftime`` is reached before a valid nonce is found.
            Returning a block in that case would hand the caller an id that
            does not satisfy the target.
    """
    # Hash everything except the nonce once; every attempt extends a copy of this state.
    prefix = hashes.Hash(hashes.SHA256(), default_backend())
    prefix.update(previous)
    prefix.update(miner)

    for transaction in transactions:
        prefix.update(transaction.txid)

    prefix.update(timestamp.to_bytes(8, byteorder='little', signed=False))
    prefix.update(difficulty.to_bytes(16, byteorder='little', signed=False))

    target = 2 ** 256 // difficulty

    # The first nonce tried is 1.
    nonce = 0
    while time() < cutofftime:
        nonce += 1
        attempt = prefix.copy()
        attempt.update(nonce.to_bytes(8, byteorder='little', signed=False))
        block_id = attempt.finalize()

        if int.from_bytes(block_id, byteorder='big') <= target:
            return Block(previous=previous, height=height, miner=miner, transactions=transactions,
                         timestamp=timestamp, difficulty=difficulty, block_id=block_id, nonce=nonce)

    raise Exception('Mining timed out before a valid nonce was found')

# 3. BlockchainState CW4 

In [10]:
# Number of most recent blocks whose difficulty and timing set the next difficulty.
difficulty_period = 10


class BlockchainState():
    """The longest known chain together with the account states it produces.

    Attributes:
        longest_chain: blocks in order, index equal to block height.
        user_states: account state per address after the last block.
        total_difficulty: sum of the difficulties of all blocks in the chain.
    """

    def __init__(self, longest_chain: List[Block], user_states: Dict[str, UserState], total_difficulty: int):
        self.longest_chain = longest_chain
        self.user_states = user_states
        self.total_difficulty = total_difficulty

    def calculate_difficulty(self):
        """Return the difficulty required of the next block.

        While the chain has at most ``difficulty_period`` blocks the result is
        1000. After that it is the summed difficulty of the last
        ``difficulty_period`` blocks, floor-divided by the time they span
        (at least 1) and multiplied by 120.
        """
        if len(self.longest_chain) <= difficulty_period:
            return 1000

        # The window and its reference block both follow difficulty_period, so
        # changing the constant keeps the guard above and these indices consistent.
        recent_blocks = self.longest_chain[-difficulty_period:]
        total_difficulty_for_period = sum(block.difficulty for block in recent_blocks)

        period_start = self.longest_chain[-(difficulty_period + 1)]
        # Clamp to 1 so identical timestamps cannot cause a division by zero.
        time_for_period = max(self.longest_chain[-1].timestamp - period_start.timestamp, 1)

        return (total_difficulty_for_period // time_for_period) * 120

    def verify_and_apply_block(self, block):
        """Validate ``block`` as the next block and append it to the chain.

        The state is modified only after every check has passed.

        Raises:
            Exception: if the height, previous block id or timestamp is wrong,
                or if the block itself fails verification.
            AssertionError: if the first block's ``previous`` is not 32 zero bytes.
        """
        if block.height != len(self.longest_chain):
            raise Exception('Height is incorrect')

        if not self.longest_chain:
            # The first block must point at an all-zero previous id. This is
            # raised explicitly because an `assert` statement is removed under
            # `python -O`, which would let an invalid first block through.
            if block.previous != bytes(32):
                raise AssertionError("previous block id")
        else:
            tip = self.longest_chain[-1]
            if block.previous != tip.block_id:
                raise Exception("previous block id")

            # A block may not be older than the block it extends.
            if block.timestamp < tip.timestamp:
                raise Exception("Timing is incorrect")

        new_states = block.verify_and_get_changes(self.calculate_difficulty(), self.user_states)

        # Apply the changes.
        self.longest_chain.append(block)
        self.total_difficulty = self.total_difficulty + block.difficulty
        self.user_states.update(new_states)

    def undo_last_block(self):
        """Remove the last block and restore the account states from before it.

        The undo is computed before anything is modified, so a failure while
        computing it leaves the chain, difficulty and states unchanged.

        Raises:
            IndexError: if the chain is empty.
        """
        last_block = self.longest_chain[-1]
        restored_states = last_block.get_changes_for_undo(self.user_states)

        self.longest_chain.pop()
        self.total_difficulty = self.total_difficulty - last_block.difficulty
        self.user_states.update(restored_states)

        
def verify_reorg(old_state: BlockchainState, new_branch: List[Block]):
    """Check whether ``new_branch`` should replace the tail of the current chain.

    Works on a copy of ``old_state``: blocks at or above the height of the
    first new block are undone, then the new blocks are verified and applied
    in order. ``old_state`` is never modified.

    Args:
        old_state: the current blockchain state.
        new_branch: consecutive blocks, lowest height first.

    Returns:
        The new BlockchainState with the branch applied.

    Raises:
        Exception: if ``new_branch`` is empty, if any block fails
            verification, or if the resulting total difficulty is not
            strictly greater than that of ``old_state``.
    """
    if not new_branch:
        raise Exception("New branch is empty")

    # Copy the old state so a rejected branch leaves it untouched.
    new_state = BlockchainState(deepcopy(old_state.longest_chain), deepcopy(old_state.user_states), old_state.total_difficulty)

    fork_height = new_branch[0].height

    # Stop when the chain is empty: a branch starting at height 0 replaces
    # every block, and indexing [-1] on an empty list would raise IndexError.
    while new_state.longest_chain and new_state.longest_chain[-1].height >= fork_height:
        new_state.undo_last_block()

    for block in new_branch:
        new_state.verify_and_apply_block(block)

    if new_state.total_difficulty <= old_state.total_difficulty:
        raise Exception("Incorrect total difficulty")

    return new_state

# 3. BlockchainState Test 

In [8]:
def calculate_sha1_hash(public_key):
    """Return the SHA-1 digest of ``public_key``; the tests use it as an address."""
    hasher = hashes.Hash(hashes.SHA1())
    hasher.update(public_key)
    address = hasher.finalize()
    return address


def private_key_to_public_key(private_key):
    """Return the public half of ``private_key`` as DER SubjectPublicKeyInfo bytes."""
    public_key = private_key.public_key()
    return public_key.public_bytes(
        encoding=Encoding.DER,
        format=PublicFormat.SubjectPublicKeyInfo,
    )


# Fresh key pairs and their derived addresses, shared by the tests below.
# The curve is passed as an instance: that is the documented argument of
# generate_private_key, whereas passing the bare class relies on leniency
# that the cryptography library does not guarantee.
ALICE_KEY = ec.generate_private_key(ec.SECP256K1())
ALICE_ADDRESS = calculate_sha1_hash(private_key_to_public_key(ALICE_KEY))

BOB_KEY = ec.generate_private_key(ec.SECP256K1())
BOB_ADDRESS = calculate_sha1_hash(private_key_to_public_key(BOB_KEY))


# End-to-end tests for BlockchainState and verify_reorg. Every block is really
# mined with mine_block. Comments are used instead of docstrings so that the
# verbose unittest output keeps showing the plain test names.
class BlockchainStateTest(unittest.TestCase):
    # A block whose difficulty differs from the one calculated from the chain
    # must be rejected.
    def test_difficulty_calculation(self):
        state = BlockchainState([], dict(), 0)
        previous = bytes([0] * 32)
        # (timestamp, difficulty) for heights 0..12; each must be accepted.
        for height, (timestamp, difficulty) in enumerate([
            (0, 1000), (34, 1000), (60, 1000), (60, 1000), (100, 1000), (500, 1000), (600, 1000), (800, 1000),
            (805, 1000), (805, 1000), (900, 1000), (1500, 1320), (1600, 840)]):
            block = mine_block(previous, height, ALICE_ADDRESS, [], timestamp, difficulty, time() + 100)
            state.verify_and_apply_block(block)
            previous = block.block_id

        # Height 13 mined with difficulty 840 is expected to be refused.
        block = mine_block(previous, 13, ALICE_ADDRESS, [], 1600, 840, time() + 100)
        with self.assertRaisesRegex(Exception, "Incorrect difficulty"):
            state.verify_and_apply_block(block)

    # Undoing blocks must restore balances, nonces and total difficulty.
    def test_undo(self):
        state = BlockchainState([], dict(), 0)
        previous = bytes([0] * 32)
        total_difficulty = 0
        # Heights 0..14 are empty blocks mined by Alice (15 mining rewards).
        for height in range(15):
            block = mine_block(previous, height, ALICE_ADDRESS, [], 120 * height, state.calculate_difficulty(),
                               time() + 100)
            state.verify_and_apply_block(block)
            total_difficulty += block.difficulty
            previous = block.block_id

        # Height 15: Alice sends 3000 (fee 25) to Bob, who also mines the block.
        transactions = [create_signed_transaction(ALICE_KEY, BOB_ADDRESS, 3000, 25, 0)]
        block = mine_block(previous, 15, BOB_ADDRESS, transactions, 120 * 15, state.calculate_difficulty(),
                           time() + 150)
        state.verify_and_apply_block(block)
        total_difficulty += block.difficulty
        previous = block.block_id

        # Heights 16 and 17 are undone later, so their difficulty is
        # deliberately left out of total_difficulty.
        transactions = [
            create_signed_transaction(BOB_KEY, ALICE_ADDRESS, 1000, 50, 0),
            create_signed_transaction(ALICE_KEY, BOB_ADDRESS, 100, 50, 1)]
        block = mine_block(previous, 16, BOB_ADDRESS, transactions, 120 * 16, state.calculate_difficulty(),
                           time() + 150)
        state.verify_and_apply_block(block)
        previous = block.block_id

        block = mine_block(previous, 17, BOB_ADDRESS, [], 120 * 17, state.calculate_difficulty(), time() + 150)
        state.verify_and_apply_block(block)
        previous = block.block_id

        # State after all 18 blocks.
        assert len(state.longest_chain) == 18
        assert state.user_states[ALICE_ADDRESS].balance == 147_850
        assert state.user_states[ALICE_ADDRESS].nonce == 1
        assert state.user_states[BOB_ADDRESS].balance == 32_150
        assert state.user_states[BOB_ADDRESS].nonce == 0

        state.undo_last_block()
        state.undo_last_block()

        # Back to the state right after height 15.
        assert len(state.longest_chain) == 16
        assert state.user_states[ALICE_ADDRESS].balance == 147_000
        assert state.user_states[ALICE_ADDRESS].nonce == 0
        assert state.user_states[BOB_ADDRESS].balance == 13_000
        assert state.user_states[BOB_ADDRESS].nonce == -1
        assert state.total_difficulty == total_difficulty

    # The `previous` field must be 32 zero bytes for the first block and the
    # id of the chain tip for every later block.
    def test_previous_validation(self):
        state = BlockchainState([], dict(), 0)
        block = mine_block(bytes([1] * 32), 0, ALICE_ADDRESS, [], 0, 1000, time() + 150)

        with self.assertRaisesRegex(Exception, "previous block id"):
            state.verify_and_apply_block(block)

        block = mine_block(bytes([0] * 32), 0, ALICE_ADDRESS, [], 0, 1000, time() + 150)
        state.verify_and_apply_block(block)

        # At height 1 neither all-zero nor arbitrary bytes match the tip's id.
        block = mine_block(bytes([0] * 32), 1, ALICE_ADDRESS, [], 0, 1000, time() + 150)
        with self.assertRaisesRegex(Exception, "previous block id"):
            state.verify_and_apply_block(block)

        block = mine_block(bytes([1] * 32), 1, ALICE_ADDRESS, [], 0, 1000, time() + 150)
        with self.assertRaisesRegex(Exception, "previous block id"):
            state.verify_and_apply_block(block)

    # Eleven blocks with identical timestamps: the elapsed time is clamped to 1,
    # giving (10 * 1000 // 1) * 120.
    def test_difficulty_with_zero_time(self):
        state = BlockchainState([], dict(), 0)
        previous = bytes([0] * 32)
        for height in range(11):
            block = mine_block(previous, height, ALICE_ADDRESS, [], 0, 1000, time() + 100)
            state.verify_and_apply_block(block)
            previous = block.block_id

        assert state.calculate_difficulty() == 1_200_000

    # A competing branch is accepted only when its total difficulty is strictly
    # greater, and the original state is never modified.
    def test_reorg(self):
        state = BlockchainState([], dict(), 0)
        previous = bytes([0] * 32)
        for height in range(15):
            block = mine_block(previous, height, ALICE_ADDRESS, [], 120 * height, state.calculate_difficulty(),
                               time() + 100)
            state.verify_and_apply_block(block)
            previous = block.block_id

        # Bob builds a branch from height 8 with the same difficulties as the
        # blocks it replaces, so the total difficulty is only equal.
        previous = state.longest_chain[7].block_id
        blocks = []
        for height in range(8, 15):
            block = mine_block(previous, height, BOB_ADDRESS, [], 120 * height, state.longest_chain[height].difficulty,
                               time() + 100)
            blocks.append(block)
            previous = block.block_id

        with self.assertRaisesRegex(Exception, "total difficulty"):
            verify_reorg(state, blocks)

        assert state.user_states[ALICE_ADDRESS].balance == 150_000
        assert BOB_ADDRESS not in state.user_states

        # One extra block makes Bob's branch heavier, so the reorg succeeds.
        block = mine_block(previous, 15, BOB_ADDRESS, [], 120 * 15, state.calculate_difficulty(), time() + 100)
        blocks.append(block)
        new_state = verify_reorg(state, blocks)

        assert state.user_states[ALICE_ADDRESS].balance == 150_000
        assert BOB_ADDRESS not in state.user_states

        # Alice keeps heights 0..7 and Bob owns heights 8..15: eight rewards each.
        assert new_state.user_states[ALICE_ADDRESS].balance == 80_000
        assert new_state.user_states[BOB_ADDRESS].balance == 80_000


# Run the tests in-process. argv=[''] makes unittest ignore the real
# command-line arguments, and exit=False stops it from calling sys.exit()
# when the run finishes.
if __name__ == '__main__':
    unittest.main(argv=[''], verbosity = 2, exit = False)

test_difficulty_calculation (__main__.BlockchainStateTest) ... ok
test_difficulty_with_zero_time (__main__.BlockchainStateTest) ... ok
test_previous_validation (__main__.BlockchainStateTest) ... ok
test_reorg (__main__.BlockchainStateTest) ... ok
test_undo (__main__.BlockchainStateTest) ... ok

----------------------------------------------------------------------
Ran 5 tests in 0.717s

OK


# Mining Step

### miner address: 1c3c011f4bba7fa27d8108d35845b2e81f3e73ee

I stopped mining at 41 coins to give my colleagues a chance to mine.

In [9]:
%run main.py