<a href="https://colab.research.google.com/github/DevManoj19/hackathon1/blob/main/hackathon_prototype.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# Install required dependencies
!pip install cryptography requests

import hashlib
import base64
import json
import datetime
import uuid
from typing import Dict, List, Optional, Tuple
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature, decode_dss_signature

class GDPRCompliantDIDSystem:
    """
    GDPR-Compliant Decentralized Identifier System for Cultural Heritage Data
    Implements W3C DID standards with privacy-by-design principles
    """

    def __init__(self):
        self.did_registry = {}
        self.consent_records = {}
        self.audit_log = []
        self.revoked_dids = set()

    def generate_did(self, method: str = "example") -> Tuple[str, Dict, ed25519.Ed25519PrivateKey]:
        """Generate a new DID with cryptographic key pair"""
        try:
            # Generate Ed25519 key pair
            private_key = ed25519.Ed25519PrivateKey.generate()
            public_key = private_key.public_key()

            # Create DID from public key hash
            public_bytes = public_key.public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw
            )
            did_id = hashlib.sha256(public_bytes).hexdigest()[:32]
            did = f"did:{method}:{did_id}"

            # Create W3C-compliant DID Document
            did_document = {
                "@context": [
                    "https://www.w3.org/ns/did/v1",
                    "https://w3id.org/security/suites/ed25519-2020/v1"
                ],
                "id": did,
                "verificationMethod": [{
                    "id": f"{did}#key1",
                    "type": "Ed25519VerificationKey2020",
                    "controller": did,
                    "publicKeyBase64": base64.b64encode(public_bytes).decode()
                }],
                "authentication": [f"{did}#key1"],
                "assertionMethod": [f"{did}#key1"],
                "capabilityInvocation": [f"{did}#key1"],
                "capabilityDelegation": [f"{did}#key1"],
                "created": datetime.datetime.utcnow().isoformat() + "Z",
                "updated": datetime.datetime.utcnow().isoformat() + "Z"
            }

            # Store in registry with GDPR metadata
            self.did_registry[did] = {
                "document": did_document,
                "created": datetime.datetime.utcnow(),
                "gdpr_metadata": {
                    "data_subject_consent": False,
                    "purpose": "identity_verification",
                    "retention_period": "7_years",
                    "can_be_forgotten": True
                }
            }

            self._log_action("DID_CREATED", did, "New DID generated")
            return did, did_document, private_key

        except Exception as e:
            self._log_action("ERROR", None, f"DID generation failed: {str(e)}")
            raise

    def resolve_did(self, did: str) -> Optional[Dict]:
        """Resolve DID to its document with GDPR checks"""
        if did in self.revoked_dids:
            self._log_action("ACCESS_DENIED", did, "Attempted access to revoked DID")
            return None

        record = self.did_registry.get(did)
        if record:
            self._log_action("DID_RESOLVED", did, "DID document accessed")
            return record["document"]

        self._log_action("DID_NOT_FOUND", did, "DID resolution failed")
        return None

    def sign_data(self, data: str, private_key: ed25519.Ed25519PrivateKey) -> str:
        """Sign data using Ed25519 private key"""
        try:
            data_bytes = data.encode('utf-8')
            signature = private_key.sign(data_bytes)
            return base64.b64encode(signature).decode()
        except Exception as e:
            self._log_action("ERROR", None, f"Signing failed: {str(e)}")
            raise

    def verify_signature(self, data: str, signature: str, did: str) -> bool:
        """Verify signature using public key from DID document"""
        try:
            did_doc = self.resolve_did(did)
            if not did_doc:
                return False

            # Extract public key
            verification_method = did_doc["verificationMethod"][0]
            public_key_b64 = verification_method["publicKeyBase64"]
            public_key_bytes = base64.b64decode(public_key_b64)

            # Reconstruct public key
            public_key = ed25519.Ed25519PublicKey.from_public_bytes(public_key_bytes)

            # Verify signature
            signature_bytes = base64.b64decode(signature)
            data_bytes = data.encode('utf-8')

            public_key.verify(signature_bytes, data_bytes)
            self._log_action("SIGNATURE_VERIFIED", did, "Signature verification successful")
            return True

        except Exception as e:
            self._log_action("SIGNATURE_FAILED", did, f"Signature verification failed: {str(e)}")
            return False

    # GDPR Compliance Methods

    def record_consent(self, did: str, purpose: str, consent_given: bool) -> str:
        """Record user consent with timestamp and purpose"""
        consent_id = str(uuid.uuid4())
        consent_record = {
            "consent_id": consent_id,
            "did": did,
            "purpose": purpose,
            "consent_given": consent_given,
            "timestamp": datetime.datetime.utcnow().isoformat(),
            "ip_address": "redacted_for_privacy"  # In real implementation, hash this
        }

        self.consent_records[consent_id] = consent_record

        # Update DID metadata
        if did in self.did_registry:
            self.did_registry[did]["gdpr_metadata"]["data_subject_consent"] = consent_given

        self._log_action("CONSENT_RECORDED", did, f"Consent {consent_given} for {purpose}")
        return consent_id

    def withdraw_consent(self, did: str, consent_id: str) -> bool:
        """Withdraw previously given consent"""
        if consent_id in self.consent_records:
            self.consent_records[consent_id]["consent_given"] = False
            self.consent_records[consent_id]["withdrawn_at"] = datetime.datetime.utcnow().isoformat()

            # Update DID metadata
            if did in self.did_registry:
                self.did_registry[did]["gdpr_metadata"]["data_subject_consent"] = False

            self._log_action("CONSENT_WITHDRAWN", did, f"Consent withdrawn: {consent_id}")
            return True
        return False

    def right_to_be_forgotten(self, did: str) -> bool:
        """Implement right to erasure (right to be forgotten)"""
        try:
            if did in self.did_registry:
                # Mark DID as revoked
                self.revoked_dids.add(did)

                # Remove from active registry while keeping audit trail
                self.did_registry[did]["status"] = "REVOKED_BY_DATA_SUBJECT"
                self.did_registry[did]["revoked_at"] = datetime.datetime.utcnow().isoformat()

                # Remove associated consent records (but keep audit trail)
                for consent_id, record in list(self.consent_records.items()):
                    if record["did"] == did:
                        record["status"] = "ERASED"

                self._log_action("RIGHT_TO_BE_FORGOTTEN", did, "DID erased by data subject request")
                return True

        except Exception as e:
            self._log_action("ERROR", did, f"Right to be forgotten failed: {str(e)}")

        return False

    def export_user_data(self, did: str) -> Dict:
        """Export all user data for portability (GDPR Article 20)"""
        user_data = {
            "did": did,
            "export_timestamp": datetime.datetime.utcnow().isoformat(),
            "did_document": self.resolve_did(did),
            "consent_records": [],
            "audit_trail": []
        }

        # Include consent records
        for record in self.consent_records.values():
            if record["did"] == did:
                user_data["consent_records"].append(record)

        # Include relevant audit trail
        for entry in self.audit_log:
            if entry.get("did") == did:
                user_data["audit_trail"].append(entry)

        self._log_action("DATA_EXPORTED", did, "User data exported")
        return user_data

    # Cultural Heritage Integration

    def create_cultural_heritage_credential(self, issuer_did: str, subject_did: str,
                                          artifact_data: Dict, private_key: ed25519.Ed25519PrivateKey) -> Dict:
        """Create a verifiable credential for cultural heritage artifacts"""
        credential = {
            "@context": [
                "https://www.w3.org/2018/credentials/v1",
                "https://schema.org/",
                "https://w3id.org/cultural-heritage/v1"
            ],
            "id": f"urn:uuid:{uuid.uuid4()}",
            "type": ["VerifiableCredential", "CulturalHeritageCredential"],
            "issuer": issuer_did,
            "issuanceDate": datetime.datetime.utcnow().isoformat() + "Z",
            "credentialSubject": {
                "id": subject_did,
                "artifact": {
                    "name": artifact_data.get("name", ""),
                    "description": artifact_data.get("description", ""),
                    "dateCreated": artifact_data.get("date_created", ""),
                    "creator": artifact_data.get("creator", ""),
                    "culturalContext": artifact_data.get("cultural_context", ""),
                    "conservationStatus": artifact_data.get("conservation_status", ""),
                    "location": artifact_data.get("location", ""),
                    "provenance": artifact_data.get("provenance", [])
                }
            }
        }

        # Sign the credential
        credential_json = json.dumps(credential, sort_keys=True)
        signature = self.sign_data(credential_json, private_key)

        # Add proof
        credential["proof"] = {
            "type": "Ed25519Signature2020",
            "created": datetime.datetime.utcnow().isoformat() + "Z",
            "proofPurpose": "assertionMethod",
            "verificationMethod": f"{issuer_did}#key1",
            "proofValue": signature
        }

        self._log_action("CREDENTIAL_ISSUED", subject_did, f"Cultural heritage credential issued")
        return credential

    # Dataverse Integration Mock

    def integrate_with_dataverse(self, did: str, dataset_metadata: Dict) -> Dict:
        """Mock integration with Dataverse for metadata management"""
        integration_record = {
            "did": did,
            "dataverse_doi": f"doi:10.7910/DVN/{uuid.uuid4().hex[:8].upper()}",
            "dataset_metadata": dataset_metadata,
            "integration_timestamp": datetime.datetime.utcnow().isoformat(),
            "handle": f"hdl:1902.1/{uuid.uuid4().hex[:10]}",
            "access_permissions": {
                "public_metadata": True,
                "restricted_data": False,
                "gdpr_compliant": True
            }
        }

        self._log_action("DATAVERSE_INTEGRATION", did, "Dataset integrated with Dataverse")
        return integration_record

    # Utility Methods

    def _log_action(self, action: str, did: Optional[str], details: str):
        """Log all actions for audit trail"""
        log_entry = {
            "timestamp": datetime.datetime.utcnow().isoformat(),
            "action": action,
            "did": did,
            "details": details,
            "log_id": str(uuid.uuid4())
        }
        self.audit_log.append(log_entry)

    def get_audit_trail(self, did: Optional[str] = None) -> List[Dict]:
        """Get audit trail for specific DID or all actions"""
        if did:
            return [entry for entry in self.audit_log if entry.get("did") == did]
        return self.audit_log

    def get_system_status(self) -> Dict:
        """Get system statistics and status"""
        return {
            "total_dids": len(self.did_registry),
            "active_dids": len(self.did_registry) - len(self.revoked_dids),
            "revoked_dids": len(self.revoked_dids),
            "consent_records": len(self.consent_records),
            "audit_entries": len(self.audit_log),
            "system_uptime": "operational"
        }

