In [1]:
from abc import ABC, abstractmethod
import time
import hashlib

from pprint import pprint

### Main functions

In [2]:
def get_current_time() -> float:
    return time.time()


def compute_hash(string: str) -> str:
    b: bytes = string.encode()
    return hashlib.sha256(b).hexdigest()


def get_merkle_hash(arr: list[str]) -> str:
    if arr == []:
        return ""

    result_arr = arr.copy()

    for i in range(len(arr)):
        result_arr[i] = compute_hash(result_arr[i])

    while len(result_arr) != 1:
        new_arr = []
        for i in range(0, len(result_arr), 2):
            if i == len(result_arr) - 1:
                h = compute_hash(result_arr[i])
            else:
                h = compute_hash(result_arr[i] + result_arr[i + 1])
            new_arr.append(h)
        result_arr = new_arr.copy()
    
    return result_arr[0]

### Account, Transaction and Block

In [3]:
class Account:
    def __init__(self, mnemonic: str = "", private_key: str = "", public_key: str = ""):
        if public_key:
            self.public_key = public_key
        else:
            self.public_key = compute_hash(f"{mnemonic}{private_key}")

    def __str__(self) -> str:
        return str(self.public_key)


class Transaction:
    """PHC - phils coin"""

    def __init__(
        self,
        sender_private_key: str,
        sender_public_key: str,
        reciever_public_key: str,
        PHC: float,
        sender_PHC_total: float,
        reciever_PHC_total: float,
    ):
        self.sender = Account(public_key=sender_public_key)
        self.reciever = Account(public_key=reciever_public_key)
        self.PHC = PHC
        self.sender_PHC_total = sender_PHC_total
        self.reciever_PHC_total = reciever_PHC_total
        self.timestamp = get_current_time()

        s = sender_public_key + reciever_public_key + str(self.timestamp) + sender_private_key
        signed_hash = compute_hash(s)
        self.hash = signed_hash

    def __str__(self) -> str:
        return str(self.hash)

    def is_valid(self, sender_private_key: str) -> bool:
        s = self.sender.public_key + self.reciever.public_key + str(self.timestamp) + sender_private_key
        signed_hash = compute_hash(s)
        if self.hash == signed_hash:
            return True
        return False


class Block:
    def __init__(self, index: int, previous_hash: str):
        self.index = index
        self.previous_hash = previous_hash
        self.transactions: list[Transaction] = []
        self.accounts: list[Account] = []
        self.timestamp = get_current_time()

    def __str__(self) -> str:
        return str(self.get_hash())

    def set_hash(self):
        self.hash = self.get_hash()

    def get_hash(self) -> str:
        transactions_hash = get_merkle_hash([t.hash for t in self.transactions])
        s = str(self.index) + self.previous_hash + transactions_hash + str(self.timestamp)
        return compute_hash(s)

### BaseABC and Base

Lets create an abstract class (BaseABC) that will share methods between children.

Lets create Base class that will contain the chain of blocks.

In [4]:
class BaseABC(ABC):
    @property
    @abstractmethod
    def current_block(self) -> Block:
        pass

    @property
    @abstractmethod
    def blocks_count(self) -> int:
        pass

    @abstractmethod
    def _create_block(self, initial: bool) -> Block:
        pass

    @abstractmethod
    def _append_block(self, block: Block):
        pass

    @abstractmethod
    def login(self, mnemonic: str, private_key: str) -> Account | None:
        pass

    @abstractmethod
    def get_account_by_public_key(self, public_key: str) -> Account | None:
        pass

    @abstractmethod
    def get_user_PHC_total(self, public_key: str) -> float:
        pass


class Base(BaseABC):
    MAX_TRANSACTIONS_SIZE_IN_BLOCK = 16

    def __init__(self, name: str) -> None:
        self.name = name
        self.chain: list[Block] = []
        self._append_block(self._create_block(initial=True))

    def __str__(self) -> str:
        return str(self.name)

### Managers

Lets create blockchain managers

