In [1]:
# Configure S3 credentials
spark.conf.set("fs.s3a.access.key", "XXX")
spark.conf.set("fs.s3a.secret.key", "XXX")
spark.conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

StatementMeta(, 9cede16d-de81-4bd9-821e-e9f1b9c1b7b4, 3, Finished, Available, Finished)

## Ingest Data from s3 

In [2]:
# Read JSON file from S3
df = spark.read.format("json") \
    .option("multiline", "true") \
    .option("inferSchema", "true") \
    .load("s3a://transactions-bucket-2024/network*/*transactions*.json")

# Display schema and sample data
df.printSchema()
df.show(5)

StatementMeta(, 9cede16d-de81-4bd9-821e-e9f1b9c1b7b4, 4, Finished, Available, Finished)

root
 |-- customer_id: long (nullable = true)
 |-- date: long (nullable = true)
 |-- id: long (nullable = true)
 |-- limit_execution: string (nullable = true)
 |-- market_execution_json: struct (nullable = true)
 |    |-- destination_amount: decimal(21,0) (nullable = true)
 |    |-- fees: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- amount: long (nullable = true)
 |    |    |    |-- token: string (nullable = true)
 |    |-- hybrid: struct (nullable = true)
 |    |    |-- DestinationAmount: decimal(21,0) (nullable = true)
 |    |    |-- DestinationCoin: string (nullable = true)
 |    |    |-- Fees: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- amount: long (nullable = true)
 |    |    |    |    |-- token: string (nullable = true)
 |    |    |-- IsPartiallyFilledAllowed: boolean (nullable = true)
 |    |    |-- PoolIDs: string (nullable = true)
 |    |    |-- ProtocolFees: arr

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import json

class TransactionProcessor:
    def __init__(self, spark):
        self.spark = spark
        self.define_schema()
        
    def define_schema(self):
        """Định nghĩa schema cho dữ liệu đầu ra"""
        self.output_schema = StructType([
            StructField("transaction_id", LongType(), False),
            StructField("customer_id", LongType(), False),
            StructField("transaction_time", TimestampType(), False),
            StructField("source_amount", DecimalType(38, 18), True),
            StructField("destination_amount", DecimalType(38, 18), True),
            StructField("execution_plan", StringType(), True),
            StructField("pool_ids", ArrayType(LongType()), True),
            StructField("order_book_ids", ArrayType(LongType()), True),
            StructField("protocol_fee_token", StringType(), True),
            StructField("protocol_fee_amount", DecimalType(38, 18), True),
            StructField("total_amount_usd", DecimalType(38, 18), True),
            StructField("total_fees_usd", DecimalType(38, 18), True)
        ])

    def extract_pool_ids(self, df):
        """Trích xuất pool IDs từ cấu trúc JSON"""
        return when(
            col("market_execution_json.pool").isNotNull(),
            array_distinct(flatten(
                transform("market_execution_json.pool.split_swaps", 
                         lambda x: transform(x.swap_route, lambda y: y.pool_id))
            ))
        ).otherwise(array())

    def extract_order_book_ids(self, df):
        """Trích xuất order book IDs từ cấu trúc JSON"""
        return when(
            col("market_execution_json.order_book").isNotNull(),
            col("market_execution_json.order_book.filled_order_ids")
        ).otherwise(array())

    def process_transactions(self, input_path, batch_size=10000):
        """Xử lý dữ liệu giao dịch theo batches"""
        try:
            # Đọc dữ liệu JSON
            raw_df = self.spark.read.json(
                input_path,
                multiLine=True,
                mode="PERMISSIVE",
                columnNameOfCorruptRecord="_corrupt_record"
            )
            
            # Cache DataFrame gốc
            raw_df.cache()
            
            # Transform dữ liệu
            processed_df = raw_df.select(
                col("id").alias("transaction_id"),
                col("customer_id"),
                from_unixtime(col("date")).alias("transaction_time"),
                
                # Xử lý source và destination amount
                when(col("market_execution_json.source_amount").isNotNull(),
                     col("market_execution_json.source_amount").cast(DecimalType(38, 18))/pow(10, 18)
                ).otherwise(lit(0)).alias("source_amount"),
                
                when(col("market_execution_json.destination_amount").isNotNull(),
                     col("market_execution_json.destination_amount").cast(DecimalType(38, 18))/pow(10, 18)
                ).otherwise(lit(0)).alias("destination_amount"),
                
                # Xác định execution plan
                when(col("market_execution_json.pool").isNotNull(), "pool")
                .when(col("market_execution_json.order_book").isNotNull(), "order_book")
                .otherwise("unknown").alias("execution_plan"),
                
                # Trích xuất pool và order book IDs
                self.extract_pool_ids(raw_df).alias("pool_ids"),
                self.extract_order_book_ids(raw_df).alias("order_book_ids"),
                
                # Xử lý protocol fees
                col("market_execution_json.protocol_fees").getItem(0).getField("token")
                .alias("protocol_fee_token"),
                
                when(col("market_execution_json.protocol_fees").getItem(0).getField("amount").isNotNull(),
                     col("market_execution_json.protocol_fees").getItem(0).getField("amount").cast(DecimalType(38, 18))/pow(10, 18)
                ).otherwise(lit(0)).alias("protocol_fee_amount")
            )
            
            # Tính toán các trường bổ sung
            enriched_df = processed_df.withColumns({
                # Tính tổng amount (USD)
                "total_amount_usd": when(col("source_amount").isNotNull(), 
                                      col("source_amount") * lit(100)  # Giả sử tỷ giá 1:100
                                 ).otherwise(lit(0)),
                
                # Tính tổng fees (USD)
                "total_fees_usd": when(col("protocol_fee_amount").isNotNull(),
                                     col("protocol_fee_amount") * lit(100)
                                ).otherwise(lit(0))
            })
            
            # Loại bỏ dữ liệu không hợp lệ
            cleaned_df = enriched_df.filter(
                col("transaction_id").isNotNull() &
                col("customer_id").isNotNull() &
                col("transaction_time").isNotNull() &
                col("source_amount").isNotNull()
            ).dropDuplicates(["transaction_id"])
            
            # Cache kết quả cuối cùng
            cleaned_df.cache()
            
            # Invalidate cache của DataFrame gốc
            raw_df.unpersist()
            
            return cleaned_df
            
        except Exception as e:
            print(f"Error processing transactions: {str(e)}")
            raise

    def get_statistics(self, df):
        """Tạo thống kê tổng hợp"""
        return df.agg(
            count("transaction_id").alias("total_transactions"),
            countDistinct("customer_id").alias("unique_customers"),
            sum("total_amount_usd").alias("total_volume_usd"),
            sum("total_fees_usd").alias("total_fees_usd"),
            avg("source_amount").alias("avg_source_amount"),
            max("source_amount").alias("max_source_amount"),
            min("source_amount").alias("min_source_amount")
        )

    def save_processed_data(self, df, output_path):
        """Lưu dữ liệu đã xử lý"""
        df.write \
            .partitionBy("transaction_time") \
            .mode("overwrite") \
            .parquet(output_path)