# Demonstration and Testing
def demonstrate_system():
    """Comprehensive demonstration of the DID system"""
    print("=== GDPR-Compliant DID System for Cultural Heritage ===\n")

    # Initialize system
    did_system = GDPRCompliantDIDSystem()

    # 1. Generate DIDs for different actors
    print("1. Generating DIDs...")

    # Museum curator
    curator_did, curator_doc, curator_key = did_system.generate_did("heritage")
    print(f"Curator DID: {curator_did}")

    # Artifact (represented by DID)
    artifact_did, artifact_doc, artifact_key = did_system.generate_did("artifact")
    print(f"Artifact DID: {artifact_did}")

    # Researcher
    researcher_did, researcher_doc, researcher_key = did_system.generate_did("research")
    print(f"Researcher DID: {researcher_did}")

    # 2. GDPR Consent Management
    print("\n2. GDPR Consent Management...")
    consent_id = did_system.record_consent(
        researcher_did,
        "cultural_heritage_research",
        True
    )
    print(f"Consent recorded: {consent_id}")

    # 3. Create Cultural Heritage Credential
    print("\n3. Creating Cultural Heritage Credential...")
    artifact_data = {
        "name": "Ancient Greek Amphora",
        "description": "Red-figure pottery amphora from 5th century BCE",
        "date_created": "450 BCE",
        "creator": "Unknown Athenian potter",
        "cultural_context": "Classical Greek",
        "conservation_status": "Good",
        "location": "National Archaeological Museum",
        "provenance": [
            "Excavated in Athens, 1952",
            "Acquired by museum, 1955",
            "Restored, 1980"
        ]
    }

    credential = did_system.create_cultural_heritage_credential(
        curator_did,
        artifact_did,
        artifact_data,
        curator_key
    )
    print("Cultural heritage credential created")

    # 4. Verify Credential
    print("\n4. Verifying Credential...")
    credential_json = json.dumps({k: v for k, v in credential.items() if k != "proof"}, sort_keys=True)
    is_valid = did_system.verify_signature(
        credential_json,
        credential["proof"]["proofValue"],
        curator_did
    )
    print(f"Credential verification: {'VALID' if is_valid else 'INVALID'}")

    # 5. Dataverse Integration
    print("\n5. Dataverse Integration...")
    dataset_metadata = {
        "title": "Ancient Greek Pottery Collection",
        "description": "Digital archive of classical Greek ceramics",
        "keywords": ["archaeology", "pottery", "ancient greece"],
        "access_level": "restricted"
    }

    dataverse_record = did_system.integrate_with_dataverse(artifact_did, dataset_metadata)
    print(f"DOI assigned: {dataverse_record['dataverse_doi']}")
    print(f"Handle: {dataverse_record['handle']}")

    # 6. Export User Data (GDPR Article 20)
    print("\n6. Data Portability...")
    user_data = did_system.export_user_data(researcher_did)
    print(f"Exported {len(user_data['consent_records'])} consent records")
    print(f"Exported {len(user_data['audit_trail'])} audit entries")

    # 7. Demonstrate Right to be Forgotten
    print("\n7. Right to be Forgotten...")
    forgotten = did_system.right_to_be_forgotten(researcher_did)
    print(f"Right to erasure executed: {'SUCCESS' if forgotten else 'FAILED'}")

    # Try to access revoked DID
    revoked_doc = did_system.resolve_did(researcher_did)
    print(f"Access to revoked DID: {'DENIED' if revoked_doc is None else 'ALLOWED'}")

    # 8. System Status
    print("\n8. System Status...")
    status = did_system.get_system_status()
    for key, value in status.items():
        print(f"{key}: {value}")

    # 9. Display Sample DID Document
    print(f"\n9. Sample DID Document (Curator):")
    print(json.dumps(curator_doc, indent=2))

    print(f"\n10. Sample Cultural Heritage Credential:")
    print(json.dumps(credential, indent=2)[:500] + "...")

