# HashiCorp Vault Transit + การเข้ารหัสในแบบ ORM

โน้ตบุ๊กนี้เลียนแบบการทำงานของ ORM แต่เปลี่ยนการเก็บ/เรียกกุญแจจากการเก็บใน ENV มาเป็นเรียกใช้งานผ่าน HashiCorp Vault (transit).
เราจะแสดงการเข้ารหัส/ถอดรหัสคอลัมน์อย่างโปร่งใสผ่าน SQLAlchemy โดยเรียก Vault ผ่าน HTTP API

## เตรียมสภาพแวดล้อม
- รัน `docker-compose up -d postgres vault` เพื่อให้บริการ Postgres และ Vault พร้อมใช้งาน
- ยืนยันว่า Vault ฟังที่ `http://127.0.0.1:8200` (dev token เริ่มต้น: `root`)
- เปิดใช้งาน virtualenv ในเครื่อง (`.venv\Scripts\Activate.ps1` บน PowerShell) หรือติดตั้ง dependencies จาก `requirements.txt`


In [1]:
# ติดตั้ง dependencies ถ้าจำเป็น (ข้ามได้ถ้าติดตั้งแล้วใน venv)
# %pip install -q "psycopg[binary]>=3.1" SQLAlchemy>=2.0 pandas


## เชื่อมต่อกับ Vault และประกาศโมเดล ORM
- `VaultTransitClient` ห่อหุ้มการเรียก HTTP พื้นฐานที่ต้องใช้กับ Vault
- `VaultTransitEncryptedText` เก็บ ciphertext ใน Postgres และเรียก Vault เพื่อถอดรหัสเมื่ออ่านข้อมูล


In [2]:
import base64
import json
import os
from typing import Optional

import pandas as pd
from sqlalchemy import create_engine, Text, select, text
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
from sqlalchemy.types import TypeDecorator

import urllib.error
import urllib.request

# ค่าการเชื่อมต่อฐานข้อมูล (ปรับได้ผ่าน ENV)
PG_HOST = os.getenv("PG_HOST", "localhost")
PG_PORT = int(os.getenv("PG_PORT", "5432"))
PG_DB = os.getenv("PG_DB", "encdb")
PG_USER = os.getenv("PG_USER", "encuser")
PG_PWD = os.getenv("PG_PASSWORD", "encpass")

# ค่าการเชื่อมต่อ Vault (ปรับได้ผ่าน ENV)
VAULT_ADDR = os.getenv("VAULT_ADDR", "http://127.0.0.1:8200")
VAULT_TOKEN = os.getenv("VAULT_TOKEN", "root")
VAULT_TRANSIT_KEY = os.getenv("VAULT_TRANSIT_KEY", "customer-data")

engine = create_engine(
    f"postgresql+psycopg://{PG_USER}:{PG_PWD}@{PG_HOST}:{PG_PORT}/{PG_DB}",
    future=True,
)


# ข้อผิดพลาดสำหรับการเรียก Vault
class VaultRequestError(RuntimeError):
    def __init__(self, status_code: int, detail: str):
        super().__init__(f"Vault request failed ({status_code}): {detail}")
        self.status_code = status_code
        self.detail = detail


class VaultTransitClient:
    def __init__(self, addr: str, token: str, default_key: str):
        # เก็บค่า base URL ของ Vault และ token สำหรับเรียก API
        self.addr = addr.rstrip("/")
        self.token = token
        self.default_key = default_key

    def _request(self, method: str, path: str, payload: Optional[dict] = None, *, expected=(200, 204, 202)):
        data = None
        if payload is not None:
            data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(f"{self.addr}{path}", data=data, method=method.upper())
        req.add_header("X-Vault-Token", self.token)
        if payload is not None:
            req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req) as resp:
                status = resp.status
                body = resp.read()
        except urllib.error.HTTPError as err:
            detail = err.read().decode("utf-8", errors="ignore")
            raise VaultRequestError(err.code, detail) from err
        except urllib.error.URLError as err:
            raise RuntimeError(f"ไม่สามารถติดต่อ Vault ที่ {self.addr}: {err.reason}") from err
        if status not in expected:
            raise VaultRequestError(status, body.decode("utf-8", errors="ignore"))
        if not body:
            return {}
        return json.loads(body.decode("utf-8"))

    def ensure_transit(self):
        mounts = self._request("GET", "/v1/sys/mounts")
        if "transit/" not in mounts.get("data", {}):
            self._request("POST", "/v1/sys/mounts/transit", {"type": "transit"})
            print("เปิดใช้งาน transit secrets engine แล้ว")
        else:
            print("Transit secrets engine พร้อมใช้งานอยู่แล้ว")

    def ensure_key(self, key_name: Optional[str] = None):
        name = key_name or self.default_key
        try:
            return self._request("GET", f"/v1/transit/keys/{name}")
        except VaultRequestError as err:
            if err.status_code != 404:
                raise
        # สร้างกุญแจแบบ AES-GCM ถ้ายังไม่มี
        self._request("POST", f"/v1/transit/keys/{name}", {"type": "aes256-gcm96"})
        return self._request("GET", f"/v1/transit/keys/{name}")

    def encrypt(self, plaintext: str, key_name: Optional[str] = None) -> str:
        name = key_name or self.default_key
        # encode เป็น base64 ก่อนส่งให้ Vault
        encoded = base64.b64encode(plaintext.encode("utf-8")).decode("utf-8")
        response = self._request("POST", f"/v1/transit/encrypt/{name}", {"plaintext": encoded})
        return response["data"]["ciphertext"]

    def decrypt(self, ciphertext: str, key_name: Optional[str] = None) -> str:
        name = key_name or self.default_key
        response = self._request("POST", f"/v1/transit/decrypt/{name}", {"ciphertext": ciphertext})
        decoded = base64.b64decode(response["data"]["plaintext"]).decode("utf-8")
        return decoded