In [5]:
class AccountManager(Base):
    def get_account_by_public_key(self, public_key: str) -> Account | None:
        for block in self.chain:
            for account in block.accounts:
                if public_key == account.public_key:
                    return account
        return

    def get_user_PHC_total(self, public_key: str) -> float:
        for block in self.chain[::-1]:
            for transaction in block.transactions[::-1]:
                if transaction.sender.public_key == public_key:
                    return transaction.sender_PHC_total
                elif transaction.reciever.public_key == public_key:
                    return transaction.reciever_PHC_total
        return 0.0

    def login(self, mnemonic: str, private_key: str) -> Account | None:
        public_key = Account(mnemonic=mnemonic, private_key=private_key).public_key
        if (account := self.get_account_by_public_key(public_key=public_key)) is not None:
            return account
        return

    def add_account(self, mnemonic: str, private_key: str) -> dict[str, str]:
        account = Account(mnemonic=mnemonic, private_key=private_key)
        if self.get_account_by_public_key(public_key=account.public_key):
            return {
                "error": "Please use another mnemonic. You have a hash collision."
            }
        self.current_block.accounts.append(account)
        return {
            "success": "Account has been created.",
            "public_key": account.public_key,
        }


class BlockManager(Base):
    @property
    def current_block(self) -> Block:
        return self.chain[-1]

    @property
    def blocks_count(self) -> int:
        return len(self.chain)

    def _create_block(self, initial: bool) -> Block:
        if initial:
            index = 0
            previous_hash = ""
        else:
            index = self.current_block.index + 1
            self.current_block.set_hash()
            previous_hash = self.current_block.hash

        return Block(
            index=index,
            previous_hash=previous_hash,
        )


class TransactionManager(Base):
    def _append_transaction(self, transaction: Transaction):
        self.current_block.transactions.append(transaction)

    def add_transaction(
        self,
        sender_mnemonic: str,
        sender_private_key: str,
        reciever_public_key: str,
        PHC: float,
    ) -> Transaction | dict[str, str]:
        PHC = float(PHC)

        sender = self.login(mnemonic=sender_mnemonic, private_key=sender_private_key)
        if sender is None:
            return {"error": "Can not login."}

        reciever = self.get_account_by_public_key(public_key=reciever_public_key)
        if reciever is None:
            return {"error": "Can not find reciever account."}

        sender_PHC_total = self.get_user_PHC_total(public_key=sender.public_key)
        reciever_PHC_total = self.get_user_PHC_total(public_key=reciever.public_key)

        if sender_PHC_total - PHC < 0:
            return {"error": "You do not have enough PHC."}

        transaction = Transaction(
            sender_private_key=sender_private_key,
            sender_public_key=sender.public_key,
            reciever_public_key=reciever.public_key,
            PHC=PHC,
            sender_PHC_total=sender_PHC_total - PHC,
            reciever_PHC_total=reciever_PHC_total + PHC,
        )

        self._append_transaction(transaction=transaction)
        if (
            len(self.current_block.transactions)
            >= self.MAX_TRANSACTIONS_SIZE_IN_BLOCK
        ):
            block = self._create_block(initial=False)
            self._append_block(block=block)
        return transaction


class ChainManager(Base):
    def _append_block(self, block: Block):
        self.chain.append(block)

    def is_valid(self) -> tuple[bool, int]:
        for i in range(self.blocks_count - 1):
            if self.chain[i].hash != self.chain[i].get_hash():
                return False, i

            if i > 0:
                if self.chain[i - 1].hash != self.chain[i].previous_hash:
                    return False, i
        return True, -1

In [6]:
class Blockchain(AccountManager, BlockManager, TransactionManager, ChainManager):
    def __init__(self, name: str, **kwargs):
        super().__init__(name=name, **kwargs)

### Initialize blockchain

In [7]:
sender_mnemonic = "mnemonic"
sender_private_key = "private_key"
reciever_mnemonic = "mnemonic1"
reciever_private_key = "private_key"

blockchain = Blockchain(name="Phils_blockchain")
print(blockchain)
print()

pprint(vars(blockchain))
print()

pprint(vars(blockchain.chain[0]))