# Run the demonstration
if __name__ == "__main__":
    demonstrate_system()


=== GDPR-Compliant DID System for Cultural Heritage ===

1. Generating DIDs...
Curator DID: did:heritage:cfe8d94b1c6dcd468bf76e47ce0a52e3
Artifact DID: did:artifact:25868d1cecf05434b6be7f867e0420af
Researcher DID: did:research:996ce92385512e4d8ae78b690ac7fab1

2. GDPR Consent Management...
Consent recorded: a2305c7b-78ba-4f48-85b0-e2b8c000fbdc

3. Creating Cultural Heritage Credential...
Cultural heritage credential created

4. Verifying Credential...
Credential verification: VALID

5. Dataverse Integration...
DOI assigned: doi:10.7910/DVN/9B0A0B8E
Handle: hdl:1902.1/fe29963014

6. Data Portability...
Exported 1 consent records
Exported 3 audit entries

7. Right to be Forgotten...
Right to erasure executed: SUCCESS
Access to revoked DID: DENIED

8. System Status...
total_dids: 3
active_dids: 2
revoked_dids: 1
consent_records: 1
audit_entries: 12
system_uptime: operational

9. Sample DID Document (Curator):
{
  "@context": [
    "https://www.w3.org/ns/did/v1",
    "https://w3id.org/secu