# 🎯 Connecting Debezium to PostgreSQL - Change Data Capture (CDC)

This tutorial walks step by step through setting up Debezium to capture changes from PostgreSQL and publish them to Kafka.

## 📋 Prerequisites
- Docker & Docker Compose installed
- Running containers: PostgreSQL, Kafka, Zookeeper, Debezium
- Ports: 5432 (PostgreSQL), 9092 (Kafka, used inside the container) / 9093 (Kafka, used from the host), 8087 (Debezium / Kafka Connect REST API)

## ✅ Step 0: Verify the containers are running

In [None]:
%%bash
# Check containers
echo "=== Running Containers ==="
docker ps --format "table {{.Names}}\t{{.Status}}" | grep -E "postgres|kafka|zookeeper|debezium"

echo -e "\n=== Container Health ==="
for container in postgres_db kafka zookeeper debezium; do
    if docker ps | grep -q $container; then
        echo "✅ $container is running"
    else
        echo "❌ $container is NOT running"
    fi
done

If the containers are not running yet, start them:

In [None]:
%%bash
# Uncomment to start the containers
# docker-compose up -d postgres zookeeper kafka debezium

## 🗄️ Step 1: Prepare PostgreSQL

### 1.1 Verify PostgreSQL WAL Level

Debezium requires `wal_level=logical` so that it can read changes from PostgreSQL's write-ahead log (WAL) through logical decoding.

In [None]:
%%bash
# Check WAL level
# No -it here: a notebook cell has no TTY, so "docker exec -it" would fail
docker exec postgres_db psql -U admin -d myapp_db -c "SHOW wal_level;"

The output must be `logical`. If it is not, set it in `docker-compose.yml` and restart the container (changing `wal_level` requires a server restart):

```yaml
postgres:
  command: 
    - "postgres"
    - "-c"
    - "wal_level=logical"
```

### 1.2 Create a test table

In [None]:
%%bash
# Create the users table with sample data
# -i attaches stdin so the heredoc below actually reaches psql
docker exec -i postgres_db psql -U admin -d myapp_db << 'EOSQL'
-- Drop the table if it already exists
DROP TABLE IF EXISTS users;

-- Create the table
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    created_at TIMESTAMP DEFAULT NOW()
);

-- Insert sample data
INSERT INTO users (name, email) VALUES 
    ('Alice', 'alice@example.com'),
    ('Bob', 'bob@example.com'),
    ('Carol', 'carol@example.com');

-- Verify
SELECT * FROM users;
EOSQL

## 🔌 Step 2: Verify Debezium is running

In [None]:
import requests
import json

# Check Debezium (Kafka Connect REST API) health
try:
    response = requests.get('http://localhost:8087/', timeout=5)
    response.raise_for_status()  # only report "running" on a successful response
    print("✅ Debezium is running")
    print(json.dumps(response.json(), indent=2))
except Exception as e:
    print(f"❌ Debezium not accessible: {e}")

In [None]:
# List existing connectors
response = requests.get('http://localhost:8087/connectors')
connectors = response.json()

print(f"Existing connectors: {len(connectors)}")
for connector in connectors:
    print(f"  - {connector}")

## 📝 Step 3: Create the Debezium connector

### 3.1 Connector Configuration

In [None]:
# Debezium connector config
connector_config = {
    "name": "postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        
        # PostgreSQL connection
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "admin",
        "database.password": "admin",
        "database.dbname": "myapp_db",
        "database.server.name": "postgres",  # Debezium 1.x only; Debezium 2.x uses "topic.prefix" instead
        
        # Tables to capture
        "table.include.list": "public.users",
        
        # PostgreSQL plugin
        "plugin.name": "pgoutput",
        
        # Kafka topic naming: topics are named <topic.prefix>.<schema>.<table>
        "topic.prefix": "postgres",
        
        # Replication slot & publication
        "slot.name": "debezium_slot",
        "publication.name": "debezium_publication",
        
        # Snapshot mode
        "snapshot.mode": "initial",
        
        # Topic creation
        "topic.creation.default.partitions": 3,
        "topic.creation.default.replication.factor": 1
    }
}

print("Connector configuration:")
print(json.dumps(connector_config, indent=2))
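Debezium names each change topic `<topic.prefix>.<schema>.<table>`, which is why Step 5 looks for `postgres.public.users`. As a quick sanity check, here is a small sketch of a hypothetical helper, `expected_topics`, that derives those names from a dict shaped like `connector_config` above. It assumes `table.include.list` holds plain `schema.table` entries; Debezium actually treats those entries as regular expressions, so patterns would need different handling.

```python
def expected_topics(connector_config: dict) -> list:
    """Derive Debezium change-topic names: <topic.prefix>.<schema>.<table>.

    Hypothetical helper: assumes table.include.list contains plain
    schema.table names, not regular expressions.
    """
    cfg = connector_config["config"]
    prefix = cfg["topic.prefix"]
    # Split the comma-separated include list and drop empty entries
    tables = [t.strip() for t in cfg.get("table.include.list", "").split(",") if t.strip()]
    return [f"{prefix}.{table}" for table in tables]


example = {"name": "postgres-connector",
           "config": {"topic.prefix": "postgres", "table.include.list": "public.users"}}
print(expected_topics(example))  # ['postgres.public.users']
```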

### 3.2 Create Connector

In [None]:
import time

# Delete the connector if it already exists (404 just means it did not exist yet)
resp = requests.delete('http://localhost:8087/connectors/postgres-connector')
if resp.status_code == 204:
    print("🗑️  Deleted existing connector")
    time.sleep(2)

# Create new connector
response = requests.post(
    'http://localhost:8087/connectors',
    headers={'Content-Type': 'application/json'},
    json=connector_config
)

if response.status_code in [200, 201]:
    print("✅ Connector created successfully")
    print(json.dumps(response.json(), indent=2))
else:
    print(f"❌ Failed to create connector: {response.status_code}")
    print(response.text)

### 3.3 Verify Connector Status

In [None]:
import time

# Wait for connector to start
print("Waiting for connector to start...")
time.sleep(5)

# Check status
response = requests.get('http://localhost:8087/connectors/postgres-connector/status')
status = response.json()

print("\n=== Connector Status ===")
print(f"Connector State: {status['connector']['state']}")
print(f"Worker ID: {status['connector']['worker_id']}")

print("\n=== Tasks ===")
for task in status['tasks']:
    print(f"Task {task['id']}: {task['state']} on {task['worker_id']}")

if status['connector']['state'] == 'RUNNING':
    print("\n✅ Connector is RUNNING!")
else:
    print(f"\n⚠️  Connector state: {status['connector']['state']}")
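A fixed `time.sleep(5)` may be too short on a slow machine or longer than needed on a fast one. A sketch of a polling alternative follows; `wait_for_running` is a hypothetical helper that takes any zero-argument callable returning the JSON from `GET /connectors/<name>/status`, so it can be tested without a live Kafka Connect worker.

```python
import time


def wait_for_running(fetch_status, timeout_s=30.0, interval_s=1.0):
    """Poll until the connector and all of its tasks report RUNNING.

    Hypothetical helper. fetch_status: zero-argument callable returning the
    status dict. Raises RuntimeError on FAILED, TimeoutError on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status()
        connector_state = status.get("connector", {}).get("state")
        tasks = status.get("tasks", [])
        # Fail fast if the connector or any task has failed
        if connector_state == "FAILED" or any(t.get("state") == "FAILED" for t in tasks):
            raise RuntimeError(f"Connector or task FAILED: {status}")
        # Success only when at least one task exists and every task is RUNNING
        if connector_state == "RUNNING" and tasks and all(t.get("state") == "RUNNING" for t in tasks):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Connector not RUNNING after {timeout_s}s: {status}")
        time.sleep(interval_s)


# Usage against the live REST API (not run here):
# status = wait_for_running(lambda: requests.get(
#     'http://localhost:8087/connectors/postgres-connector/status', timeout=5).json())
```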

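## 🔍 Step 4: Verify the PostgreSQL side effects

Debezium creates a replication slot (`slot.name`) and a publication (`publication.name`) in PostgreSQL. With the default `publication.autocreate.mode=all_tables`, the auto-created publication covers all tables, so `puballtables` is expected to show `t`.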

In [None]:
%%bash
echo "=== Replication Slots ==="
docker exec postgres_db psql -U admin -d myapp_db -c "SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots;"

echo -e "\n=== Publications ==="
docker exec postgres_db psql -U admin -d myapp_db -c "SELECT pubname, puballtables FROM pg_publication;"

echo -e "\n=== Publication Tables ==="
docker exec postgres_db psql -U admin -d myapp_db -c "SELECT pubname, schemaname, tablename FROM pg_publication_tables;"
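A replication slot keeps WAL on the PostgreSQL server until Debezium confirms it has consumed it, so a stopped or lagging connector makes WAL grow. PostgreSQL prints LSNs as two hexadecimal halves, `X/Y`. The minimal sketch below assumes you have already fetched both positions as text, for example with `SELECT pg_current_wal_lsn(), confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'debezium_slot';`, and converts them to byte positions to measure how far the slot trails. PostgreSQL can compute the same number server-side with `pg_wal_lsn_diff()`.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '0/16B3748' to a 64-bit byte position."""
    high, low = lsn.split("/")
    # The high half is the upper 32 bits, the low half the lower 32 bits
    return (int(high, 16) << 32) | int(low, 16)


def slot_lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """How many bytes the slot's confirmed position trails the current WAL position."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(confirmed_flush_lsn)


print(lsn_to_int("0/16B3748"))              # 23803720
print(slot_lag_bytes("1/0", "0/FFFFFF00"))  # 256
```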

## 📊 Step 5: Verify the Kafka topics were created

In [None]:
%%bash
echo "=== Kafka Topics ==="
docker exec kafka kafka-topics --list --bootstrap-server localhost:9092

echo -e "\n=== User Topic Details ==="
docker exec kafka kafka-topics \
  --describe \
  --topic postgres.public.users \
  --bootstrap-server localhost:9092

## 👀 Step 6: Inspect the snapshot data in Kafka

Because `snapshot.mode` is `initial`, Debezium first snapshots the 3 existing rows from PostgreSQL; these snapshot events have `op` = `r` (read).

In [None]:
%%bash
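# GetOffsetShell prints topic:partition:latest-offset; while no records have
# been deleted, these offsets summed across partitions equal the message count
echo "=== Latest Offsets per Partition ==="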
docker exec kafka kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic postgres.public.users 2>/dev/null

echo -e "\n=== Sample Message ==="
docker exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic postgres.public.users \
  --from-beginning \
  --max-messages 1 2>/dev/null | jq '.payload | {before, after, op, source: (.source | {snapshot, lsn})}'
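The `.payload` in the `jq` filter assumes the JSON converter with schemas enabled, where every message value is an envelope `{"schema": ..., "payload": {...}}`; with the default `tombstones.on.delete=true`, each delete is also followed by a tombstone whose value is `null`. A Python counterpart of that filter is sketched below as a hypothetical helper, `summarize_event`; it accepts either the full envelope or a bare payload. The `sample` value is an abbreviated, illustrative event, not captured output.

```python
def summarize_event(value):
    """Reduce a Debezium change-event value to op/before/after/snapshot/lsn.

    Hypothetical helper. Accepts the JSON-converter envelope
    {"schema": ..., "payload": {...}} or a bare payload; returns None
    for tombstones (value is None).
    """
    if value is None:
        return None
    payload = value.get("payload", value)  # unwrap the envelope if present
    source = payload.get("source") or {}
    return {
        "op": payload.get("op"),
        "before": payload.get("before"),
        "after": payload.get("after"),
        "snapshot": source.get("snapshot"),
        "lsn": source.get("lsn"),
    }


# Illustrative, abbreviated snapshot event (not real output)
sample = {"schema": {}, "payload": {
    "op": "r", "before": None,
    "after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
    "source": {"snapshot": "true", "lsn": 12345},
}}
print(summarize_event(sample)["op"], summarize_event(None))  # r None
```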

## 🧪 Step 7: Test real-time CDC

### 7.1 Set up a background Kafka consumer

In [None]:
from kafka import KafkaConsumer
import json
from threading import Thread
import time

# Global list to store messages
captured_messages = []

def safe_json(raw):
    """Deserialize a message value; tombstones sent after deletes have value None."""
    return json.loads(raw.decode('utf-8')) if raw is not None else None

def consume_messages():
    """Background consumer"""
    consumer = KafkaConsumer(
        'postgres.public.users',
        bootstrap_servers=['localhost:9093'],  # host-side Kafka listener
        auto_offset_reset='latest',  # Only new messages
        value_deserializer=safe_json,
        # No consumer_timeout_ms: a 5 s timeout would stop the consumer
        # before the INSERT/UPDATE/DELETE cells below are run
    )
    
    print("🎧 Consumer started, waiting for messages...")
    
    try:
        for message in consumer:
            if message.value is None:
                continue  # skip tombstones
            payload = message.value.get('payload', {})
            captured_messages.append(payload)
            
            print(f"\n📨 Received message:")
            print(f"  Operation: {payload.get('op')}")
            print(f"  Before: {payload.get('before')}")
            print(f"  After: {payload.get('after')}")
    except Exception as e:
        print(f"Consumer stopped: {e}")
    finally:
        consumer.close()

# Start consumer in background
consumer_thread = Thread(target=consume_messages, daemon=True)
consumer_thread.start()

print("✅ Background consumer started")
time.sleep(2)

### 7.2 Test INSERT

In [None]:
%%bash
# INSERT new user
docker exec postgres_db psql -U admin -d myapp_db -c \
  "INSERT INTO users (name, email) VALUES ('David', 'david@example.com') RETURNING *;"

In [None]:
# Wait for message
time.sleep(3)

# Check captured messages
print(f"\n📊 Captured {len(captured_messages)} new messages")
if captured_messages:
    last_msg = captured_messages[-1]
    print(f"\nLast message:")
    print(f"  Operation: {last_msg.get('op')}")
    print(f"  Data: {last_msg.get('after')}")

### 7.3 Test UPDATE

In [None]:
%%bash
# UPDATE user
docker exec postgres_db psql -U admin -d myapp_db -c \
  "UPDATE users SET email = 'david.new@example.com' WHERE name = 'David' RETURNING *;"

In [None]:
# Wait and check
time.sleep(3)

if len(captured_messages) >= 2:
    update_msg = captured_messages[-1]
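    print(f"\nUPDATE captured:")
    # With the default REPLICA IDENTITY, 'before' does not carry the old non-key
    # values (it may be None); ALTER TABLE users REPLICA IDENTITY FULL; changes that
    print(f"  Before: {update_msg.get('before')}")
    print(f"  After: {update_msg.get('after')}")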

### 7.4 Test DELETE

In [None]:
%%bash
# DELETE user
docker exec postgres_db psql -U admin -d myapp_db -c \
  "DELETE FROM users WHERE name = 'David' RETURNING *;"

In [None]:
# Wait and check
time.sleep(3)

if len(captured_messages) >= 3:
    delete_msg = captured_messages[-1]
    print(f"\nDELETE captured:")
    print(f"  Operation: {delete_msg.get('op')}")
    # With the default REPLICA IDENTITY, only the primary key is populated in 'before'
    print(f"  Deleted data: {delete_msg.get('before')}")
    print(f"  After: {delete_msg.get('after')}")
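Replaying the captured events in order rebuilds the table's current state, which is essentially what a downstream sink (Flink, Spark, Iceberg) does with this topic. A minimal in-memory sketch, assuming payload dicts like those in `captured_messages` and `id` as the primary key; `apply_event` is hypothetical and the events below are abbreviated, illustrative payloads.

```python
def apply_event(state: dict, payload: dict, key: str = "id") -> dict:
    """Apply one Debezium payload to an in-memory {primary_key: row} map."""
    op = payload.get("op")
    if op in ("r", "c", "u"):
        # Snapshot read, insert and update all upsert the 'after' image
        row = payload["after"]
        state[row[key]] = row
    elif op == "d":
        # Deletes carry the key in 'before' ('after' is None)
        state.pop(payload["before"][key], None)
    # Other operation codes (e.g. 't' for truncate) are ignored in this sketch
    return state


# Illustrative events only (abbreviated payloads, not real output)
events = [
    {"op": "c", "before": None, "after": {"id": 4, "email": "david@example.com"}},
    {"op": "u", "before": None, "after": {"id": 4, "email": "david.new@example.com"}},
    {"op": "d", "before": {"id": 4, "email": None}, "after": None},
]
state = {}
for e in events[:2]:
    apply_event(state, e)
print(state)  # {4: {'id': 4, 'email': 'david.new@example.com'}}
apply_event(state, events[2])
print(state)  # {}
```

In the notebook, `for p in captured_messages: apply_event(table_state, p)` with an empty `table_state = {}` would replay the messages captured above.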

## 📊 Step 8: Summary & statistics

In [None]:
print("=== CDC Summary ===")
print(f"Total messages captured: {len(captured_messages)}")

# Count by operation type
ops = {}
for msg in captured_messages:
    op = msg.get('op', 'unknown')
    ops[op] = ops.get(op, 0) + 1

print("\nOperations:")
op_names = {'c': 'CREATE (INSERT)', 'u': 'UPDATE', 'd': 'DELETE', 'r': 'READ (SNAPSHOT)'}
for op, count in ops.items():
    print(f"  {op_names.get(op, op)}: {count}")
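Beyond counting operations, each payload carries two timestamps: `source.ts_ms` (when the change was made in the database) and the top-level `ts_ms` (when Debezium processed the event). Their difference gives a rough per-event capture latency, a first step toward the lag monitoring mentioned in the next steps. A sketch under these assumptions: `capture_latencies_ms` is hypothetical, it skips snapshot reads (`r`) so only streamed changes are measured, the demo payloads are illustrative, and clock skew between the database and Connect hosts distorts the result.

```python
from statistics import median


def capture_latencies_ms(payloads):
    """Per-event latency in ms: connector processing time minus database change time."""
    latencies = []
    for p in payloads:
        if p.get("op") not in ("c", "u", "d"):
            continue  # skip snapshot reads and anything unexpected
        src_ts = (p.get("source") or {}).get("ts_ms")
        if p.get("ts_ms") is not None and src_ts is not None:
            latencies.append(p["ts_ms"] - src_ts)
    return latencies


# Illustrative payloads only; in the notebook use capture_latencies_ms(captured_messages)
demo = [
    {"op": "r", "ts_ms": 1000, "source": {"ts_ms": 900}},
    {"op": "c", "ts_ms": 2050, "source": {"ts_ms": 2000}},
    {"op": "u", "ts_ms": 3120, "source": {"ts_ms": 3000}},
]
lat = capture_latencies_ms(demo)
print(lat, median(lat))  # [50, 120] 85.0
```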

## 🎯 Step 9: Access the UIs

In [None]:
from IPython.display import HTML, display

html = """
<div style="font-family: Arial; padding: 20px; background: #f5f5f5; border-radius: 10px;">
    <h3>🌐 Access Points</h3>
    <ul style="list-style: none; padding: 0;">
        <li style="margin: 10px 0;">
            📊 <a href="http://localhost:8085" target="_blank">Kafka UI</a> - Monitor topics & messages
        </li>
        <li style="margin: 10px 0;">
            🔌 <a href="http://localhost:8088" target="_blank">Debezium UI</a> - Manage connectors
        </li>
        <li style="margin: 10px 0;">
            🔧 <a href="http://localhost:8087" target="_blank">Debezium API</a> - REST API endpoint
        </li>
    </ul>
</div>
"""

display(HTML(html))

## 🛠️ Useful Commands

In [None]:
# Helper functions

def check_connector_status():
    """Check connector status"""
    response = requests.get('http://localhost:8087/connectors/postgres-connector/status')
    return response.json()

def restart_connector():
    """Restart connector"""
    response = requests.post('http://localhost:8087/connectors/postgres-connector/restart')
    return response.status_code == 204

def delete_connector():
    """Delete connector"""
    response = requests.delete('http://localhost:8087/connectors/postgres-connector')
    return response.status_code == 204

def get_topic_info():
    """Get Kafka topic info"""
    import subprocess
    result = subprocess.run(
        ['docker', 'exec', 'kafka', 'kafka-topics', 
         '--describe', '--topic', 'postgres.public.users',
         '--bootstrap-server', 'localhost:9092'],
        capture_output=True, text=True
    )
    return result.stdout

print("✅ Helper functions loaded:")
print("  - check_connector_status()")
print("  - restart_connector()")
print("  - delete_connector()")
print("  - get_topic_info()")

## 🎉 Conclusion

You have completed the following:

✅ Set up PostgreSQL with `wal_level=logical`  
✅ Created the Debezium connector  
✅ Verified the replication slot and publication  
✅ Confirmed the Kafka topics are created automatically  
✅ Tested CDC with INSERT, UPDATE, DELETE  
✅ Monitored messages in real time  

### Next Steps:

1. **Add more tables**: Update `table.include.list` in the connector config
2. **Set up Flink/Spark**: Consume from Kafka and process the data
3. **Write to Iceberg**: Store the CDC data in a data lake
4. **Monitor lag**: Track consumer lag in production
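For the first of these steps, Kafka Connect accepts an updated configuration through `PUT /connectors/<name>/config`, whose body is the bare config map (without the `name`/`config` wrapper used in Step 3). A sketch, assuming a dict shaped like `connector_config`; `with_extra_tables` is hypothetical and only builds the new map, and `public.orders` is a hypothetical table. Existing rows of a newly added table may not be snapshotted automatically, so check the Debezium snapshot options for that case.

```python
import copy


def with_extra_tables(connector_config: dict, tables) -> dict:
    """Return a copy of the inner config map with tables appended to table.include.list."""
    cfg = copy.deepcopy(connector_config["config"])  # leave the original dict untouched
    current = [t.strip() for t in cfg.get("table.include.list", "").split(",") if t.strip()]
    for t in tables:
        if t not in current:  # avoid duplicate entries
            current.append(t)
    cfg["table.include.list"] = ",".join(current)
    return cfg


demo_config = {"name": "postgres-connector",
               "config": {"topic.prefix": "postgres", "table.include.list": "public.users"}}
new_cfg = with_extra_tables(demo_config, ["public.orders"])
print(new_cfg["table.include.list"])  # public.users,public.orders

# Sending it to Kafka Connect (not run here):
# requests.put('http://localhost:8087/connectors/postgres-connector/config',
#              json=new_cfg, timeout=10)
```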

### Resources:

- [Debezium Docs](https://debezium.io/documentation/)
- [Kafka Docs](https://kafka.apache.org/documentation/)
- [PostgreSQL Logical Replication](https://www.postgresql.org/docs/current/logical-replication.html)

## 🧹 Cleanup (Optional)

In [None]:
# Uncomment to clean up

# # Delete connector
# delete_connector()
# print("✅ Connector deleted")

# # Delete the replication slot (in PostgreSQL); an unused slot keeps retaining WAL
# !docker exec postgres_db psql -U admin -d myapp_db -c "SELECT pg_drop_replication_slot('debezium_slot');"
# !docker exec postgres_db psql -U admin -d myapp_db -c "DROP PUBLICATION IF EXISTS debezium_publication;"

# # Delete Kafka topic
# !docker exec kafka kafka-topics --delete --topic postgres.public.users --bootstrap-server localhost:9092

print("⚠️  Uncomment the code above to clean up")