In [18]:
# | Field Name | Data Type | Description (Vietnamese) | Description (English) | Business Rules | Example |
# |------------|-----------|--------------------------|----------------------|----------------|---------|
# | transaction_key | int | Khóa chính surrogate | Primary surrogate key | PK, NOT NULL, UNIQUE | 1001 |
# | customer_key | int | Khóa khách hàng | Customer foreign key | FK to DIM_CUSTOMER | 12345 |
# | product_key | int | Khóa sản phẩm | Product foreign key | FK to DIM_PRODUCT | 567 |
# | location_key | int | Khóa địa điểm | Location foreign key | FK to DIM_LOCATION | 789 |
# | event_key | int | Khóa sự kiện | Event foreign key | FK to DIM_EVENT | 101 |
# | involved_party_key | int | Khóa bên liên quan | Involved party foreign key | FK to DIM_INVOLVED_PARTY | 234 |
# | condition_key | int | Khóa điều kiện | Condition foreign key | FK to DIM_CONDITION | 345 |
# | application_key | int | Khóa ứng dụng | Application foreign key | FK to DIM_APPLICATION | 456 |
# | asset_key | int | Khóa tài sản | Asset foreign key | FK to DIM_ASSET | 678 |
#------------------------------------------------------------------
#    KEY 
#------------------------------------------------------------------
# | transaction_id | string | Mã giao dịch duy nhất | Unique transaction ID | NOT NULL, UNIQUE | TXN20240315143025001 |
# | reference_number | string | Số tham chiếu | Reference number | Max 50 chars | REF123456789 |
# | transaction_type | string | Loại giao dịch | Transaction type | NOT NULL | Chuyển khoản, Rút tiền, Nạp tiền,  Nhận Tiền |
# | transaction_category | string | Danh mục giao dịch | Transaction category | NOT NULL | Nội bộ, Liên ngân hàng, Quốc tế |
# | transaction_subcategory | string | Danh mục phụ | Transaction subcategory | Max 100 chars | Chuyển khoản cùng ngân hàng |
# | transaction_amount | decimal | Số tiền giao dịch | Transaction amount | NOT NULL, >= 0 | 1000000 |
# | fee_amount | decimal | Phí giao dịch | Fee amount | >= 0 | 11000 |
# | tax_amount | decimal | Thuế | Tax amount | >= 0 | 1100 |
# | net_amount | decimal | Số tiền thực nhận | Net amount | NOT NULL | 987900 |
# | currency | string | Loại tiền tệ | Currency code | ISO 4217, NOT NULL | VND |
# | transaction_status | string | Trạng thái giao dịch | Transaction status | NOT NULL | Thành công, Thất bại, Đang xử lý |
# | channel | string | Kênh giao dịch | Transaction channel | NOT NULL | ATM, Mobile App, Internet Banking |
# | description | string | Mô tả giao dịch | Transaction description | Max 500 chars | Chuyển khoản cho thuê nhà |
# | created_timestamp | timestamp | Thời gian tạo | Creation timestamp | NOT NULL | 2024-03-15 14:30:25.123 |
# | processed_timestamp | timestamp | Thời gian xử lý | Processing timestamp | >= created_timestamp | 2024-03-15 14:30:27.456 |
# | updated_timestamp | timestamp | Thời gian cập nhật cuối | Last update timestamp | >= created_timestamp | 2024-03-15 14:31:00.789 |



In [None]:
from datetime import datetime, timedelta
from faker import Faker
import pandas as pd
import random
import uuid
import psycopg2
from sqlalchemy import create_engine
import os 
from dotenv import load_dotenv

class getInfoTransaction:
    def __init__(self, conn_params):
        self.engine = create_engine(
            f"postgresql+psycopg2://{conn_params['user']}:{conn_params['password']}@{conn_params['host']}:{conn_params['port']}/{conn_params['dbname']}"
        )

    def _query(self, sql: str) -> pd.DataFrame:
        try:
            conn = self.engine.raw_connection()   
            try:
                df = pd.read_sql_query(sql, conn)
            finally:
                conn.close()  
            return df
        except Exception as e:  
            print(f"Error executing query: {e}")
            return pd.DataFrame()

    def getCustomer(self):
        df = self._query("SELECT customer_key FROM banking_dw.dim_customer")
        return df

    def getLocation(self):
        df = self._query("SELECT location_key   FROM banking_dw.dim_location")
        return df

    def getApplication(self):
        df = self._query("SELECT application_key FROM banking_dw.dim_application")
        return df

    def getAccount(self):
        df = self._query("SELECT account_key , account_number , customer_key , current_balance FROM banking_dw.dim_account")
        return df

