In [1]:
import json

from base64 import urlsafe_b64decode, urlsafe_b64encode
from datetime import datetime
from hashlib import sha256
from math import ceil, log2
import msgpack

In [2]:
CONTEXTS = [
    "https://www.w3.org/2018/credentials/v1",
    "https://w3id.org/security/data-integrity/v2",
    { "@vocab": "https://www.w3.org/ns/credentials/issuer-dependent#" }
  ]
input = json.load(open("v2JSON/signedCredential.json", "r"))

In [3]:
def base64_encode(val: bytes) -> str:
    return urlsafe_b64encode(val).rstrip(b"=").decode("utf-8")

def base64_decode(val: str) -> bytes:
    padlen = 4 - len(val) % 4
    return urlsafe_b64decode(val if padlen > 2 else (val + "=" * padlen))

def encode_identifier(ident: str) -> str:
    return "did:key:" + ident.replace(" ", "%20")

def decode_identifier(ident: str) -> str:
    return ident.replace("%20", " ").lstrip("did:key:")

# V2 

In [4]:
def retrieve_cred_def_input_anoncreds(creds):
    return {"id" : encode_identifier(creds["issuer"]["id"]),
            "schema": encode_identifier(creds["issuer"]["schema"]["id"]),
            "cred_def": encode_identifier(creds["issuer"]["id"])} # currently setting cred_def as issuer id ~~~~

In [5]:
def encode_to_w3c_proof(creds):
    # need format check first, assume creds is already valid format
    anoncreds = creds["credential"]
    tmpSignature = {'signature': anoncreds['signature']}
    tmpRevocation = {'revocation_handle': anoncreds['revocation_handle'], 'revocation_index': anoncreds['revocation_index']}
    parts = {**tmpSignature, **tmpRevocation} # Assuming revocation info is also refuired ~~~~
    return 'u' + base64_encode(msgpack.dumps([1, parts]))
# encode_to_w3c_proof(input)

In [6]:
def decode_to_anoncreds(proof):
    return msgpack.loads(base64_decode(proof[1:]))
# tmp = encode_to_w3c_proof(input)
# decode_to_anoncreds(tmp)

In [7]:
def map_label_to_claim_value(data):
    schema_claims = data["issuer"]["schema"]["claims"]
    credential_claims = data["credential"]["claims"]
    
    attr = {}
    for idx, schema_claim in enumerate(schema_claims):
        label = schema_claim["label"] if schema_claim["claim_type"] != "Revocation" else "revocation_identifier"
        claim_value_dict = credential_claims[idx]
        claim_type = list(claim_value_dict.keys())[0]
        claim_value = claim_value_dict[claim_type]["value"]
        attr[label] = claim_value
    
    return attr
# map_label_to_claim_value(input)

## Encode

In [8]:
def to_w3c(cred_json:dict) -> dict:
    issuer = retrieve_cred_def_input_anoncreds(cred_json)
    signature = encode_to_w3c_proof(cred_json)
    attrs = map_label_to_claim_value(cred_json)

    return {
        "@context": CONTEXTS.copy(),
        "type": ["VerifiableCredential"],
        "issuer": issuer, # if this is what wanted? ~~~~
        "credentialSubject": attrs,
        "proof": {
            "type": "anoncreds-2024",
            "type": "DataIntegrityProof",
            "proofPurpose": "assertionMethod",
            "verificationMethod": "did:key:z6MkwXG2WjeQnNxSoynSGYU8V9j3QzP3JSqhdmkHc6SaVWoT/credential-definition",
            "proofValue": signature,
        },
        "issuanceDate": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    }
tmp = to_w3c(input)
tmp

