In [None]:
import pandas as pd
from minio import Minio
from fastavro import reader
from io import BytesIO

# Thông tin kết nối MinIO
MINIO_ENDPOINT = "minio.ducdh.com"
MINIO_ACCESS_KEY = "minio"
MINIO_SECRET_KEY = "minio123"
BUCKET = "stream-bucket"

def read_all_avro_in_topic(bucket, topic, minio_client):
    """
    Đọc toàn bộ file avro trong các partition của một topic trên MinIO thành 1 DataFrame.
    Path: stream-bucket/topics/<topic-name>/partition=*/<topic-name>+*+*.avro
    """
    prefix = f"topics/{topic}/partition="
    # Duyệt qua tất cả object có prefix trên (recursive=True lấy hết file trong subfolder)
    objects = minio_client.list_objects(bucket, prefix=f"topics/{topic}/", recursive=True)
    dfs = []
    for obj in objects:
        if obj.object_name.endswith('.avro') and f"/partition=" in obj.object_name:
            # Lọc đúng pattern topic/partition=*/file.avro
            data = minio_client.get_object(bucket, obj.object_name).read()
            with BytesIO(data) as bio:
                records = list(reader(bio))
                if records:
                    dfs.append(pd.DataFrame(records))
    if dfs:
        return pd.concat(dfs, ignore_index=True)
    else:
        return pd.DataFrame()

def main():
    minio_client = Minio(
        MINIO_ENDPOINT,
        access_key=MINIO_ACCESS_KEY,
        secret_key=MINIO_SECRET_KEY,
        secure=False
    )

    # Đọc toàn bộ file Avro cho từng topic
    print("Đang đọc transaction-topic...")
    df_transaction = read_all_avro_in_topic(BUCKET, "transaction-topic", minio_client)
    print("Đang đọc user-topic...")
    df_user = read_all_avro_in_topic(BUCKET, "user-topic", minio_client)
    print("Đang đọc card-topic...")
    df_card = read_all_avro_in_topic(BUCKET, "card-topic", minio_client)

    # Kiểm tra dữ liệu đầu vào
    print("Transaction:", df_transaction.shape)
    print("User:", df_user.shape)
    print("Card:", df_card.shape)

    # Join
    df_join = df_transaction.merge(df_card, left_on="user", right_on="user", how="left")
    df_join = df_join.merge(df_user, left_on="user", right_on="idx", how="left")

    # Lọc và groupby
    result = (
        df_join[df_join['is_fraud'] == 'Yes']
        .groupby('user')
        .agg(
            count_fraud=('user', 'size'),
            person=('person', 'first'),
            current_age=('current_age', 'first'),
            total_debt=('total_debt', 'first'),
            card_number=('card_number', 'first'),
            credit_limit=('credit_limit', 'first'),
        )
        .reset_index()
    )

    # Xuất ra file csv
    result.to_csv('fraud_report.csv', index=False)
    print("Đã xuất kết quả ra fraud_report.csv")

if __name__ == "__main__":
    main()


Đang đọc transaction-topic...
