In [None]:
import typing
import bittensor as bt
import pydantic

class Store(bt.Synapse):
    """
    Synapse for a store request: the caller submits content together with
    its hash, a public key, and a signature.

    The query fields are required and declared with ``allow_mutation=False``;
    the return fields carry defaults and are meant to be filled in by the
    receiving side.
    """
    class Config:
        # Re-validate field values whenever an attribute is assigned.
        validate_assignment = True

    def deserialize(self) -> typing.Optional[str]:
        """
        Return the response payload of this synapse.

        Returns:
        - str: The value of the `verified` return field.
        """
        # The previous implementation returned `self.zk_proof`, which is not
        # a field of this model; `verified` is the declared string return value.
        return self.verified

    # Query values
    content: typing.Optional[object] = pydantic.Field(..., allow_mutation=False)
    content_hash: str = pydantic.Field(..., allow_mutation=False)
    pubkey: str = pydantic.Field(..., allow_mutation=False)
    signature: str = pydantic.Field(..., allow_mutation=False)

    # Return values
    verified: typing.Optional[str] = ''
    # Set by the `store` handler once the content has been accepted; declared
    # here so that the handler's assignment targets an existing field.
    stored: typing.Optional[bool] = False

class Retrieve(bt.Synapse):
    """
    Synapse for a retrieve request: the caller submits a content hash and
    the receiving side fills in the registry lookup result.
    """
    class Config:
        # Re-validate field values whenever an attribute is assigned.
        validate_assignment = True

    def deserialize(self) -> typing.Optional[str]:
        """
        Return the response payload of this synapse.

        Returns:
        - str: The value of the `miner_signature` return field.
        """
        # The previous implementation returned `self.verified`, which is not
        # a field of this model. `miner_signature` is the declared string
        # return value matching the `str` return annotation.
        # NOTE(review): confirm this is the payload callers expect.
        return self.miner_signature

    # Query values
    content_hash: str = pydantic.Field(..., allow_mutation=False)

    # Return values
    in_registry: typing.Optional[bool] = False
    miner_signature: typing.Optional[str] = ''
    miner_pubkey: typing.Optional[str] = ''
In [None]:
import hashlib
import secrets
import pgpy
from pgpy.constants import PubKeyAlgorithm, KeyFlags, HashAlgorithm, SymmetricKeyAlgorithm, CompressionAlgorithm

def hash256(content: str) -> str:
    """
    Compute the SHA-256 digest of a string.

    Args:
    - content (str): The text to hash; it is encoded with the default
      `str.encode()` encoding before hashing.

    Returns:
    - str: The digest as a lowercase hexadecimal string.
    """
    hasher = hashlib.sha256()
    hasher.update(content.encode())
    return hasher.hexdigest()

def generate_password(seed, length=12):
    """
    Generate a random throw-away password.

    Args:
    - seed: Accepted for backward compatibility only. It does NOT influence
      the result: characters are drawn with `secrets.choice`, so every call
      returns a fresh, unpredictable password.
    - length (int): Number of characters in the password.

    Returns:
    - str: A password of `length` characters drawn from letters, digits and
      a set of punctuation characters.
    """
    # The previous implementation hashed the seed but never used the digest;
    # that dead computation is removed. The output was, and remains, random.
    alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()-_=+"
    return ''.join(secrets.choice(alphabet) for _ in range(length))

def sign_content_with_new_keypair(content: str, passphrase: typing.Optional[str] = None, seed: int = 1337) -> typing.Tuple[str, str]:
    """
    Generate a new RSA-4096 PGP keypair and sign the content with it.

    Args:
    - content (str): The content to sign.
    - passphrase (str): The passphrase used to protect the private key.
      When None, a throw-away passphrase is obtained from
      `generate_password(seed)`.
    - seed (int): Value forwarded to `generate_password` when no passphrase
      is supplied.

    Returns:
    - tuple: The cleartext-signed message and the public key, both as
      strings.
    """
    # Generate a primary key
    key = pgpy.PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 4096)

    # Add a user ID to the key (placeholder identity)
    uid = pgpy.PGPUID.new('Test User', email='test@example.com')
    key.add_uid(uid, usage={KeyFlags.Sign, KeyFlags.EncryptCommunications, KeyFlags.EncryptStorage},
                hashes=[HashAlgorithm.SHA256, HashAlgorithm.SHA512],
                ciphers=[SymmetricKeyAlgorithm.AES256, SymmetricKeyAlgorithm.AES128],
                compression=[CompressionAlgorithm.ZLIB, CompressionAlgorithm.ZIP, CompressionAlgorithm.Uncompressed])

    # TODO: use a real passphrase and get it from the env variables or terminal input
    # For now just generate a throw-away passphrase
    if passphrase is None:
        passphrase = generate_password(seed)

    # Protect the key with the passphrase
    key.protect(passphrase, SymmetricKeyAlgorithm.AES256, HashAlgorithm.SHA256)

    # The key is locked after protect(); unlock it for the duration of signing
    with key.unlock(passphrase):
        # Wrap the content in a cleartext message and attach the signature to it
        cleartext_signature = pgpy.PGPMessage.new(content, cleartext=True)
        cleartext_signature |= key.sign(cleartext_signature, hash=HashAlgorithm.SHA256)

    return str(cleartext_signature), str(key.pubkey)

