# RDS連線

In [None]:
import psycopg2
import random
import string
from faker import Faker

In [None]:


# PostgreSQL connection settings
# NOTE(review): the user name and password below are hard-coded in the source;
# consider loading them from the environment or a secrets store — TODO confirm.
DB_HOST = "pg-cdc-instance.cxkm6uu604v9.ap-northeast-1.rds.amazonaws.com"  # e.g. "localhost" or an RDS endpoint
DB_NAME = "cdc_db"
DB_USER = "HAIRE"
DB_PASSWORD = "834rg02sdflk"
DB_PORT = 5432  # PostgreSQL default port
# Initialise the Faker instance used to generate the fake user fields
fake = Faker()
# Build one row of random user data
def generate_random_user():
    """Return a ``(username, email, password_hash, phone, address)`` tuple.

    The username and email are Faker values prefixed with random
    alphanumeric characters (5 and 10 respectively). ``password_hash`` is
    simply 20 random alphanumeric characters, not a real hash. The phone
    number is cut to at most 20 characters and the address has its line
    breaks replaced by ``", "`` so it is stored on a single line.
    """
    alphabet = string.ascii_letters + string.digits

    def random_token(length):
        # Random alphanumeric string of the requested length.
        return "".join(random.choices(alphabet, k=length))

    username = random_token(5) + fake.user_name()
    email = random_token(10) + fake.email()
    password_hash = random_token(20)
    phone = fake.phone_number()[:20]
    single_line_address = ", ".join(fake.address().split("\n"))
    return (username, email, password_hash, phone, single_line_address)

# Connect to PostgreSQL and insert generated rows
def insert_random_users(n=1):
    """Insert ``n`` randomly generated rows into the ``users`` table.

    Opens a new connection from the module-level ``DB_*`` settings, inserts
    one row per call to ``generate_random_user()`` and commits once after
    the loop. Any exception is caught and printed instead of being
    propagated; since the commit only happens after every insert succeeded,
    a failure part-way through leaves nothing committed (psycopg2 discards
    uncommitted work when the connection is closed).

    Args:
        n: Number of rows to insert. Defaults to 1.
    """
    # Start as None so the cleanup can tell what was actually opened.
    # Without this, a failed connect() made the ``finally`` clause raise
    # UnboundLocalError, hiding the real error and escaping this function.
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(
            host=DB_HOST,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            port=DB_PORT
        )
        cursor = conn.cursor()

        # Insert n rows of random data
        for _ in range(n):
            user_data = generate_random_user()
            cursor.execute("""
                INSERT INTO users (username, email, password_hash, phone, address)
                VALUES (%s, %s, %s, %s, %s);
            """, user_data)

        conn.commit()
        print(f"✅ 成功插入 {n} 筆測試資料！")

    except Exception as e:
        print(f"❌ 錯誤: {e}")

    finally:
        # Close only the resources that were successfully created.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

# Insert test data: 2 batches of 2 rows each (4 rows in total),
# with every batch going through its own insert_random_users() call.
if __name__ == "__main__":
    batches, rows_per_batch = 2, 2
    for _ in range(batches):
        insert_random_users(rows_per_batch)

# 架構圖

In [None]:
from diagrams import Diagram, Cluster, Node, Edge
from diagrams.aws.database import RDS
from diagrams.aws.migration import DMS
from diagrams.aws.analytics import KinesisDataStreams, Glue, Kinesis, KinesisDataFirehose
from diagrams.aws.storage import S3
from diagrams.aws.compute import Lambda, ECS, EKS, ElasticBeanstalk
from diagrams.aws.integration import SNS, SQS, Eventbridge, StepFunctions
from diagrams.aws.network import APIGateway, VPC
from diagrams.aws.ml import *
from diagrams.onprem.client import User, Users
from diagrams.generic.device import Mobile,Tablet
# Font sizes (Graphviz "fontsize" attribute)
graph_attr = dict(fontsize="18")  # font size for the graph as a whole
node_attr = dict(fontsize="14")   # font size for node labels
edge_attr = dict(fontsize="14")   # font size for edge labels, if any

# Render the data-architecture diagram to "data_architecture" without opening it.
with Diagram(
    "",
    show=False,
    filename="data_architecture",
    direction="LR",
    graph_attr=graph_attr,
    node_attr=node_attr,
    edge_attr=edge_attr,
):
    with Cluster("CDC"):
        # RDS data source
        rds = RDS("PostgreSQL")
        # DMS watching the RDS instance
        # NOTE(review): this node is drawn but never connected by an edge
        # below — presumably the flow should pass through it; confirm.
        dms = DMS("DMS")

    # Kinesis & Lambda
    with Cluster("Streaming Processing"):
        kinesis_stream = KinesisDataStreams("Kinesis Streaming")
        kinesis_firehose = KinesisDataFirehose("Firehose Streaming")
        # NOTE(review): the label is spelled "Upoload_events" in the rendered
        # diagram — likely meant "Upload_events"; left unchanged here.
        s3_upload_event = Eventbridge("Upoload_events")
        s3_store = S3("S3")
        lambda_func = Lambda("Lambda")

    # Data flow, left to right, as a single chain of edges.
    (
        rds
        >> kinesis_stream
        >> kinesis_firehose
        >> s3_store
        >> s3_upload_event
        >> lambda_func
    )

# 轉換成標準 JSON 格式

In [None]:
import json
# Path of the input file holding concatenated JSON objects.
# It is empty here and must be filled in before running this cell.
path = ""
with open(path, 'r', encoding='utf-8') as file:
    content = file.read()

# The objects are written back to back, with each closing "}" followed on the
# next line by the next opening "{". Splitting on that boundary consumes the
# closing brace of every piece except the last and the opening brace of every
# piece except the first, so both are restored below.
raw_jsons = content.strip().split('\n}\n{')
last_index = len(raw_jsons) - 1

json_objects = []
for index, json_str in enumerate(raw_jsons):
    # Drop the line breaks and tabs used for pretty-printing.
    json_str = json_str.strip().replace("\n", "").replace("\t", "")
    if not json_str:
        # Empty input (or an empty piece): nothing to parse.
        continue
    if not json_str.startswith("{"):
        json_str = "{" + json_str
    # Restore the closing brace by position instead of guessing from the
    # content: every piece but the last lost exactly one "}" to the split.
    if index < last_index:
        json_str += "}"
    try:
        json_objects.append(json.loads(json_str))
    except ValueError as e:
        # Best effort: report the piece that failed to parse and carry on.
        print(f"{index}: {e}")

# Save as a standard JSON array
with open('output.json', 'w', encoding='utf-8') as f:
    json.dump(json_objects, f, ensure_ascii=False, indent=4)