{'@context': ['https://www.w3.org/2018/credentials/v1',
  'https://w3id.org/security/data-integrity/v2',
  {'@vocab': 'https://www.w3.org/ns/credentials/issuer-dependent#'}],
 'type': ['VerifiableCredential'],
 'issuer': {'id': 'did:key:d1bc208a7d0695e8cf2f3a3fe3e1f620',
  'schema': 'did:key:89c31714307f6522379ad99b473ee364',
  'cred_def': 'did:key:d1bc208a7d0695e8cf2f3a3fe3e1f620'},
 'credentialSubject': {'revocation_identifier': '91742856-6eda-45fb-a709-d22ebb5ec8a5',
  'name': 'John Doe',
  'address': 'P Sherman 42 Wallaby Way Sydney',
  'age': 30303},
 'proof': {'type': 'DataIntegrityProof',
  'proofPurpose': 'assertionMethod',
  'verificationMethod': 'did:key:z6MkwXG2WjeQnNxSoynSGYU8V9j3QzP3JSqhdmkHc6SaVWoT/credential-definition',
  'proofValue': 'ukgGDqXNpZ25hdHVyZYOnc2lnbWFfMdlgYjUyZDcyNGM4ZTdhYjJiYzExYzRmMzYzYmRkMTJkMjUzNTNlNjNhODA0Nzc5MWFmZjE2MTViYWU5ZGVmZmY5NzM4MmM0OGU4MGE4YjBhOGM4YmIyNjk1NmVhZGM4NmVkp3NpZ21hXzLZYGFiMmM0Yjk0ZmMzYjlkZDFmY2E5YWU3YmRmNjdmYzkxM2FhZDY5MTAwZTI1Njlk

## Decode

In [9]:
def to_anoncreds(cred_json):
    ret = {
        "issuer": {
        },
        "credential": {
            "claims": [],
        "signature": {},
        "revocation_handle": "",
        "revocation_index": 0
        }
    }
    issuer_id = decode_identifier(cred_json["issuer"]["id"])
    schema_id = decode_identifier(cred_json["issuer"]["schema"])
    attrs = cred_json["credentialSubject"]
    signature_parts = decode_to_anoncreds(cred_json["proof"]["proofValue"])[1]

    values = []
    for key, attr_value in attrs.items():
        if type(attr_value) == str:
            if key != 'revocation_identifier':
                values.append ({"Hashed" : {
                    "value": attr_value,
                    "print_friendly":True
                }})
            else :
                values.append ({"Hashed" : {
                    "value": attr_value
                }})
        elif type(attr_value) == int or float:
            values.append ({"Number": {
                "value": attr_value
            }})
    ret["credential"]["claims"] += values
    ret["credential"]["signature"] = signature_parts['signature']
    ret["credential"]["revocation_handle"] = signature_parts['revocation_handle']
    ret["credential"]["revocation_index"] = signature_parts['revocation_index']
    ret["issuer"]["id"] = issuer_id
    ret["issuer"]["schema"] = schema_id
    ret["issuer"]["cred_def"] = issuer_id

    return ret
to_anoncreds(tmp)

{'issuer': {'id': '1bc208a7d0695e8cf2f3a3fe3e1f620',
  'schema': '89c31714307f6522379ad99b473ee364',
  'cred_def': '1bc208a7d0695e8cf2f3a3fe3e1f620'},
 'credential': {'claims': [{'Hashed': {'value': '91742856-6eda-45fb-a709-d22ebb5ec8a5'}},
   {'Hashed': {'value': 'John Doe', 'print_friendly': True}},
   {'Hashed': {'value': 'P Sherman 42 Wallaby Way Sydney',
     'print_friendly': True}},
   {'Number': {'value': 30303}}],
  'signature': {'sigma_1': 'b52d724c8e7ab2bc11c4f363bdd12d25353e63a8047791aff1615bae9defff97382c48e80a8b0a8c8bb26956eadc86ed',
   'sigma_2': 'ab2c4b94fc3b9dd1fca9ae7bdf67fc913aad69100e2569dcd4c6e50c8d10abd1f58ce93ab746401620e09422831b1fee',
   'm_tick': '1056e25782ed14dbc564a19f36e33979931c0fa44ee89d3495b5a5a892911969'},
  'revocation_handle': 'abee3290988063eec35dd7963295fd36f2aa641c1c070e04f885f256208dcc99afd3e8a080291dd878464b32905a0190',
  'revocation_index': 0}}

# V1

In [10]:
# SIGNATURE_PARTS = ['w', 'x', 'y', 'y_blinds', 'revocation_verifying_key',
#                    'verifiable_encryption_key', 'revocation_registry',
#                    'value', 'sigma_1', 'sigma_2', 'm_tick',
#                    'revocation_handle', 'recovation_index']

In [11]:
# def process_list_entries(key, value):
#     entries = []
#     if isinstance(value, list):
#         for val in value:
#             raw_bytes = bytes.fromhex(val)
#             raw_len = len(raw_bytes)
#             entries.append(raw_len.to_bytes(2, "big") + raw_bytes)
#     elif key == "value":
#         # UUID string
#         raw_bytes = value.encode()
#         raw_len = len(raw_bytes)
#         entries.append(raw_len.to_bytes(2, "big") + raw_bytes)
#     elif key == "recovation_index":
#         raw = value
#         raw_len = ceil(log2(raw + 1) / 8)
#         raw_bytes = raw.to_bytes(raw_len, "big")
#         entries.append(raw_len.to_bytes(2, "big") + raw_bytes)
#     else:
#         raw_bytes = bytes.fromhex(value)
#         raw_len = len(raw_bytes)
#         entries.append(raw_len.to_bytes(2, "big") + raw_bytes)
#     return entries

# def encode_to_w3c_signature(cred_json: dict) -> str:
#     parts = cred_json['issuer']['verifying_key'].copy()
#     parts.update({"revocation_verifying_key":cred_json['issuer']['revocation_verifying_key']})
#     parts.update({"verifiable_encryption_key": cred_json['issuer']['verifiable_encryption_key']})
#     parts.update({"revocation_registry":cred_json['issuer']['revocation_registry']})
#     try:
#         parts.update({"value":cred_json['credential']['claims'][0]['Revocation']['value']})
#     except:
#         pass
#     parts.update(cred_json['credential']['signature'])
#     parts.update({"revocation_handle":cred_json['credential']['revocation_handle']})
#     parts.update({"revocation_index":cred_json['credential']['revocation_index']})

#     entries = []
#     for idx, key in enumerate(SIGNATURE_PARTS):
#         if key not in parts:
#             continue
#         entry = process_list_entries(key, parts[key])
#         for e in entry:
#             entries.append(bytes((idx,)) + e)
    
#     return base64_encode(b"".join(entries))


In [12]:
# def decode_from_w3c_signature(signature: str) -> dict:

#     sig_bytes = base64_decode(signature)
#     ret = {
#         "issuer": {
#             "verifying_key": {
#                 "w": "",
#                 "x": "",
#                 "y": [],
#                 "y_blinds": []
#             },
#             "revocation_verifying_key": "",
#             "verifiable_encryption_key": "",
#             "revocation_registry": ""
#         },
#         "credential": {
#             "claims": [
#                 {
#                     "Revocation": {
#                         "value": ""
#                     }
#                 }
#             ],
#             "signature": {
#                 "sigma_1": "",
#                 "sigma_2": "",
#                 "m_tick": ""
#             },
#             "revocation_handle": "",
#             "revocation_index": 0
#         }
#     }

#     while sig_bytes:
#         if len(sig_bytes) < 3:
#             raise Exception("Invalid signature: too short for index and length")
#         idx = int(sig_bytes[0])
#         if idx >= len(SIGNATURE_PARTS):
#             raise Exception("Invalid signature: index out of range")
#         key = SIGNATURE_PARTS[idx]  # Correctly using idx to reference the key from SIGNATURE_PARTS
#         raw_len = int.from_bytes(sig_bytes[1:3], "big")
#         sig_bytes = sig_bytes[3:]
#         if len(sig_bytes) < raw_len:
#             raise Exception("Invalid signature: length mismatch for data")

#         raw = sig_bytes[:raw_len]
#         sig_bytes = sig_bytes[raw_len:]

#         key = SIGNATURE_PARTS[idx]
#         if key in ['w', 'x', 'sigma_1', 'sigma_2', 'm_tick', 'revocation_handle']:
#             path = key.split('_')
#             if len(path) == 1:
#                 ret['issuer']['verifying_key'][key] = raw.hex()
#             else:
#                 ret['credential']['signature'][key] = raw.hex()
#         elif key == 'y' or key == 'y_blinds':
#             ret['issuer']['verifying_key'][key].append(raw.hex())
#         elif key == 'value':
#             ret['credential']['claims'][0]['Revocation']['value'] = raw.decode('utf-8')
#         elif key == 'revocation_index':
#             ret['credential']['revocation_index'] = int.from_bytes(raw, "big")
#         else:  # For 'revocation_verifying_key', 'verifiable_encryption_key', 'revocation_registry'
#             ret['issuer'][key] = raw.hex()

#     return ret


In [13]:
# def map_label_to_claim_value(data):
#     schema_claims = data["issuer"]["schema"]["claims"]
#     credential_claims = data["credential"]["claims"]
    
#     attr = {}
#     for idx, schema_claim in enumerate(schema_claims):
#         if schema_claim["print_friendly"]:
#             label = schema_claim["label"]
#             claim_value_dict = credential_claims[idx]
#             claim_type = list(claim_value_dict.keys())[0]
#             claim_value = claim_value_dict[claim_type]["value"]
#             attr[label] = claim_value
    
#     return attr

In [14]:
# def to_w3c(cred_json:dict) -> dict:
#     issuer = "did:sov:" + cred_json["issuer"]["id"]
#     cred_def_id = encode_identifier(cred_json["issuer"]["id"])
#     schema_id = encode_identifier(cred_json["issuer"]["schema"]["id"])
#     signature = encode_to_w3c_signature(cred_json)
#     attrs = map_label_to_claim_value(cred_json)

#     return {
#         "@context": CONTEXTS.copy(),
#         "type": ["VerifiableCredential", "AnonCredsCredential"],
#         "issuer": issuer,
#         "issuanceDate": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
#         "credentialSchema": {
#             "type": "AnonCredsDefinition",
#             "id": cred_def_id,
#             "schema": schema_id,
#         },
#         "credentialSubject": attrs,
#         "proof": {
#             "type": "PSSignature2022",
#             "encoding": "auto",
#             "proofValue": signature,
#         }
#     }

In [15]:
# def to_anon_creds(cred_json:dict) -> dict:
#     issuer_id = decode_identifier(cred_json["credentialSchema"]["schema"])
#     cred_def_id = decode_identifier(cred_json["credentialSchema"]["id"])
#     attrs = cred_json["credentialSubject"]
#     signature_parts = decode_from_w3c_signature(cred_json["proof"]["proofValue"])

#     values = []
#     for _, attr_value in attrs.items():
#         if type(attr_value) == str:
#             values.append ({"Hashed" : {
#                 "value": attr_value,
#                 "print_friendly":True
#             }})
#         elif type(attr_value) == int or float:
#             values.append ({"Number": {
#                 "value": attr_value
#             }})
#     signature_parts["credential"]["claims"] = signature_parts["credential"]["claims"] + (values)

#     tmp = {"issuer": {**{"id": issuer_id, "schema": {"id": cred_def_id}}, **signature_parts["issuer"]}}
#     tmp["credentials"] = signature_parts["credential"]
#     return tmp

In [16]:
# def encode_to_w3c_signature(cred_json: dict) -> str:
    # parts: dict = cred_json["credential"]["signature"].copy()

    # entries = []
    # for idx, key in enumerate(SIGNATURE_PARTS):
    #     if key not in parts:
    #         continue
    #     raw_bytes = bytes.fromhex(parts[key])
    #     raw_len = len(raw_bytes)
    #     entries.append(bytes((idx,)) + raw_len.to_bytes(2, "big") + raw_bytes)

    # return base64_encode(b"".join(entries))

# def decode_from_w3c_signature(signature: str) -> dict:
#     sig_bytes = base64_decode(signature)
#     ret = {"signature": {}}

#     while sig_bytes:
#         if len(sig_bytes) < 3:
#             raise Exception("invalid signature")
#         idx = int(sig_bytes[0])
#         if idx >= len(SIGNATURE_PARTS):
#             raise Exception("invalid signature")
#         raw_len = int.from_bytes(sig_bytes[1:3], "big")
#         sig_bytes = sig_bytes[3:]
#         if len(sig_bytes) < raw_len:
#             raise Exception("invalid signature")
#         raw = sig_bytes[:raw_len].hex()
#         sig_bytes = sig_bytes[raw_len:]

#         key = SIGNATURE_PARTS[idx]
#         ret["signature"][key] = raw

#     return ret