Phils_blockchain

{'chain': [<__main__.Block object at 0x10f4fba00>], 'name': 'Phils_blockchain'}

{'accounts': [],
 'index': 0,
 'previous_hash': '',
 'timestamp': 1707251644.011664,
 'transactions': []}


### Create user

In [8]:
sender = blockchain.add_account(mnemonic=sender_mnemonic, private_key=sender_private_key)
print(f"{sender = }\n")

# Get users total coins
sender_public_key = sender.get("public_key")
sender_PHC_total = blockchain.get_user_PHC_total(public_key=sender_public_key)
print(f"{sender_PHC_total = }\n")

# Try to create an account that has the same mnemonic and private_key like user 1
sender_error = blockchain.add_account(
    mnemonic=sender_mnemonic, private_key=sender_private_key
)
print(f"{sender_error = }\n")

# Create second account
reciever = blockchain.add_account(
    mnemonic=reciever_mnemonic, private_key=reciever_private_key
)
print(f"{reciever = }")

sender = {'success': 'Account has been created.', 'public_key': '938b4cb077d33740fd4f7ec42a163c9746aa54ae80a0bf0322cb653cd8f263ec'}

sender_PHC_total = 0.0

sender_error = {'error': 'Please use another mnemonic. You have a hash collision.'}

reciever = {'success': 'Account has been created.', 'public_key': 'e97d2f7b6da84a84692338a2ee49533c32bd861d2f3255b9d5eb7119abed2cee'}


### Create our first transaction

In [26]:
reciever_public_key = reciever.get("public_key")

transaction = blockchain.add_transaction(
    sender_mnemonic=sender_mnemonic,
    sender_private_key=sender_private_key,
    reciever_public_key=reciever_public_key,
    PHC=0,
)

print(transaction)
print()

pprint(transaction.__dict__)
print()

# See that our transaction is valid
print(transaction.is_valid(sender_private_key=sender_private_key))

1af75cc345abd6a3fbe252d89abadda3d5d359d132d7b97741ddc21d22fb60e9

{'PHC': 0.0,
 'hash': '1af75cc345abd6a3fbe252d89abadda3d5d359d132d7b97741ddc21d22fb60e9',
 'reciever': <__main__.Account object at 0x10f4b56c0>,
 'reciever_PHC_total': 0.0,
 'sender': <__main__.Account object at 0x10f4b7a60>,
 'sender_PHC_total': 0.0,
 'timestamp': 1707251844.11622}

True


In [10]:
for _ in range(100):
    blockchain.add_transaction(
        sender_mnemonic=sender_mnemonic,
        sender_private_key=sender_private_key,
        reciever_public_key=reciever_public_key,
        PHC=0,
    )

pprint(blockchain.chain[0].__dict__)
print()

print(blockchain.is_valid())

{'accounts': [<__main__.Account object at 0x10f4fa410>,
              <__main__.Account object at 0x10f492620>],
 'hash': '434c0494dee199a66fdd14386371097c75fc308b14d24c3e9843bf3dde99e775',
 'index': 0,
 'previous_hash': '',
 'timestamp': 1707251644.011664,
 'transactions': [<__main__.Transaction object at 0x10f4fb0a0>,
                  <__main__.Transaction object at 0x10f4f8670>,
                  <__main__.Transaction object at 0x10f4fab90>,
                  <__main__.Transaction object at 0x10f4fb970>,
                  <__main__.Transaction object at 0x10f4fae30>,
                  <__main__.Transaction object at 0x10f4fa530>,
                  <__main__.Transaction object at 0x10f4face0>,
                  <__main__.Transaction object at 0x10f4fa9b0>,
                  <__main__.Transaction object at 0x10f4f80d0>,
                  <__main__.Transaction object at 0x10f4fae90>,
                  <__main__.Transaction object at 0x10f4f8550>,
                  <__main__.Transactio

### Lets make changes in the chain

In [11]:
error_block = blockchain.chain[-2]
blockchain.chain[3] = error_block

print(blockchain.is_valid())

(False, 3)