def verify_pgpy_with_content(content: str, signature: str, pubkey: str) -> bool:
    """
    Verify the signature of the content using the provided public key.

    Args:
    - content (str): The content string (bytes are also accepted and used
      as-is).
    - signature (str): The signature string, loaded via
      `pgpy.PGPSignature.from_blob`.
    - pubkey (str): The public key string, loaded via
      `pgpy.PGPKey.from_blob`.

    Returns:
    - bool: True if the signature is valid, False otherwise.
    """
    # Load the public key; from_blob returns a (key, extras) pair
    public_key, _ = pgpy.PGPKey.from_blob(pubkey)

    # Load the signature
    signature_obj = pgpy.PGPSignature.from_blob(signature)

    # Convert the content to bytes if it's not
    if not isinstance(content, bytes):
        content = content.encode('utf-8')

    # PGPKey.verify returns a verification result object rather than a bool;
    # coerce it so the function honours its declared `bool` return type.
    return bool(public_key.verify(content, signature_obj))

# Short aliases for the helpers defined above.
# NOTE: `hash` rebinds the builtin name `hash` at module scope to hash256.
hash = hash256
verify = verify_pgpy_with_content
sign = sign_content_with_new_keypair


In [None]:
# In-memory registry filled by `store`: maps a content hash to the
# (miner_signature, miner_pubkey) pair produced when the content was stored.
registry = {}

def store( synapse: Store ) -> Store:
    """
    Validate a store request and record it in the registry.

    The handler recomputes the hash of `synapse.content`, compares it with
    `synapse.content_hash`, verifies the signature with the supplied public
    key, then signs the content hash with a newly generated keypair and
    stores the resulting (miner_signature, miner_pubkey) pair in `registry`
    under the content hash.

    Args:
    - synapse (Store): The incoming store request.

    Returns:
    - Store: The same synapse with `stored` set to True.

    Raises:
    - ValueError: If the content hash does not match the content, or if the
      signature does not verify with the provided public key.
    """
    # Check content_hash against the content (`hash` is the module-level
    # alias for hash256, not the builtin).
    local_content_hash = hash( synapse.content )
    if synapse.content_hash != local_content_hash:
        raise ValueError("Content hash does not match the provided content!")

    # Check the signature against the pubkey. The signature is verified over
    # the content hash: that is what the client in this file signs, and the
    # hash has just been confirmed to match the content.
    if not verify( synapse.content_hash, synapse.signature, synapse.pubkey ):
        # Attempted to store invalid content.
        raise ValueError("Signature is not valid with provided pubkey!")

    # Sign the content hash with the miner key and store the
    # (miner_signature, miner_pubkey) pair keyed by the content hash.
    # Optimistically store (no need to send back the signature until verify step)
    miner_signature, miner_pubkey = sign( synapse.content_hash )
    registry[ synapse.content_hash ] = ( miner_signature, miner_pubkey )

    # NOTE(review): with `validate_assignment` enabled this assignment
    # requires `stored` to be a declared field on Store - confirm.
    synapse.stored = True
    return synapse

In [None]:
# Sample payload used to exercise the store flow.
content = "This is some content"
# `hash` is the module-level alias for hash256 (SHA-256 hex digest), not the builtin.
content_hash = hash( content )
# Sign the content hash (not the raw content) with a freshly generated keypair.
signature, pubkey = sign_content_with_new_keypair( content_hash )

In [None]:
# Build the Store request from the sample content, its hash, and the signing material.
syn = Store(content=content, content_hash=content_hash, pubkey=pubkey, signature=signature)
# Bare expression: in a notebook cell this displays the synapse as the cell output.
syn

In [None]:
# Run the store handler on the request; it returns the synapse or raises ValueError.
filled_syn = store( syn )