In [35]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [44]:
import requests
import time
import uuid

In [45]:
from package.keypair_management import KeyPairManagement
from package.jwt_management import ClientJWTManagement, ServerJWTManagement, JWTValidation
from package.utils import config

In [46]:
BASE_URL = "http://localhost:8000"

In [47]:
km = KeyPairManagement(directory='./client_keypair')
km.generate_keypairs()

Directory './client_keypair' already exists.


In [48]:
requests.get(f"{BASE_URL}/health").json()

{'response': 'Alive!'}

In [50]:
client = ClientJWTManagement()
headers = client.create_headers(public_key=km.load_public_key_from_pem())
payload = client.create_payload(
    jti=str(uuid.uuid4()),
    iat=int(time.time()),
    exp=config.DPOP_TOKEN_TIME,
    method="GET", uri=f"{BASE_URL}/authorizer/token", data={"client_id": "555"}
)

In [51]:
headers

{'typ': 'dpop+jwt',
 'alg': 'RS256',
 'jwk': {'kty': 'RSA',
  'n': 'uaxExR2qAEYFchl6Cmcb5poTEkcPzIg4mCeanc38Vzxj0w3QE3ApL2GBFdxdz85jLAtDLk03avUYnkJAjjkitjCTrlWXu6vpNkP9G1gA4qAMM6hgMpAJ1akXyelhT8XyBYQ6ueB8BB1ODVH9ZoypdoCVGrLw22V2iGSZrDuZ1PsnAZDNHVvdshTPiWT77rj73_NkHAE2pgQdpYD67RwUB6CeLRoT8TC1GBPtZnh5NOijE8AE9ix65MmxPxeKuJrG5L6cazODJJmXuVLhXOMKDMvL78LH5rDjUhqZS7GEW3TImUaD1-730wbs_Nplk5fOmotJI2amrGXqRm-jLm-RSw',
  'e': 'AQAB',
  'kid': 'n_XGHMj-5iitpNgHJBOcJArt86pnYw9FU-pSMzwFXMo'}}

In [52]:
payload

{'htm': 'GET',
 'htu': 'http://localhost:8000/authorizer/token',
 'iat': 1732544684,
 'exp': 30,
 'jti': 'b5e52009-7c15-4c08-b50e-5984f2eea1c6',
 'client_id': '555'}

# Scenarios

In [137]:
client = ClientJWTManagement()
# server = ServerJWTManagement()
def sing_signature(jti:str, iat:int, exp:int, method:str, url:str, data:dict, public_key:bytes, private_key:bytes):
    headers = client.create_headers(public_key=public_key)
    payload = client.create_payload(
        jti=jti,
        iat=iat,
        exp=iat+exp,
        method=method, uri=url, data=data
    )
    client_signature = client.sign_by_client(headers=headers, payload=payload, private_key=private_key)
    return client_signature

def pack_headers(client_signature, access_token=None):
    headers = {
        "DPoP": client_signature
    }
    if access_token is not None:
        headers.update({"Authorization": f"DPoP {access_token}"})
    return headers

def test_get_endpoint(url, client_signature, access_token=None):
    headers = pack_headers(client_signature, access_token)
    response = requests.get(url, headers=headers)
    print("status code: ", response.status_code)
    response = response.json()
    return response

def test_post_endpoint(url, payload, client_signature, access_token=None):
    headers = pack_headers(client_signature, access_token)
    response = requests.post(url, headers=headers, json=payload)
    response = response.json()
    return response

## Happy Path

In [140]:
signature = sing_signature(
    jti=str(uuid.uuid4()), iat=int(time.time()), exp=config.DPOP_TOKEN_TIME, 
    method="GET", url=f"{BASE_URL}/authorizer/token", data={"client_id": "5555"},
    public_key=km.load_public_key_from_pem(),
    private_key=km.load_private_key_from_pem(),
)
response = test_get_endpoint(url=f"{BASE_URL}/authorizer/token", client_signature=signature, access_token=None)
access_token = response['access_token']
refresh_token = response['refresh_token']
response

status code:  200


