In [1]:
import json
import time
from dataclasses import dataclass

In [2]:
def encrypt(plaintext: dict, key: str) -> str:
    data = json.dumps(plaintext) # dicttojson으로평문생성
    return f"{key}:{data}" # 키가앞에붙어있는지확인을위함
def decrypt(ciphertext: str, key: str) -> dict:
    prefix, data = ciphertext.split(":", 1) # 키와평문을구분하기위함
    if prefix != key: # 키가다르면에러를던지도록설정
        raise ValueError("키불일치-복호화실패")
    return json.loads(data)



In [3]:
# Alice KDC 사이의장기대칭키
K_A = "KEY_ALICE_KDC"

# BobServiceKDC 사이의장기대칭키
K_B = "KEY_BOB_KDC"

# TGT 암호화용KDC 내부키(KDC만알고있다고가정)
K_TGT = "KEY_TGT_KDC"

In [4]:
class KDC:
    """Kerberos의KDC 역할:
    - AS (Authentication Server): 로그인시TGT 발급
    -TGS (Ticket Granting Server): 서비스티켓발급"""
    def __init__(self): self.issued_count= 0
    
    def _new_session_key(self, label: str) -> str:
        """새로운세션키를문자열형태로생성.
        실제Kerberos에서는난수기반대칭키를생성하는부분에해당."""
        self.issued_count+= 1
        return f"SESSION_{label}_{self.issued_count}"
    def issue_tgt(self, client_id: str) -> str:
        K_c_kdc= self._new_session_key(f"{client_id}_KDC") 
        # KDC 와공유하는세션키발급
        tgt_plain= {
            "client": client_id,
            "session_key": K_c_kdc
        } # client_id를함께보관함으로써, 서명된신분증데이터
        tgt_cipher= encrypt(tgt_plain, K_TGT) # K_TGT는KDC만알고있는내부키

        response_plain= {
            "session_key": K_c_kdc,
            "tgt": tgt_cipher
        } # 클라이언트에게넘겨줄실제내용세선키+ 신분증
        enc_for_client= encrypt(response_plain, K_A) # Alice와KDC가공휴나는장기키
        return enc_for_client
    def issue_service_ticket(self, tgt_cipher, authenticator_cipher, service_id):
        tgt_plain= decrypt(tgt_cipher, K_TGT) # KDC만아는K_TGT로TGT를복호화→ 누가접속중인지확인.
        client_id= tgt_plain["client"] # DC는DB 대신TGT를열어서사용자의상태를읽음.
        K_c_kdc= tgt_plain["session_key"]

        auth_plain= decrypt(authenticator_cipher, K_c_kdc) # 클라이언트가보낸Authenticator는K_c_kdc로암호화되어있음
        K_c_s= self._new_session_key(f"{client_id}_{service_id}") # 이제KDC는“클라이언트–서비스”사이에쓸새로운세션키를생성
        ticket_plain= {
            "client": client_id,
            "session_key": K_c_s
        }
        ticket_cipher= encrypt(ticket_plain, K_B)
        response_plain= {
            "session_key": K_c_s,
            "ticket": ticket_cipher
        }
        enc_for_client= encrypt(response_plain, K_c_kdc)#이티켓은K_B로암호화→ Bob만읽을수있음
        return enc_for_client

In [5]:
class ServiceServer:
	"""
	Kerberos에서 실제 서비스 서버(Bob)에 해당.
	- K_B: Bob <-> KDC 사이 장기대칭키
	- 티켓은 K_B로 암호화되어 Bob만 읽을 수 있음
	"""
	def __init__(self, service_id: str, key_with_kdc: str):
		self.service_id = service_id
		self.key_with_kdc = key_with_kdc  # K_B

	def receive_request(self, ticket_cipher: str, authenticator_cipher: str):
		"""
		클라이언트의실제서비스접근요청처리(AP 단계).
		-입력:
		-ticket_cipher: TicketToService(K_B로암호화)
		-authenticator_cipher: K_c_s로암호화된Authenticator2
		-과정:
		1) ticket_cipher를K_B로복호화→ client_id, K_c_s획득
		2) authenticator_cipher를K_c_s로복호화→ client_id, timestamp 확인
		3) 두client_id동일여부확인→ 인증성공/실패
		4) 성공시K_c_s로암호화된응답반환
		"""
		print(f"\n[{self.service_id}] === 서비스요청수신===")
		# 1) 티켓복호화(K_B 사용)
		ticket_plain= decrypt(ticket_cipher, self.key_with_kdc)
		client_id= ticket_plain["client"]
		K_c_s= ticket_plain["session_key"]
		print(f"[{self.service_id}] 복호화된티켓: {ticket_plain}")

		# 2) Authenticator2 복호화(K_c_s사용)
		auth_plain= decrypt(authenticator_cipher, K_c_s)
		print(f"[{self.service_id}] 복호화된Authenticator: {auth_plain}")
		# 3) ID 일치여부확인
		if auth_plain["client"] != client_id:
			print(f"[{self.service_id}] 클라이언트ID 불일치! 인증실패")
			return None
		print(f"[{self.service_id}] 클라이언트'{client_id}' 인증성공!")
		# 4) 세션키로응답암호화
		response_plain= {
			"msg": f"Hello, {client_id}! This is {self.service_id}.",
			"timestamp": time.time()
		}
		response_cipher= encrypt(response_plain, K_c_s)
		print(f"[{self.service_id}] 응답(암호문, K_c_s): {response_cipher}")
		return response_cipher