class GenerationTranaction :
    def __init__(self , conn_params):
        self.conn_params = conn_params
        self.account = getInfoTransaction(conn_params).getAccount()
        self.location = getInfoTransaction(conn_params).getLocation()
        self.application = getInfoTransaction(conn_params).getApplication()
        self.transaction_types = ["Chuyển khoản", "Rút tiền", "Nạp tiền", "Nhận Tiền"]
        self.transaction_category = ["Nội bộ", "Liên ngân hàng", "Quốc tế"]
        self.transaction_status = ["Thành công", "Thất bại"]
        self.channel = ["ATM", "Mobile App", "Internet Banking"]
        
    def get_customer_key(self , account_key):
        df = self.account[self.account["account_key"] == account_key]
        if not df.empty:
            return df.iloc[0]["customer_key"]
        return None
    
    # tinh tien net nhan duoc khi thuc hien giao dich
    def amount(self ,transaction_types , transaction_status,  transaction_amount , fee_amount , tax_amount , ):
        if transaction_status == "Thất bại":
            return 0.0
        else:
            if transaction_types == "Rút tiền":
                return round(transaction_amount + fee_amount + tax_amount , 2)
            elif transaction_types == "Nạp tiền":
                return round(transaction_amount)
            elif transaction_types == "Chuyển khoản":  
                return  round(transaction_amount + fee_amount + tax_amount , 2)
            elif transaction_types == "Nhận Tiền":
                return round(transaction_amount)
        
        
    def transaction_account_number(self , transaction_category ):
        if transaction_category == "Nội bộ":
            return random.choice(self.account["account_number"])
        elif transaction_category == "Liên ngân hàng":
            return f"LB{random.randint(100000, 999999)}"
        elif transaction_category == "Quốc tế":
            return f"QT{random.randint(100000, 999999)}"
        
    def generator_data_transaction(self):
        
        account_key   =  int(random.choice(self.account["account_key"]))
        customer_key  =  int(self.get_customer_key(account_key))
        location_key  =  int(random.choice(self.location["location_key"]))
        event_key = uuid.uuid4().int >> 64
        application_key = int(random.choice(self.account["account_key"]))
        transaction_id = f"TXN{datetime.now().strftime('%Y%m%d%H%M%S')}{random.randint(100, 999)}"
        reference_number = f"REF{uuid.uuid4().hex[:10].upper()}"
        transaction_type = random.choice(self.transaction_types)
        transaction_category = random.choice(self.transaction_category)
        transaction_amount = round(random.uniform(10000, 10000000), 2)
        transaction_status = random.choice(self.transaction_status)
        fee_amount = round(transaction_amount * random.uniform(0.001, 0.01), 2)
        tax_amount = round(fee_amount * 0.1, 2)
        net_amount = self.amount(transaction_type,transaction_status,transaction_amount, fee_amount, tax_amount) 
        currency = "VND"
        account_number = self.transaction_account_number(transaction_category)

        channel = random.choice(self.channel)
        description = f"Giao dịch {transaction_type} qua {channel}"
        created_timestamp = datetime.now()
        processed_timestamp = created_timestamp + timedelta(seconds=random.randint(1, 300))
        updated_timestamp = processed_timestamp + timedelta(seconds=random.randint(1, 300))

        
        return {
            "account_key": account_key,
            "customer_key": customer_key,
            "location_key": location_key,
            "event_key": event_key,
            "application_key": application_key,
            "transaction_id": transaction_id,
            "reference_number": reference_number,
            "transaction_type": transaction_type,
            "transaction_category": transaction_category,
            "transaction_amount": transaction_amount,
            "fee_amount": fee_amount,
            "tax_amount": tax_amount,
            "net_amount": net_amount,
            "currency": currency,
            "account_number": account_number,
            "transaction_status": transaction_status,
            "channel": channel,
            "description": description,
            "created_timestamp": created_timestamp,
            "processed_timestamp": processed_timestamp,
            "updated_timestamp": updated_timestamp
        }
          