{'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3MzI1NTY2MTQsImV4cCI6MTczMjU1NjkxNCwianRpIjoiOTUzODhlNTUtNjZiZC00MWQ1LTg0N2MtYzY2YzYwOGMxYzhlIiwiY2xpZW50X2lkIjoiNTU1NSIsImNuZiI6eyJqa3QiOiJuX1hHSE1qLTVpaXRwTmdISkJPY0pBcnQ4NnBuWXc5RlUtcFNNendGWE1vIn19.Ek40VDhSPOZc9mRUlcj7Ai9Jg8sx1y1AJcEPXO0dNThoCEfILznzZSIcqPSMzvdzrljAmnZYh_Zs1Am0yVu-_ludCTjzeXah5vwmeeKYiF5h7mwltKYlKIsFTY4adrLX-S7E-ugFGjU2kpZ8YU7ifubOsUnaJA0syR0YtUUyXWeNKiPEiR2-KzHaeSABOC-5qtd34_lNBVfLJCqwZXVS8ulSZHNeCfTlmJfOC555yGWn39FgNoYQPYAgKCT8os-smOqke7I615Oxq90xaZ_w_Fclog7bpGNpdX6YuF_7Oy0QmFcy2kNl-35ChYETrN_9W9NySySUkm6ssBhnGEigLQ',
 'token_type': 'DPoP',
 'exp': 1732556914,
 'refresh_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3MzI1NTY2MTQsImV4cCI6MTczMjY0MzAxNCwianRpIjoiZWFmYWI2OWEtZDZlOC00MDEyLWE5M2UtZTA0YmJmMjFlOGQ3IiwiY2xpZW50X2lkIjoiNTU1NSJ9.f1t3N0jYvib_JFsQIV7fW0bqOfeVX-ymHwWEx9M4XPJU1mR6i-wSC_FGopGgZveV_stHbgqQxivsc9xzouJSUwNLymrKANFhPvXzfn3xef_co1PXXHFxx57Vlo-eDT7aAf_llwRZnxnDu88PRR1OyaQJ_K2

In [141]:
signature = sing_signature(
    jti=str(uuid.uuid4()), iat=int(time.time()), exp=config.DPOP_TOKEN_TIME, 
    method="GET", url=f"{BASE_URL}/resource/history", data={"client_id": "5555"},
    public_key=km.load_public_key_from_pem(),
    private_key=km.load_private_key_from_pem(),
)
response = test_get_endpoint(url=f"{BASE_URL}/resource/history", client_signature=signature, access_token=access_token)
response

status code:  200


{'response': 'you now get history'}

## DPoP expire

In [68]:
signature = sing_signature(
    jti=str(uuid.uuid4()), iat=int(time.time()), exp=1, 
    method="GET", url=f"{BASE_URL}/authorizer/token", data={"client_id": "5555"},
    public_key=km.load_public_key_from_pem(),
    private_key=km.load_private_key_from_pem(),
)

time.sleep(2)

response = test_get_endpoint(url=f"{BASE_URL}/authorizer/token", client_signature=signature, access_token=None)
response

{'detail': 'Error: The JWT has expired. Signature has expired.'}

## DPoP tampered

In [14]:
signature = sing_signature(dpop_life_time=60*5, method="GET", url=f"{BASE_URL}/authorizer/token", data={"client_id": "5555"})

In [15]:
tampered_signature = sing_signature(dpop_life_time=60*5, method="GET", url=f"{BASE_URL}/authorizer/token", data={"client_id": "6666"})

In [16]:
headers, payload, signature = signature.split(".")
_, tampered_payload, _ = tampered_signature.split(".")

In [17]:
response = test_get_endpoint(url=f"{BASE_URL}/authorizer/token", client_signature=".".join([headers, tampered_payload, signature]), access_token=None)
response

{'detail': 'Error: General JWT verification issue. Signature verification failed.'}

## Invalid claims: method and url

In [77]:
signature = sing_signature(
    jti=str(uuid.uuid4()), iat=int(time.time()), exp=config.DPOP_TOKEN_TIME, 
    method="GET", url=f"{BASE_URL}/authorizer/token", data={"client_id": "5555"},
    public_key=km.load_public_key_from_pem(),
    private_key=km.load_private_key_from_pem(),
)
response = test_get_endpoint(url=f"{BASE_URL}/authorizer/token", client_signature=signature, access_token=None)
access_token = response['access_token']
refresh_token = response['refresh_token']
response

{'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3MzI1NDU0NDQsImV4cCI6MTczMjU0NTc0NCwianRpIjoiMmIxYjYyYjEtNDgwMS00Y2M2LTk5N2MtOWVlZTEwOTMwMDYzIiwiY2xpZW50X2lkIjoiNTU1NSIsImNuZiI6eyJqa3QiOiJuX1hHSE1qLTVpaXRwTmdISkJPY0pBcnQ4NnBuWXc5RlUtcFNNendGWE1vIn19.UG5MljDEjTKLBM0exXPoWHsj3neqPiwRj_UnUFP8ayAuO7kVflC2srIahAFpM9QKPo53o6uMcDjHIRcTCN8GB3T5GHlsxMipgpz3WlLUSIWzb4oyIFM61XydGWO2HyHH3XwA3ywD7nFojvW-_p0V0hr0xodboiG2o4ts_Fmlg8shituW3G_i29FjRQ3KOtE1BCafH_l9SniRV5UfdZf90qqXdw5byoQPU2e14cJlL8Jrg6vDwZH2C6o8YR0U0gjTcBbhvkgyZHrXztd6_i9iUozprqoa8fusaWgGMosotS83jlAgERpxz-lR61fz19sHScGaFOj0cdECG1zzVk_qtg',
 'token_type': 'DPoP',
 'exp': 1732545744,
 'refresh_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3MzI1NDU0NDQsImV4cCI6MTczMjYzMTg0NCwianRpIjoiZjZjNmY5YWUtNzQxOS00ZjhmLTg3OWItNDRlNDZmNzNhNTczIiwiY2xpZW50X2lkIjoiNTU1NSJ9.ZdxcVLWwCXCi6MpvToJpQ2cDGBJF0MF_xxnvibgCmkLWIB4UMIDpwa6F3Fm5ri5Sv1AunMiDh5pUueq0lN8HwO3NwK0l_EsppTErorYytxkdDPLe6EyHO6ABTgB4d7Yg8lcovL72-KNrxbjNaLGBCtTRMuO

In [78]:
signature = sing_signature(
    jti=str(uuid.uuid4()), iat=int(time.time()), exp=config.DPOP_TOKEN_TIME, 
    method="POST", url=f"{BASE_URL}/resource/history1", data={"client_id": "5555"},
    public_key=km.load_public_key_from_pem(),
    private_key=km.load_private_key_from_pem(),
)
response = test_get_endpoint(url=f"{BASE_URL}/resource/history", client_signature=signature, access_token=access_token)
response

{'detail': 'Invalid claim'}

## Invalid claims: method

In [20]:
signature = sing_signature(
    jti=str(uuid.uuid4()), iat=int(time.time()), exp=config.DPOP_TOKEN_TIME, 
    method="GET", url=f"{BASE_URL}/authorizer/token", data={"client_id": "5555"},
    public_key=km.load_public_key_from_pem(),
    private_key=km.load_private_key_from_pem(),
)

{'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3MzI1NDE4NTgsImV4cCI6MTczMjU0MjQ1OCwianRpIjoiOWRiMmY1MzItYWMzOS00NTI0LWFlMDItOWU5ZjljZmZhMTk3IiwiY2xpZW50X2lkIjoiNTU1NSIsImNuZiI6eyJqa3QiOiItYTNocmZGVWpEekZab0gxN3BQSnNNUFcxRG5RdmM0YW9kZGNTQUo5TThjIn19.EuZ8rNZLQeKsF6_PnOcl8PKy9Yp0b1PqRjWjuNKuJ49C5PR08aEJA2fxBDG8JzwbeitoEHShykOH6OHw-BUa_6YLf-ChG_PSYmOvyZWdmeVrS28vj21rNTBZyKZ2EibqHleGGcvRQPvShhgEvo1biv6Jv0S3FSPzQx45AQiCwldXoJPU8yaJ1qXXnvAIEs1D03Xol3vZ9TWKgJRcwAUCXFb7R-iU2CxWhtHmS-JbQ44RHiW8eH3SzT-JPzQ0BTzQM3HKqQaX0Rck4rehxz3IsByxLLishYOEntgpNlqa1ZNKHZ1hzmGj921xM2xJjtS5_3zYHBfWlcNG7Mvax1xv-A',
 'token_type': 'DPoP',
 'exp': 1732542458,
 'refresh_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3MzI1NDE4NTgsImV4cCI6MTczMjYyODI1OCwianRpIjoiZDc0NjQxMzUtMWEyYi00ZWIxLTllODQtMDQ5OTg2Njc5ZmUwIiwiY2xpZW50X2lkIjoiNTU1NSJ9.zXfJxsEuSDmidYW9jUMJh4jzqckVf-vHtJODk-x_qJAsTBlryhqik1kJ8fNV_OO8DS00yw_bk3XW1Cmr9DD-lj8QEXI407jm6AV9eX9Guh-w28N3vRQyjw96WGSXVtIKMo5j-LxYYSZ7-v-c_UNPmmcTs2u

In [79]:
signature = sing_signature(
    jti=str(uuid.uuid4()), iat=int(time.time()), exp=config.DPOP_TOKEN_TIME, 
    method="POST", url=f"{BASE_URL}/resource/history", data={"client_id": "5555"},
    public_key=km.load_public_key_from_pem(),
    private_key=km.load_private_key_from_pem(),
)
response = test_get_endpoint(url=f"{BASE_URL}/resource/history", client_signature=signature, access_token=access_token)
response

{'detail': 'Invalid claim: method mismatched.'}

## Invalid claims: url

In [22]:
signature = sing_signature(
    jti=str(uuid.uuid4()), iat=int(time.time()), exp=config.DPOP_TOKEN_TIME, 
    method="GET", url=f"{BASE_URL}/authorizer/token", data={"client_id": "5555"},
    public_key=km.load_public_key_from_pem(),
    private_key=km.load_private_key_from_pem(),
)

{'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3MzI1NDE4NjUsImV4cCI6MTczMjU0MjQ2NSwianRpIjoiODgwYTM5MmItN2UwMC00MDcyLTlkNDktMzA2MzA5ZGEyZWI3IiwiY2xpZW50X2lkIjoiNTU1NSIsImNuZiI6eyJqa3QiOiItYTNocmZGVWpEekZab0gxN3BQSnNNUFcxRG5RdmM0YW9kZGNTQUo5TThjIn19.15ue4tLaxYpcz4oYK4g_e7icCBJzEHLdOS05nRiiIxzNTYLF5QXo5lKAxNmTfXeCdtSbfHQwYDRlPBo0qzk_7mZJoJsu7D43BdBb3GZjlyL-qI1yeMLydkf9i8BHsw8Z7qckUkudsZ0PrtDjJY4bcJwha7LljXaxLyY2u1kuBNnnoOhVD6hfVqC9cYHEKd3e6-Rk7l_PEp7uAniMWUJUiXXzw_WX22RbEV0SVoWcwq68I1g4OKpeJoM5EirATgJrWfpa6kGu5ElJs6x9juyGqYlsoRR92kylcBiCBiTBFIQa0J2Ukvh1XbRSLRdsQqzm-ds9n3wPAPysOGltxA4vzQ',
 'token_type': 'DPoP',
 'exp': 1732542465,
 'refresh_token': 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3MzI1NDE4NjUsImV4cCI6MTczMjYyODI2NSwianRpIjoiYTk3N2E2Y2EtZDdiZS00OGFiLTkyZjctNzVmYTUwZGYwYWJhIiwiY2xpZW50X2lkIjoiNTU1NSJ9.O47C4eGpJKDF89CSUeCmJg8PKdq5Lx1ENUAtLfj_7bCnEqlKwgFumjbPCWxdiYEwOoe8sr0MdFPEyQbJU-nlM61y60Z4Ruzrlr2dQKPhWkOag07I4uiN_H5852CUyZZmqCndTlRWNZyp9-VF7FZLZgnL9FJ

In [80]:
signature = sing_signature(
    jti=str(uuid.uuid4()), iat=int(time.time()), exp=config.DPOP_TOKEN_TIME, 
    method="GET", url=f"{BASE_URL}/resource/history1", data={"client_id": "5555"},
    public_key=km.load_public_key_from_pem(),
    private_key=km.load_private_key_from_pem(),
)
response = test_get_endpoint(url=f"{BASE_URL}/resource/history", client_signature=signature, access_token=access_token)
response

{'detail': 'Invalid claim: url mismatched.'}

## Invalid jti: dpop

## Invalid jti: access_token

## Invalid jti: refresh_token