StatementMeta(, 9cede16d-de81-4bd9-821e-e9f1b9c1b7b4, 7, Finished, Available, Finished)

In [6]:
# Sử dụng processor
try:
    processor = TransactionProcessor(spark)
    
    # Xử lý dữ liệu
    processed_df = processor.process_transactions("s3a://transactions-bucket-2024/network*/*transactions*.json")
    
    # Lấy thống kê tổng hợp
    stats_df = processor.get_statistics(processed_df)
    
    # Hiển thị kết quả
    print("Sample processed data:")
    processed_df.show(5, truncate=False)
    
    print("\nStatistics:")
    stats_df.show(truncate=False)
    
    # Lưu dữ liệu
    processor.save_processed_data(processed_df, "s3a://transactions-bucket-2024/result")
    
except Exception as e:
    print(f"Error: {str(e)}")

StatementMeta(, 9cede16d-de81-4bd9-821e-e9f1b9c1b7b4, 8, Finished, Available, Finished)

Sample processed data:
+--------------+-----------+-------------------+-------------+------------------+--------------+--------+--------------+------------------+-------------------+----------------+--------------+
|transaction_id|customer_id|transaction_time   |source_amount|destination_amount|execution_plan|pool_ids|order_book_ids|protocol_fee_token|protocol_fee_amount|total_amount_usd|total_fees_usd|
+--------------+-----------+-------------------+-------------+------------------+--------------+--------+--------------+------------------+-------------------+----------------+--------------+
|1110          |11100      |2023-10-28 15:30:50|3.03346811   |null              |unknown       |[]      |[]            |sol               |0.007583670275     |303.346811      |0.7583670275  |
|22200         |22201      |2023-10-28 15:31:50|1.2E-15      |3.0E-15           |pool          |[34]    |[23578333]    |null              |0.0                |1.2E-13         |0.0           |
|11100         |1