# Sunbird RC - Attestation & Approval Workflow Demo

This notebook demonstrates the **complete attestation/approval workflow** in Sunbird RC, including:

- 🔐 **Authentication** with Keycloak
- 📋 **Attestation Policies** - Define approval workflows
- 📝 **Claims Management** - Submit and review approval requests
- ✅ **Approval Process** - Grant or reject claims
- 🎓 **Credentials** - Generate verifiable credentials
- 🔄 **Multi-level Approvals** - Chain multiple attestors

## Prerequisites
- Sunbird RC services running (`./start-sunbird.sh`)
- Keycloak at http://localhost:8080
- Registry API at http://localhost:8081
- Claims Service at http://localhost:8082

## Setup & Configuration

In [5]:
import requests
import json
import pandas as pd
from datetime import datetime
from IPython.display import display, Markdown, JSON
import time

# Configuration
KEYCLOAK_URL = "http://localhost:8080"
REGISTRY_URL = "http://localhost:8081"
CLAIMS_URL = "http://localhost:8082"
REALM = "sunbird-rc"

# Admin credentials (as provided)
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "admin123"

print("✅ Setup complete!")
print(f"Keycloak: {KEYCLOAK_URL}")
print(f"Registry: {REGISTRY_URL}")
print(f"Claims Service: {CLAIMS_URL}")

✅ Setup complete!
Keycloak: http://localhost:8080
Registry: http://localhost:8081
Claims Service: http://localhost:8082


## 1. Authentication with Keycloak

Sunbird RC uses **Keycloak** for authentication and authorization.
We need to obtain a JWT token to make authenticated API calls.

In [6]:
def get_admin_token():
    """
    Authenticate with Keycloak and get an access token.
    Uses the admin-api client for backend operations.
    """
    token_url = f"{KEYCLOAK_URL}/auth/realms/{REALM}/protocol/openid-connect/token"
    
    payload = {
        "client_id": "admin-api",
        "grant_type": "password",
        "username": ADMIN_USERNAME,
        "password": ADMIN_PASSWORD
    }
    
    try:
        response = requests.post(token_url, data=payload)
        response.raise_for_status()
        token_data = response.json()
        return token_data
    except Exception as e:
        print(f"❌ Authentication failed: {e}")
        return None

# Get authentication token.
# Define defaults first so later cells can test these names even if login fails.
ACCESS_TOKEN = None
AUTH_HEADERS = {"Content-Type": "application/json"}

auth_response = get_admin_token()

if auth_response:
    ACCESS_TOKEN = auth_response['access_token']
    print("✅ Authentication successful!")
    print(f"Token Type: {auth_response['token_type']}")
    print(f"Expires In: {auth_response['expires_in']} seconds")
    print(f"\nAccess Token (first 50 chars): {ACCESS_TOKEN[:50]}...")
    
    # Headers for authenticated requests
    AUTH_HEADERS = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {ACCESS_TOKEN}"
    }
else:
    print("❌ Could not obtain token. Check if Keycloak is running and credentials are correct.")

❌ Authentication failed: 401 Client Error: Unauthorized for url: http://localhost:8080/auth/realms/sunbird-rc/protocol/openid-connect/token
❌ Could not obtain token. Check if Keycloak is running and credentials are correct.
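
Once a token response is available, the same fields printed above (`access_token`, `expires_in`) are enough to build request headers and to estimate when to log in again. The following is a minimal sketch under that assumption; `build_auth_headers`, `token_expiry`, and the 30-second safety margin are hypothetical choices for illustration, not part of Sunbird RC or Keycloak.

```python
from datetime import datetime, timedelta, timezone

def build_auth_headers(token_response):
    """Build JSON request headers from an OAuth 2.0 token response dict."""
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token_response['access_token']}",
    }

def token_expiry(token_response, issued_at, safety_margin_s=30):
    """Return the time after which the token should be refreshed.

    The margin (an arbitrary choice here) leaves room for clock skew
    and request latency.
    """
    lifetime = int(token_response.get("expires_in", 0))
    return issued_at + timedelta(seconds=max(lifetime - safety_margin_s, 0))

# Illustrative values only; a real response comes from the token endpoint.
sample = {"access_token": "abc.def.ghi", "token_type": "Bearer", "expires_in": 300}
issued = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

headers = build_auth_headers(sample)
refresh_at = token_expiry(sample, issued)
print(headers["Authorization"])
print(refresh_at.isoformat())
```

Comparing `datetime.now(timezone.utc)` against `refresh_at` before each request is one simple way to decide whether to authenticate again.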


### Decode JWT Token (Optional)

Let's inspect the JWT token to see user roles and claims:

In [3]:
import base64

def decode_jwt(token):
    """Decode JWT token (without verification)"""
    parts = token.split('.')
    if len(parts) != 3:
        return None
    
    # Add padding if needed
    payload = parts[1]
    padding = len(payload) % 4
    if padding:
        payload += '=' * (4 - padding)
    
    decoded = base64.urlsafe_b64decode(payload)
    return json.loads(decoded)

# Guard with globals().get so this cell does not raise NameError
# when the authentication cell has not run or has failed.
if globals().get("ACCESS_TOKEN"):
    token_claims = decode_jwt(ACCESS_TOKEN)
    print("🔍 JWT Token Claims:\n")
    display(JSON({
        "username": token_claims.get('preferred_username'),
        "email": token_claims.get('email'),
        "roles": token_claims.get('realm_access', {}).get('roles', []),
        "issued_at": datetime.fromtimestamp(token_claims.get('iat', 0)).isoformat(),
        "expires_at": datetime.fromtimestamp(token_claims.get('exp', 0)).isoformat()
    }, expanded=True))
else:
    print("⚠️  No access token available. Run the authentication cell first.")
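
The decoding step can be tried without a running Keycloak by building an unsigned sample token locally. This is a sketch only: the claims below are made up, shaped like the fields read above (`preferred_username`, `realm_access.roles`), and `b64url_encode` and `decode_jwt_payload` are hypothetical helper names. As in the cell above, the signature is not verified, so this is suitable for inspection and not for trust decisions.

```python
import base64
import json

def b64url_encode(data: bytes) -> str:
    """Base64url-encode without '=' padding, as JWT segments are."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def decode_jwt_payload(token: str) -> dict:
    """Decode the payload segment of a JWT without verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected three dot-separated segments")
    payload = parts[1]
    payload += "=" * (-len(payload) % 4)  # restore the stripped padding
    return json.loads(base64.urlsafe_b64decode(payload))

# Made-up claims for illustration.
claims = {"preferred_username": "admin", "realm_access": {"roles": ["admin"]}}
header = b64url_encode(json.dumps({"alg": "none", "typ": "JWT"}).encode())
body = b64url_encode(json.dumps(claims).encode())
sample_token = f"{header}.{body}.signature"

decoded = decode_jwt_payload(sample_token)
print(decoded["realm_access"]["roles"])
```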

## 2. Create Entities for Attestation Workflow

Let's create entities that will participate in the approval workflow:
- **Student** - Will request degree attestation
- **University** - Will attest/approve the degree

In [None]:
# First, check if Student entity schema exists
response = requests.get(f"{REGISTRY_URL}/api/v1/Student", headers=AUTH_HEADERS)
print(f"Student schema check: {response.status_code}")

# Create a Student
student_data = {
    "name": "Alice Johnson",
    "studentId": "STU2025001",
    "email": "alice.johnson@example.com",
    "phoneNumber": "+1-555-0101"
}

try:
    response = requests.post(
        f"{REGISTRY_URL}/api/v1/Student",
        headers=AUTH_HEADERS,
        json=student_data
    )
    
    if response.status_code == 200:
        student_result = response.json()
        STUDENT_OSID = student_result['result']['Student']['osid']
        print("✅ Student created successfully!")
        print(f"Student OSID: {STUDENT_OSID}")
        display(JSON(student_result, expanded=True))
    else:
        print(f"❌ Error creating student: {response.status_code}")
        print(response.text)
except Exception as e:
    print(f"❌ Exception: {e}")

In [None]:
# Create a University (Attestor)
university_data = {
    "name": "MIT",
    "code": "MIT001",
    "contactEmail": "registrar@mit.edu"
}

try:
    response = requests.post(
        f"{REGISTRY_URL}/api/v1/Institute",  # Using Institute schema from earlier
        headers=AUTH_HEADERS,
        json=university_data
    )
    
    if response.status_code == 200:
        university_result = response.json()
        UNIVERSITY_OSID = university_result['result']['Institute']['osid']
        print("✅ University created successfully!")
        print(f"University OSID: {UNIVERSITY_OSID}")
        display(JSON(university_result, expanded=True))
    else:
        print(f"❌ Error creating university: {response.status_code}")
        print(response.text)
except Exception as e:
    print(f"❌ Exception: {e}")
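
Both creation cells read the new record's ID from the same path in the response: `result`, then the entity name, then `osid`. A small helper can make that lookup tolerant of missing keys. This is a sketch that assumes only the response shape used in the cells above; `extract_osid` is a hypothetical name and the sample ID is made up.

```python
def extract_osid(create_response, entity_name):
    """Return result[entity_name]['osid'] from a create response, or None if absent."""
    entity = create_response.get("result", {}).get(entity_name, {})
    return entity.get("osid")

# Made-up response bodies for illustration.
sample_response = {"result": {"Student": {"osid": "1-abc"}}}
student_osid = extract_osid(sample_response, "Student")
missing = extract_osid({"result": {}}, "Institute")
print(student_osid, missing)
```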

## 3. Create an Attestation Policy

An **Attestation Policy** defines:
- **What** needs approval (which properties)
- **Who** can approve (attestor entity)
- **How** approval happens (manual/automated)
- **What happens next** (credential generation, next approval step)
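
Before posting a policy, it can help to check locally that these parts are present. The sketch below is an assumption-laden illustration: the list of required fields is inferred from the policy used in this notebook, not from the registry's own schema, and `check_policy` is a hypothetical helper.

```python
# Assumed from this notebook's example policy, not from the registry schema.
REQUIRED_POLICY_FIELDS = ("name", "attestationProperties", "type", "attestorEntity")
KNOWN_TYPES = {"MANUAL", "AUTOMATED"}

def check_policy(policy):
    """Return a list of human-readable problems; an empty list means none found."""
    problems = [f"missing field: {f}" for f in REQUIRED_POLICY_FIELDS if f not in policy]
    if "type" in policy and policy["type"] not in KNOWN_TYPES:
        problems.append(f"unknown type: {policy['type']}")
    props = policy.get("attestationProperties")
    if props is not None and not props:
        problems.append("attestationProperties is empty")
    return problems

draft = {
    "name": "degreeAttestation",
    "attestationProperties": {"degree": "$"},
    "type": "MANUAL",
    "attestorEntity": "Institute",
}
ok_result = check_policy(draft)
bad_result = check_policy({"name": "x", "type": "manual"})
print(ok_result)
print(bad_result)
```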

In [None]:
# Create an attestation policy for degree certificates
attestation_policy = {
    "name": "degreeAttestation",
    "attestationProperties": {
        "degree": "$",
        "major": "$",
        "graduationDate": "$",
        "gpa": "$"
    },
    "type": "MANUAL",
    "attestorEntity": "Institute",
    "conditions": "{name}",  # Attestor is identified by institute name
    "attestorPlugin": "did:internal:ClaimPluginActor",
    "credentialTemplate": {
        "@context": [
            "https://www.w3.org/2018/credentials/v1",
            "https://www.w3.org/2018/credentials/examples/v1"
        ],
        "type": ["VerifiableCredential", "DegreeCertificate"],
        "issuer": "{{attestorEntity}}",
        "credentialSubject": {
            "degree": "{{degree}}",
            "major": "{{major}}",
            "graduationDate": "{{graduationDate}}",
            "gpa": "{{gpa}}"
        }
    }
}

try:
    response = requests.post(
        f"{REGISTRY_URL}/api/v1/DegreeCertificate/attestationPolicy",
        headers=AUTH_HEADERS,
        json=attestation_policy
    )
    
    if response.status_code == 200:
        policy_result = response.json()
        POLICY_ID = policy_result.get('id') or policy_result.get('result', {}).get('id')
        print("✅ Attestation Policy created successfully!")
        print(f"Policy ID: {POLICY_ID}")
        display(JSON(policy_result, expanded=True))
    else:
        print(f"❌ Error creating policy: {response.status_code}")
        print(response.text)
except Exception as e:
    print(f"❌ Exception: {e}")

### List All Attestation Policies

In [None]:
try:
    response = requests.get(
        f"{REGISTRY_URL}/api/v1/DegreeCertificate/attestationPolicies",
        headers=AUTH_HEADERS
    )
    
    if response.status_code == 200:
        policies = response.json()
        print(f"📋 Found {len(policies)} attestation policies:\n")
        
        for policy in policies:
            print(f"- {policy.get('name')} (Type: {policy.get('type')}, Status: {policy.get('status', 'DRAFT')})")
        
        display(JSON(policies, expanded=True))
    else:
        print(f"Status: {response.status_code}")
        print(response.text)
except Exception as e:
    print(f"❌ Exception: {e}")

### Publish the Policy

Policies are created in `DRAFT` status. We need to publish them to make them active:

In [None]:
if 'POLICY_ID' in locals():
    try:
        response = requests.put(
            f"{REGISTRY_URL}/api/v1/DegreeCertificate/attestationPolicy/{POLICY_ID}/PUBLISHED",
            headers=AUTH_HEADERS
        )
        
        if response.status_code == 200:
            print("✅ Policy published successfully!")
            print("The policy is now active and will trigger for new degree certificates.")
        else:
            print(f"Status: {response.status_code}")
            print(response.text)
    except Exception as e:
        print(f"❌ Exception: {e}")
else:
    print("⚠️  No policy ID available. Create a policy first.")

## 4. Create Degree Certificate (Triggers Attestation)

When we create a degree certificate, the system will:
1. Detect that properties match an attestation policy
2. Set state to `ATTESTATION_REQUESTED`
3. Automatically raise a **claim** to the attestor

In [None]:
# Create a degree certificate that requires attestation
degree_certificate = {
    "studentName": "Alice Johnson",
    "studentId": "STU2025001",
    "degree": "Master of Science",
    "major": "Computer Science",
    "university": "MIT",
    "graduationDate": "2025-05-15",
    "gpa": 3.95,
    "certificateNumber": "MIT-CS-2025-001"
}

try:
    response = requests.post(
        f"{REGISTRY_URL}/api/v1/DegreeCertificate",
        headers=AUTH_HEADERS,
        json=degree_certificate
    )
    
    if response.status_code == 200:
        cert_result = response.json()
        CERTIFICATE_OSID = cert_result['result']['DegreeCertificate']['osid']
        print("✅ Degree Certificate created!")
        print(f"Certificate OSID: {CERTIFICATE_OSID}")
        print("\n🔔 This should trigger attestation workflow...")
        display(JSON(cert_result, expanded=True))
    else:
        print(f"❌ Error: {response.status_code}")
        print(response.text)
except Exception as e:
    print(f"❌ Exception: {e}")

### Check Certificate State

In [None]:
if 'CERTIFICATE_OSID' in locals():
    try:
        response = requests.get(
            f"{REGISTRY_URL}/api/v1/DegreeCertificate/{CERTIFICATE_OSID}",
            headers=AUTH_HEADERS
        )
        
        if response.status_code == 200:
            cert_data = response.json()
            state = cert_data.get('_osState', 'UNKNOWN')
            print(f"📊 Certificate State: {state}")
            
            if state == 'ATTESTATION_REQUESTED':
                print("✅ Attestation workflow triggered successfully!")
            elif state == 'DRAFT':
                print("⚠️  Still in DRAFT state. Policy may not have triggered.")
            elif state == 'PUBLISHED':
                print("✅ Already approved and published!")
            
            display(JSON(cert_data, expanded=True))
        else:
            print(f"Status: {response.status_code}")
    except Exception as e:
        print(f"❌ Exception: {e}")
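
If the state change is not visible immediately in a given deployment (an assumption; this notebook does not establish the timing), the check can be repeated a few times. The sketch below polls through an injected `fetch_state` callable so it can be tried without a server; `wait_for_state` and its parameters are hypothetical, and the stand-in fetcher simply replays a fixed sequence of states.

```python
import time

def wait_for_state(fetch_state, target_states, attempts=5, delay_s=1.0, sleep=time.sleep):
    """Call fetch_state() until it returns one of target_states or attempts run out.

    Returns the last state seen, whether or not it is a target state.
    """
    last = None
    for attempt in range(attempts):
        last = fetch_state()
        if last in target_states:
            return last
        if attempt < attempts - 1:
            sleep(delay_s)
    return last

# Stand-in for a real lookup such as reading '_osState' from the registry response.
_states = iter(["DRAFT", "DRAFT", "ATTESTATION_REQUESTED"])
final_state = wait_for_state(
    lambda: next(_states),
    {"ATTESTATION_REQUESTED", "PUBLISHED"},
    delay_s=0,
)
print(final_state)
```

In the notebook, `fetch_state` could wrap the `GET` request from the cell above and return the `_osState` value.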

## 5. View Pending Claims (Approval Requests)

Claims are approval requests waiting for attestor action.

In [None]:
try:
    # Get all claims
    response = requests.get(
        f"{CLAIMS_URL}/api/v1/getClaims",
        headers=AUTH_HEADERS
    )
    
    if response.status_code == 200:
        claims_data = response.json()
        claims = claims_data.get('claims', [])
        
        print(f"📋 Found {len(claims)} claims\n")
        
        if claims:
            # Display as table; reindex tolerates columns missing from the response
            df = pd.DataFrame(claims)
            display(df.reindex(columns=['id', 'entity', 'status', 'attestationName', 'createdAt', 'attestorEntity']))
            
            # Store first claim ID for attestation
            CLAIM_ID = claims[0]['id']
            print(f"\n🎯 Will use Claim ID: {CLAIM_ID} for attestation demo")
        else:
            print("No claims found. The policy might not be active or configured correctly.")
        
        display(JSON(claims_data, expanded=True))
    else:
        print(f"Status: {response.status_code}")
        print(response.text)
except Exception as e:
    print(f"❌ Exception: {e}")
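
The cell above picks the first claim in the list, which may not be the one an attestor should act on. A sketch of narrowing the list to claims still awaiting action follows. It assumes the `status` and `attestorEntity` fields shown in the table and the `OPEN` status named later in this notebook; `open_claims_for`, the `CLOSED` status, and the `Ministry` entity are hypothetical sample values.

```python
def open_claims_for(claims, attestor_entity):
    """Return claims that are still OPEN and addressed to the given attestor entity."""
    return [
        c for c in claims
        if c.get("status") == "OPEN" and c.get("attestorEntity") == attestor_entity
    ]

# Made-up claims for illustration.
sample_claims = [
    {"id": "c1", "status": "OPEN", "attestorEntity": "Institute"},
    {"id": "c2", "status": "CLOSED", "attestorEntity": "Institute"},
    {"id": "c3", "status": "OPEN", "attestorEntity": "Ministry"},
]
pending_ids = [c["id"] for c in open_claims_for(sample_claims, "Institute")]
print(pending_ids)
```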

### Get Claim Details

In [None]:
if 'CLAIM_ID' in locals():
    try:
        response = requests.get(
            f"{CLAIMS_URL}/api/v1/getClaims/{CLAIM_ID}",
            headers=AUTH_HEADERS
        )
        
        if response.status_code == 200:
            claim_detail = response.json()
            print(f"📄 Claim Details for: {CLAIM_ID}\n")
            print(f"Entity: {claim_detail.get('entity')}")
            print(f"Status: {claim_detail.get('status')}")
            print(f"Attestation: {claim_detail.get('attestationName')}")
            print(f"Created: {claim_detail.get('createdAt')}")
            print("\nProperty Data:")
            display(JSON(claim_detail.get('propertyData', {}), expanded=True))
        else:
            print(f"Status: {response.status_code}")
    except Exception as e:
        print(f"❌ Exception: {e}")
else:
    print("⚠️  No claim ID available")

## 6. Attest/Approve the Claim

As an attestor (university), we can now **approve** or **reject** the claim.

### Approve the Claim ✅

In [None]:
if 'CLAIM_ID' in locals():
    attestation_request = {
        "action": "GRANT_CLAIM",  # or "REJECT_CLAIM" to reject
        "notes": "Degree certificate verified and approved by MIT Registrar Office."
    }
    
    try:
        response = requests.post(
            f"{CLAIMS_URL}/api/v1/claims/{CLAIM_ID}",
            headers=AUTH_HEADERS,
            json=attestation_request
        )
        
        if response.status_code == 200:
            print("✅ Claim APPROVED successfully!")
            print("\n🎓 The certificate should now be in PUBLISHED state")
            print("📜 A verifiable credential should be generated")
            
            result = response.json()
            display(JSON(result, expanded=True))
        else:
            print(f"❌ Error: {response.status_code}")
            print(response.text)
    except Exception as e:
        print(f"❌ Exception: {e}")
else:
    print("⚠️  No claim ID available for attestation")

### Example: Reject a Claim ❌

To reject instead of approve, use this payload:

```python
{
    "action": "REJECT_CLAIM",
    "notes": "Missing required documentation. Please provide official transcripts."
}
```
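
Since approval and rejection differ only in the `action` value, one helper can build either payload and catch typos before the request is sent. This is a sketch: `build_attestation_request` is a hypothetical name, and requiring notes on rejection is a design choice made here for illustration, not a rule stated by the Claims Service.

```python
VALID_ACTIONS = {"GRANT_CLAIM", "REJECT_CLAIM"}

def build_attestation_request(action, notes=""):
    """Build the JSON body for attesting a claim, validating the action name."""
    if action not in VALID_ACTIONS:
        raise ValueError(f"action must be one of {sorted(VALID_ACTIONS)}, got {action!r}")
    # Local convention in this sketch: a rejection should say why.
    if action == "REJECT_CLAIM" and not notes.strip():
        raise ValueError("a rejection should include notes explaining the reason")
    return {"action": action, "notes": notes}

grant = build_attestation_request("GRANT_CLAIM", "Verified by registrar.")
reject = build_attestation_request("REJECT_CLAIM", "Missing official transcripts.")
print(grant)
print(reject)
```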

## 7. Verify Attestation Result

In [None]:
if 'CERTIFICATE_OSID' in locals():
    try:
        # Check certificate state after attestation
        response = requests.get(
            f"{REGISTRY_URL}/api/v1/DegreeCertificate/{CERTIFICATE_OSID}",
            headers=AUTH_HEADERS
        )
        
        if response.status_code == 200:
            cert_data = response.json()
            state = cert_data.get('_osState', 'UNKNOWN')
            
            print(f"📊 Current Certificate State: {state}")
            
            if state == 'PUBLISHED':
                print("✅ Certificate is now PUBLISHED and verified!")
                print("\n📜 Attestation details:")
                
                # Look for attestation data
                if 'attestations' in cert_data:
                    display(JSON(cert_data['attestations'], expanded=True))
                else:
                    print("Full certificate data:")
                    display(JSON(cert_data, expanded=True))
            elif state == 'REJECTED':
                print("❌ Certificate was REJECTED")
            else:
                print(f"Current state: {state}")
                display(JSON(cert_data, expanded=True))
    except Exception as e:
        print(f"❌ Exception: {e}")

## 8. Retrieve Verifiable Credential

After attestation, a verifiable credential is generated. Let's retrieve it:

In [None]:
if 'CERTIFICATE_OSID' in locals() and 'POLICY_ID' in locals():
    try:
        # Get the attestation certificate
        response = requests.get(
            f"{REGISTRY_URL}/api/v1/DegreeCertificate/{CERTIFICATE_OSID}/attestation/degreeAttestation/{POLICY_ID}",
            headers=AUTH_HEADERS
        )
        
        if response.status_code == 200:
            credential = response.json()
            print("✅ Verifiable Credential Retrieved!\n")
            display(JSON(credential, expanded=True))
        else:
            print(f"Status: {response.status_code}")
            print("Credential might not be available yet or endpoint might differ.")
            print(response.text)
    except Exception as e:
        print(f"❌ Exception: {e}")

## 9. Understanding Workflow States

Here's the complete state transition diagram:

```
DRAFT (Initial)
    ↓
    │ Property created/updated with attestation policy
    ↓
ATTESTATION_REQUESTED
    ↓
    │ Claim raised to attestor
    ↓
Claim Status: OPEN
    ↓
    ├─→ GRANT_CLAIM (Approve) → PUBLISHED ✅
    │                             ↓
    │                      Credential Generated
    │
    └─→ REJECT_CLAIM (Reject) → REJECTED ❌
```
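
The diagram can also be written as a transition table, which is handy for sanity-checking client code against the expected flow. This is a sketch of the diagram only, not of the registry's internal logic; `REQUEST_ATTESTATION` is a hypothetical event name standing in for "property created/updated with attestation policy".

```python
# (current state, event) -> next state, transcribed from the diagram above.
TRANSITIONS = {
    ("DRAFT", "REQUEST_ATTESTATION"): "ATTESTATION_REQUESTED",
    ("ATTESTATION_REQUESTED", "GRANT_CLAIM"): "PUBLISHED",
    ("ATTESTATION_REQUESTED", "REJECT_CLAIM"): "REJECTED",
}

def next_state(state, event):
    """Return the state reached from `state` on `event`, or raise ValueError."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"no transition from {state} on {event}") from None

state = "DRAFT"
for event in ("REQUEST_ATTESTATION", "GRANT_CLAIM"):
    state = next_state(state, event)
print(state)
```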

## 10. Multi-Level Approval Chain (Advanced)

For multi-step approvals, you can chain policies using `onComplete`:

```json
{
    "name": "universityAttestation",
    "type": "MANUAL",
    "attestorEntity": "Institute",
    "onComplete": "ATTESTATION:governmentAttestation"
}
```

This creates a workflow:
```
Student → University Approval → Government Approval → Final Credential
```
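
To see which attestors a chain will pass through, the `onComplete` links can be followed locally. The sketch below assumes the `ATTESTATION:<policyName>` format shown in the example and treats any other `onComplete` value as the end of the chain, which is a simplification; `approval_chain` is a hypothetical helper and the second policy is made up to complete the example.

```python
def approval_chain(policies, start):
    """Follow onComplete links of the form 'ATTESTATION:<name>' starting at `start`."""
    by_name = {p["name"]: p for p in policies}
    chain, seen = [], set()
    current = start
    while current is not None:
        if current in seen:
            raise ValueError(f"cycle detected at {current}")
        if current not in by_name:
            raise KeyError(f"unknown policy: {current}")
        seen.add(current)
        chain.append(current)
        prefix, _, target = by_name[current].get("onComplete", "").partition(":")
        current = target if prefix == "ATTESTATION" and target else None
    return chain

policies = [
    {"name": "universityAttestation", "attestorEntity": "Institute",
     "onComplete": "ATTESTATION:governmentAttestation"},
    # Hypothetical second step; the attestor entity name is made up.
    {"name": "governmentAttestation", "attestorEntity": "Ministry"},
]
chain = approval_chain(policies, "universityAttestation")
print(" -> ".join(chain))
```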

## 11. Revoke a Credential

In [None]:
# WARNING: Uncomment to revoke
# if 'CERTIFICATE_OSID' in locals():
#     try:
#         response = requests.post(
#             f"{REGISTRY_URL}/api/v1/DegreeCertificate/{CERTIFICATE_OSID}/revoke",
#             headers=AUTH_HEADERS
#         )
#         
#         if response.status_code == 200:
#             print("✅ Credential revoked successfully")
#         else:
#             print(f"Status: {response.status_code}")
#     except Exception as e:
#         print(f"❌ Exception: {e}")

print("⚠️  Revocation is commented out for safety")

## 🎉 Summary

This notebook demonstrated:

### Authentication & Security
- ✅ Keycloak authentication with JWT tokens
- ✅ Admin API access with credentials
- ✅ Token-based API authorization

### Attestation Workflow
- ✅ Creating attestation policies (MANUAL/AUTOMATED)
- ✅ Publishing policies to make them active
- ✅ Automatic claim generation on entity creation
- ✅ Viewing and managing claims
- ✅ Approving/rejecting claims
- ✅ State transitions (DRAFT → ATTESTATION_REQUESTED → PUBLISHED)
- ✅ Verifiable credential generation
- ✅ Credential revocation (shown commented out)

### Key Endpoints Used

| Endpoint | Purpose |
|----------|--------|
| `POST /auth/realms/{realm}/protocol/openid-connect/token` | Get JWT token |
| `POST /api/v1/{entity}/attestationPolicy` | Create policy |
| `GET /api/v1/{entity}/attestationPolicies` | List policies |
| `PUT /api/v1/{entity}/attestationPolicy/{id}/{status}` | Publish policy |
| `GET /api/v1/getClaims` | List claims |
| `GET /api/v1/getClaims/{id}` | Claim details |
| `POST /api/v1/claims/{id}` | Attest claim |
| `GET /api/v1/{entity}/{id}` | Check state |
| `POST /api/v1/{entity}/{id}/revoke` | Revoke credential |

### Next Steps
- Explore automated attestation with plugins (MOSIP, Aadhaar)
- Implement multi-level approval chains
- Build custom UI for claim management
- Set up notification service for claim alerts
- Integrate with credential verification systems

For more information:
- [Sunbird RC Documentation](https://docs.sunbirdrc.dev/)
- [SETUP_GUIDE.md](./SETUP_GUIDE.md)
- [AUTHENTICATION_AND_WORKFLOWS.md](./AUTHENTICATION_AND_WORKFLOWS.md)