In [6]:
@dataclass
class Client:
    """
    Kerberos 클라이언트(Alice).
    - key_with_kdc: Alice KDC 장기키(K_A)
    - login() 후 K_c_kdc, TGT 획득
    - request_service_ticket() 후 K_c_s, ticket_for_service 획득
    """
    client_id: str
    key_with_kdc: str  # K_A
    # 로그인 후 채워지는 값들
    K_c_kdc: str = None
    tgt: str = None
    # 서비스 티켓 요청 후 채워지는 값들
    K_c_s: str = None
    ticket_for_service: str = None

    def login(self, kdc):
        enc = kdc.issue_tgt(self.client_id)
        response_plain = decrypt(enc, self.key_with_kdc)
        self.K_c_kdc = response_plain["session_key"]
        self.tgt = response_plain["tgt"]

    def request_service_ticket(self, kdc, service_id: str):
        auth_plain = {
            "client": self.client_id,
            "timestamp": time.time(),
        }
        authenticator_cipher = encrypt(auth_plain, self.K_c_kdc)
        enc = kdc.issue_service_ticket(self.tgt, authenticator_cipher, service_id)
        response_plain = decrypt(enc, self.K_c_kdc)
        self.K_c_s = response_plain["session_key"]
        self.ticket_for_service = response_plain["ticket"]

    def access_service(self, service):
        auth2_plain = {
            "client": self.client_id,
            "timestamp": time.time(),
        }
        authenticator2_cipher = encrypt(auth2_plain, self.K_c_s)
        response_cipher = service.receive_request(self.ticket_for_service, authenticator2_cipher)
        if response_cipher is None:
            return
        response_plain = decrypt(response_cipher, self.K_c_s)
        print(response_plain)

    def receive_request(self, ticket_cipher: str, authenticator_cipher: str):
        ticket_plain= decrypt(ticket_cipher, self.key_with_kdc) # self.key_with_kdc= K_B (Bob KDC) Bob만내용을볼수있음
        client_id= ticket_plain["client"]
        K_c_s= ticket_plain["session_key"]
        auth_plain= decrypt(authenticator_cipher, K_c_s)
        # “티켓안에있는사용자ID”와“Authenticator 안의사용자ID”가같은지확인
        if auth_plain["client"] != client_id:
            print("인증실패")
            return None # 일치성검사로실제사용자인증이완료
        response_plain= {
            "msg": f"Hello, {client_id}!",
            "timestamp": time.time()
        }
        response_cipher= encrypt(response_plain, K_c_s)
        return response_cipher

In [None]:
if __name__ == "__main__":
    # KDC, 서비스서버생성
    kdc = KDC()
    bob_service = ServiceServer(service_id="BobService", key_with_kdc=K_B)

    # 클라이언트생성
    alice_client = Client(client_id="Alice", key_with_kdc=K_A)
    ''' 동일한 코드
    alice = Client()
    alice.client_id = "Alice"
    alice.key_with_kdc = K_A
    '''

    # 1) 로그인과TGT획득
    print("\n=== 1) 로그인과 TGT 획득 ===")
    alice_client.login(kdc)

    # 2) 서비스티켓요청
    print("\n=== 2) 서비스티켓 요청 ===")
    alice_client.request_service_ticket(kdc, service_id="BobService")

    # 3) 실제서비스접근
    print("\n=== 3) 실제 서비스 접근 ===")
    alice_client.access_service(bob_service)


=== 1) 로그인과 TGT 획득 ===

=== 2) 서비스티켓 요청 ===

=== 3) 실제 서비스 접근 ===

[BobService] === 서비스요청수신===
[BobService] 복호화된티켓: {'client': 'Alice', 'session_key': 'SESSION_Alice_BobService_2'}
[BobService] 복호화된Authenticator: {'client': 'Alice', 'timestamp': 1763965761.2632916}
[BobService] 클라이언트'Alice' 인증성공!
[BobService] 응답(암호문, K_c_s): SESSION_Alice_BobService_2:{"msg": "Hello, Alice! This is BobService.", "timestamp": 1763965761.263406}
{'msg': 'Hello, Alice! This is BobService.', 'timestamp': 1763965761.263406}