# สร้าง client และเตรียม transit/key ถ้ายังไม่ถูกสร้าง
vault_client = VaultTransitClient(VAULT_ADDR, VAULT_TOKEN, VAULT_TRANSIT_KEY)
vault_client.ensure_transit()
key_info = vault_client.ensure_key()
print(f"Transit key '{VAULT_TRANSIT_KEY}' พร้อมใช้งาน (latest version {key_info['data']['latest_version']})")


class Base(DeclarativeBase):
    pass


class VaultTransitEncryptedText(TypeDecorator):
    impl = Text
    cache_ok = True

    def __init__(self, transit_client: VaultTransitClient, key_name: Optional[str] = None):
        super().__init__()
        self.transit_client = transit_client
        self.key_name = key_name or transit_client.default_key

    def process_bind_param(self, value, dialect):
        # ก่อนบันทึก: เรียก Vault encrypt เพื่อรับ ciphertext (string) แล้วเก็บในคอลัมน์
        if value is None:
            return None
        return self.transit_client.encrypt(value, self.key_name)

    def process_result_value(self, value, dialect):
        # ตอนอ่าน: เรียก Vault decrypt เพื่อถอดค่า แล้วคืน plaintext
        if value is None:
            return None
        return self.transit_client.decrypt(value, self.key_name)


class CustomerSecretVault(Base):
    __tablename__ = "customer_secret_vault"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    full_name: Mapped[str] = mapped_column(Text, nullable=False)
    national_id: Mapped[str] = mapped_column(VaultTransitEncryptedText(vault_client), nullable=False)

    def __repr__(self) -> str:
        return f"<CustomerSecretVault id={self.id} full_name={self.full_name!r}>"


with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS customer_secret_vault"))
Base.metadata.create_all(engine)
print("สร้างตาราง customer_secret_vault ใหม่แล้ว")


เปิดใช้งาน transit secrets engine แล้ว
Transit key 'customer-data' พร้อมใช้งาน (latest version 1)
สร้างตาราง customer_secret_vault ใหม่แล้ว


## ใส่ข้อมูลตัวอย่าง (Vault จะจัดการการเข้ารหัส)


In [3]:
# ตัวอย่างข้อมูลเพื่อทดสอบ
samples = [
    ("Alice Carter", "1101700200011"),
    ("Brian Cruz", "1101700200022"),
    ("Chloe Ramsey", "1101700200033"),
]

with Session(engine) as session:
    session.add_all([CustomerSecretVault(full_name=name, national_id=nid) for name, nid in samples])
    session.commit()
print("เพิ่มข้อมูลตัวอย่างโดยใช้ Vault transit encryption แล้ว")


เพิ่มข้อมูลตัวอย่างโดยใช้ Vault transit encryption แล้ว


## ตรวจสอบ ciphertext ที่เก็บจริงใน Postgres
- ORM จะเก็บค่า ciphertext ที่ได้จาก Vault (มักขึ้นต้นด้วย `vault:`) ในคอลัมน์


In [4]:
# ดึงค่า ciphertext ที่เก็บในฐานข้อมูลโดยตรง
with engine.begin() as conn:
    rows = conn.execute(
        text("SELECT id, full_name, national_id AS ciphertext FROM customer_secret_vault ORDER BY id")
    ).fetchall()
pd.DataFrame(rows, columns=["id", "full_name", "ciphertext"])


Unnamed: 0,id,full_name,ciphertext
0,1,Alice Carter,vault:v1:bY5XYpOW46tDSuvBOF1XYm83mwLmaNG9xCn6g...
1,2,Brian Cruz,vault:v1:nKL7zVukSLZ9KvJnDiq0mzbfR/EciCRgg1iKB...
2,3,Chloe Ramsey,vault:v1:H2kYO/dPZSmfF6p62TFmPWrdAZlNJvOA45O05...


## คิวรีผ่าน ORM (Vault จะถอดรหัสให้โดยอัตโนมัติ)
- การอ่านผ่าน SQLAlchemy จะเรียก `process_result_value` ซึ่งจะเรียก Vault เพื่ถอดค่า ciphertext เป็น plaintext


In [5]:
# อ่านข้อมูลผ่าน ORM ซึ่งจะได้ค่า plaintext เพราะ Vault ถอดรหัสให้
with Session(engine) as session:
    result = session.execute(
        select(CustomerSecretVault.id, CustomerSecretVault.full_name, CustomerSecretVault.national_id)
        .order_by(CustomerSecretVault.id)
    ).all()
pd.DataFrame(result, columns=["id", "full_name", "national_id_plain"])


Unnamed: 0,id,full_name,national_id_plain
0,1,Alice Carter,1101700200011
1,2,Brian Cruz,1101700200022
2,3,Chloe Ramsey,1101700200033


### หมายเหตุ
- ตัวอย่างนี้ใช้ token แบบ dev/root เพื่อความสะดวกในการทดสอบ — ในการใช้งานจริง ให้ใช้ AppRole, JWT หรือวิธีการให้สิทธิ์ที่ปลอดภัยกว่า
- Vault Transit จะไม่เผย key material ดิบให้กับผู้เรียก ทำให้ ciphertext ที่เก็บใน Postgres ปลอดภัยยิ่งขึ้น
- การอ่านแต่ละแถวจะเรียก Vault เพื่ถอดรหัส — หากต้องการประสิทธิภาพสำหรับชุดข้อมูลขนาดใหญ่ ให้พิจารณาการ cache, การรวมคิวรี หรือตัดสินใจใช้วิธีถอดรหัสแบบ batch