if __name__ == "__main__":  
    load_dotenv()
    
    conn_params = {
        "host": os.getenv("DB_HOST"),
        "port": os.getenv("DB_PORT"),
        "dbname": os.getenv("DB_NAME"),
        "user": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASS")
    }
    info_transaction = GenerationTranaction(conn_params)
    print(info_transaction.generator_data_transaction())

  df = pd.read_sql_query(sql, conn)


{'account_key': 15, 'customer_key': 7, 'location_key': 5, 'event_key': 16635940198104253731, 'application_key': 16, 'transaction_id': 'TXN20250904171810663', 'reference_number': 'REF944C96BAD0', 'transaction_type': 'Rút tiền', 'transaction_category': 'Nội bộ', 'transaction_amount': 6269738.14, 'fee_amount': 31681.51, 'tax_amount': 3168.15, 'net_amount': 0.0, 'currency': 'VND', 'account_number': '263bd9d371194', 'transaction_status': 'Thất bại', 'channel': 'ATM', 'description': 'Giao dịch Rút tiền qua ATM', 'created_timestamp': datetime.datetime(2025, 9, 4, 17, 18, 10, 873244), 'processed_timestamp': datetime.datetime(2025, 9, 4, 17, 19, 26, 873244), 'updated_timestamp': datetime.datetime(2025, 9, 4, 17, 20, 23, 873244)}
{'account_key': 40, 'customer_key': 19, 'location_key': 5, 'event_key': 13161864128149406645, 'application_key': 14, 'transaction_id': 'TXN20250904171811805', 'reference_number': 'REF0275686E17', 'transaction_type': 'Nạp tiền', 'transaction_category': 'Quốc tế', 'transa

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from dotenv import load_dotenv
import os
import logging
import time
import traceback

# ===============================
# Logging setup
# ===============================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load environment variables (.env đặt ngoài project/consumer)
load_dotenv(dotenv_path=os.path.join("/home/hadoop/project", ".env"))

# ===============================
# Config
# ===============================
KAFKA_CONFIG = {
    'bootstrap.servers': '192.168.235.136:9092,192.168.235.147:9092,192.168.235.148:9092',
}

POSTGRES_CONFIG = {
    "url": f"jdbc:postgresql://{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}",
    "table": "banking_dw.fact_transaction",
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASS"),
    "driver": "org.postgresql.Driver"
}

logger.info(f"🔧 Database: {POSTGRES_CONFIG['url']}")
logger.info(f"🔧 Table: {POSTGRES_CONFIG['table']}")

# ===============================
# Schema khớp với bảng fact_transaction
# ===============================
TRANSACTION_SCHEMA = StructType([
    StructField("account_key", IntegerType(), True),
    StructField("customer_key", IntegerType(), True),
    StructField("location_key", IntegerType(), True),
    StructField("event_key", IntegerType(), True),
    StructField("application_key", IntegerType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("reference_number", StringType(), True),
    StructField("transaction_type", StringType(), True),
    StructField("transaction_category", StringType(), True),
    StructField("transaction_amount", DoubleType(), True),
    StructField("transaction_status", StringType(), True),
    StructField("fee_amount", DoubleType(), True),
    StructField("tax_amount", DoubleType(), True),
    StructField("net_amount", DoubleType(), True),
    StructField("currency", StringType(), True),
    StructField("account_number", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("description", StringType(), True),
    StructField("created_timestamp", TimestampType(), True),
    StructField("processed_timestamp", TimestampType(), True),
    StructField("updated_timestamp", TimestampType(), True)
])

# ===============================
# Realtime Write Function
# ===============================
def write_realtime_batch(batch_df, batch_id):
    start_time = time.time()
    try:
        row_count = batch_df.count()
        logger.info(f"⚡ Batch {batch_id}: Processing {row_count} records")

        if row_count == 0:
            return

        # Validate: transaction_id + transaction_amount phải có
        valid_df = batch_df.filter(
            col("transaction_id").isNotNull() &
            col("transaction_amount").isNotNull() &
            (col("transaction_amount") > 0)
        )

        valid_count = valid_df.count()
        if valid_count == 0:
            logger.warning(f"⚡ Batch {batch_id}: No valid records")
            return

        # Add processing timestamp
        final_df = valid_df.withColumn("realtime_processed_at", current_timestamp())

        # Write to PostgreSQL
        final_df.write \
            .format("jdbc") \
            .option("url", POSTGRES_CONFIG["url"]) \
            .option("dbtable", POSTGRES_CONFIG["table"]) \
            .option("user", POSTGRES_CONFIG["user"]) \
            .option("password", POSTGRES_CONFIG["password"]) \
            .option("driver", POSTGRES_CONFIG["driver"]) \
            .option("batchsize", "500") \
            .option("isolationLevel", "READ_UNCOMMITTED") \
            .option("numPartitions", "1") \
            .option("rewriteBatchedStatements", "true") \
            .mode("append") \
            .save()

        processing_time = time.time() - start_time
        logger.info(f"✅ Batch {batch_id}: Inserted {valid_count} records in {processing_time:.2f}s")

    except Exception as e:
        processing_time = time.time() - start_time
        logger.error(f"❌ Batch {batch_id} failed after {processing_time:.2f}s: {e}")
        logger.error(f"❌ Error details: {traceback.format_exc()}")

# ===============================
# Realtime Streaming Class
# ===============================
class RealtimeSparkStreaming:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("RealtimeKafkaToPostgres") \
            .master("local[*]") \
            .config("spark.executor.memory", "2g") \
            .config("spark.executor.cores", "2") \
            .config("spark.driver.memory", "1g") \
            .config("spark.sql.shuffle.partitions", "2") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
            .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
            .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
            .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,org.postgresql:postgresql:42.2.18") \
            .getOrCreate()

        self.spark.sparkContext.setLogLevel("ERROR")
        logger.info("⚡ Spark session created for realtime processing")

    def create_realtime_stream(self):
        logger.info("⚡ Creating realtime Kafka stream...")

        kafka_df = self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", KAFKA_CONFIG['bootstrap.servers']) \
            .option("subscribe", "transaction_data") \
            .option("startingOffsets", "latest") \
            .option("failOnDataLoss", "false") \
            .option("maxOffsetsPerTrigger", "1000") \
            .option("kafka.consumer.cache.enabled", "false") \
            .load()

        parsed_df = kafka_df.select(
            col("key").cast("string"),
            from_json(col("value").cast("string"), TRANSACTION_SCHEMA).alias("data"),
            col("timestamp").alias("kafka_timestamp")
        ).select("key", "data.*", "kafka_timestamp")

        parsed_df.printSchema()
        logger.info("✅ Realtime Kafka stream created successfully")
        return parsed_df

    def start_realtime_processing(self):
        df = self.create_realtime_stream()

        query = df.writeStream \
            .foreachBatch(write_realtime_batch) \
            .outputMode("append") \
            .option("checkpointLocation", "/tmp/realtime_checkpoint") \
            .trigger(processingTime="2 seconds") \
            .start()

        logger.info("⚡ REALTIME PROCESSING STARTED!")
        query.awaitTermination()

# ===============================
# Main Function
# ===============================
def main():
    logger.info("🚀 STARTING REALTIME KAFKA TO POSTGRES STREAMING")
    realtime_stream = RealtimeSparkStreaming()
    realtime_stream.start_realtime_processing()

if __name__ == "__main__":
    main()


2025-09-08 22:17:19,778 - __main__ - INFO - 🔧 Database: jdbc:postgresql://192.168.235.136:5432/dwh
2025-09-08 22:17:19,778 - __main__ - INFO - 🔧 Table: banking_dw.fact_transaction
2025-09-08 22:17:19,779 - __main__ - INFO - 🚀 STARTING REALTIME KAFKA TO POSTGRES STREAMING


TypeError: 'JavaPackage' object is not callable