## Initialize Spark Session

In [120]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark.sql("SET spark.sql.ansi.enabled = false")

DataFrame[key: string, value: string]

## Read CSV

In [20]:
transactions_df = spark.read.csv(
    './data/transactions_20659158_20659258.csv',
    header=True,       # L·∫•y d√≤ng ƒë·∫ßu ti√™n l√†m t√™n c·ªôt
    inferSchema=True   # T·ª± ƒë·ªông suy lu·∫≠n ki·ªÉu d·ªØ li·ªáu
)

# Ki·ªÉm tra l·∫°i t√™n v√† ki·ªÉu d·ªØ li·ªáu
print("Transaction Schema...")
transactions_df.printSchema() 

# Hi·ªÉn th·ªã l·∫°i 1 d√≤ng ƒë·∫ßu ti√™n
print("Transaction Record Display...")
transactions_df.show(1, truncate=False, vertical=True)

Transaction Schema...
root
 |-- hash: string (nullable = true)
 |-- nonce: integer (nullable = true)
 |-- block_hash: string (nullable = true)
 |-- block_number: integer (nullable = true)
 |-- transaction_index: integer (nullable = true)
 |-- from_address: string (nullable = true)
 |-- to_address: string (nullable = true)
 |-- value: decimal(21,0) (nullable = true)
 |-- gas: integer (nullable = true)
 |-- gas_price: long (nullable = true)
 |-- input: string (nullable = true)
 |-- block_timestamp: integer (nullable = true)
 |-- max_fee_per_gas: long (nullable = true)
 |-- max_priority_fee_per_gas: long (nullable = true)
 |-- transaction_type: integer (nullable = true)
 |-- max_fee_per_blob_gas: long (nullable = true)
 |-- blob_versioned_hashes: string (nullable = true)

Transaction Record Display...
-RECORD 0--------------------------------------------------------------------------------------
 hash                     | 0x33f773891e244c0baa88e9a5498f15d10b6c5cc02976369c272b210eeaffa84d

In [78]:
tokens_df = spark.read.csv(
    './data/tokens_20659158_20659258.csv',
    header=True,       # L·∫•y d√≤ng ƒë·∫ßu ti√™n l√†m t√™n c·ªôt
    inferSchema=True   # T·ª± ƒë·ªông suy lu·∫≠n ki·ªÉu d·ªØ li·ªáu
)

# Ki·ªÉm tra l·∫°i t√™n v√† ki·ªÉu d·ªØ li·ªáu
print("Token Schema...")
tokens_df.printSchema() 

# Hi·ªÉn th·ªã l·∫°i 1 d√≤ng ƒë·∫ßu ti√™n
print("Token Record Display...")
tokens_df.show(1, truncate=False, vertical=True)

Token Schema...
root
 |-- address: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- name: string (nullable = true)
 |-- decimals: integer (nullable = true)
 |-- total_supply: double (nullable = true)
 |-- block_number: string (nullable = true)

Token Record Display...
-RECORD 0--------------------------------------------------
 address      | 0x000000000002c0b7dfdf4e6a7d53aeb8889b80bf 
 symbol       | XTREME                                     
 name         | Xtremeverse                                
 decimals     | NULL                                       
 total_supply | 1803.0                                     
 block_number | NULL                                       
only showing top 1 row


In [69]:
token_transfers_df = spark.read.csv(
    './data/token_transfers_20659158_20659258.csv',
    header=True,       # L·∫•y d√≤ng ƒë·∫ßu ti√™n l√†m t√™n c·ªôt
    inferSchema=True   # T·ª± ƒë·ªông suy lu·∫≠n ki·ªÉu d·ªØ li·ªáu
)

# Ki·ªÉm tra l·∫°i t√™n v√† ki·ªÉu d·ªØ li·ªáu
print("Token Transfers Schema...")
token_transfers_df.printSchema() 

# Hi·ªÉn th·ªã l·∫°i 1 d√≤ng ƒë·∫ßu ti√™n
print("Token Transfers Display...")
token_transfers_df.show(1, truncate=False, vertical=True)

Token Transfers Schema...
root
 |-- token_address: string (nullable = true)
 |-- from_address: string (nullable = true)
 |-- to_address: string (nullable = true)
 |-- value: double (nullable = true)
 |-- transaction_hash: string (nullable = true)
 |-- log_index: integer (nullable = true)
 |-- block_number: integer (nullable = true)

Token Transfers Display...
-RECORD 0------------------------------------------------------------------------------
 token_address    | 0xa3d58c4e56fedcae3a7c43a725aee9a71f0ece4e                         
 from_address     | 0x0000000000000000000000000000000000000000                         
 to_address       | 0x1105e8d2bb5a7a6d37462a82c81986e931dfd34e                         
 value            | 2.8815E21                                                          
 transaction_hash | 0x33f773891e244c0baa88e9a5498f15d10b6c5cc02976369c272b210eeaffa84d 
 log_index        | 4                                                                  
 block_number     | 20

In [76]:
logs_df = spark.read.csv(
    './data/logs_20659158_20659258.csv',
    header=True,       # L·∫•y d√≤ng ƒë·∫ßu ti√™n l√†m t√™n c·ªôt
    inferSchema=True   # T·ª± ƒë·ªông suy lu·∫≠n ki·ªÉu d·ªØ li·ªáu
)

# Ki·ªÉm tra l·∫°i t√™n v√† ki·ªÉu d·ªØ li·ªáu
print("Logs Schema...")
logs_df.printSchema() 

# Hi·ªÉn th·ªã l·∫°i 1 d√≤ng ƒë·∫ßu ti√™n
print("Logs Display...")
logs_df.show(1, truncate=True, vertical=True)

Logs Schema...
root
 |-- log_index: integer (nullable = true)
 |-- transaction_hash: string (nullable = true)
 |-- transaction_index: integer (nullable = true)
 |-- block_hash: string (nullable = true)
 |-- block_number: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- data: string (nullable = true)
 |-- topics: string (nullable = true)

Logs Display...
-RECORD 0---------------------------------
 log_index         | 0                    
 transaction_hash  | 0x33f773891e244c0... 
 transaction_index | 0                    
 block_hash        | 0x8899ffe1704a0f5... 
 block_number      | 20659158             
 address           | 0x686e5ac50d9236a... 
 data              | 0x000000000000000... 
 topics            | 0x7d6f066517cd225... 
only showing top 1 row


In [77]:
receipts_df = spark.read.csv(
    './data/receipts_20659158_20659258.csv',
    header=True,       # L·∫•y d√≤ng ƒë·∫ßu ti√™n l√†m t√™n c·ªôt
    inferSchema=True   # T·ª± ƒë·ªông suy lu·∫≠n ki·ªÉu d·ªØ li·ªáu
)

# Ki·ªÉm tra l·∫°i t√™n v√† ki·ªÉu d·ªØ li·ªáu
receipts_df.printSchema() 

# Hi·ªÉn th·ªã l·∫°i 1 d√≤ng ƒë·∫ßu ti√™n
receipts_df.show(1, truncate=False, vertical=True)

root
 |-- transaction_hash: string (nullable = true)
 |-- transaction_index: integer (nullable = true)
 |-- block_hash: string (nullable = true)
 |-- block_number: integer (nullable = true)
 |-- cumulative_gas_used: integer (nullable = true)
 |-- gas_used: integer (nullable = true)
 |-- contract_address: string (nullable = true)
 |-- root: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- effective_gas_price: long (nullable = true)
 |-- l1_fee: string (nullable = true)
 |-- l1_gas_used: string (nullable = true)
 |-- l1_gas_price: string (nullable = true)
 |-- l1_fee_scalar: string (nullable = true)
 |-- blob_gas_price: integer (nullable = true)
 |-- blob_gas_used: integer (nullable = true)

-RECORD 0---------------------------------------------------------------------------------
 transaction_hash    | 0x33f773891e244c0baa88e9a5498f15d10b6c5cc02976369c272b210eeaffa84d 
 transaction_index   | 0                                                                  
 block_h

In [87]:
contracts_df = spark.read.csv(
    './data/contracts_20659158_20659258.csv',
    header=True,       # L·∫•y d√≤ng ƒë·∫ßu ti√™n l√†m t√™n c·ªôt
    inferSchema=True   # T·ª± ƒë·ªông suy lu·∫≠n ki·ªÉu d·ªØ li·ªáu
)

# Ki·ªÉm tra l·∫°i t√™n v√† ki·ªÉu d·ªØ li·ªáu
contracts_df.printSchema() 

# Hi·ªÉn th·ªã l·∫°i 1 d√≤ng ƒë·∫ßu ti√™n
contracts_df.show(1, truncate=True, vertical=True)

root
 |-- address: string (nullable = true)
 |-- bytecode: string (nullable = true)
 |-- function_sighashes: string (nullable = true)
 |-- is_erc20: boolean (nullable = true)
 |-- is_erc721: boolean (nullable = true)
 |-- block_number: string (nullable = true)

-RECORD 0----------------------------------
 address            | 0x5d2ed45d91944fe... 
 bytecode           | 0x608060405234801... 
 function_sighashes | NULL                 
 is_erc20           | false                
 is_erc721          | false                
 block_number       | NULL                 
only showing top 1 row


## Define Output Schema

In [47]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DecimalType, TimestampType, IntegerType

# 1. SCHEMA: FACT_NATIVE_TRANSACTIONS
# D√πng cho c√°c giao d·ªãch chuy·ªÉn ETH v√† th√¥ng tin k·ªπ thu·∫≠t c·ªßa Transaction
native_tx_schema = StructType([
    # Kh√≥a ch√≠nh v√† th√¥ng tin kh·ªëi
    StructField("tx_hash", StringType(), False),     # M√£ bƒÉm giao d·ªãch (Primary Key)
    StructField("block_number", LongType(), False),  # S·ªë kh·ªëi
    StructField("block_timestamp", TimestampType(), False), # Th·ªùi gian th·ª±c hi·ªán (Timestamp type)
    StructField("block_hash", StringType(), True),   # M√£ bƒÉm c·ªßa kh·ªëi
    
    # Th√¥ng tin ng∆∞·ªùi g·ª≠i/nh·∫≠n v√† gi√° tr·ªã
    StructField("sender_address", StringType(), True),    # V√≠ ng∆∞·ªùi g·ª≠i
    StructField("receiver_address", StringType(), True),  # V√≠ ng∆∞·ªùi nh·∫≠n
    StructField("value_eth", DecimalType(38, 18), True),  # S·ªë l∆∞·ª£ng ETH chuy·ªÉn ƒëi (ƒê∆°n v·ªã: Ether)

    # Th√¥ng tin v·ªÅ ph√≠ v√† tr·∫°ng th√°i k·ªπ thu·∫≠t
    StructField("tx_status", IntegerType(), True),    # 1 = Success, 0 = Fail
    StructField("tx_type", IntegerType(), True),      # Lo·∫°i giao d·ªãch
    StructField("gas_price", LongType(), True),       # ƒê∆°n gi√° Gas (Wei)
    StructField("gas_used", LongType(), True),        # L∆∞·ª£ng Gas ti√™u th·ª• th·ª±c t·∫ø
    StructField("tx_fee_eth", DecimalType(38, 18), True), # T·ªïng ph√≠ transaction (Ether)

    # Metadata b·ªï sung
    StructField("tx_input_method_id", StringType(), True), # 8 k√Ω t·ª± ƒë·∫ßu c·ªßa input data
    StructField("tx_nonce", LongType(), True)             # S·ªë th·ª© t·ª± giao d·ªãch c·ªßa v√≠ g·ª≠i
])

# 2. SCHEMA: FACT_TOKEN_TRANSFERS
# D√πng cho c√°c giao d·ªãch chuy·ªÉn Token (ERC20) v√† NFT (ERC721)
token_transfer_schema = StructType([
    # Kh√≥a ngo·∫°i li√™n k·∫øt v·ªõi b·∫£ng Native
    StructField("tx_hash", StringType(), False),  # Link t·ªõi b·∫£ng Native Transaction
    StructField("log_index", LongType(), False),  # Th·ª© t·ª± s·ª± ki·ªán trong log (ƒë·ªÉ ph√¢n bi·ªát trong 1 tx)

    # Th√¥ng tin th·ªùi gian (Duplicate t·ª´ Native ƒë·ªÉ ti·ªán truy v·∫•n nhanh m√† kh√¥ng c·∫ßn join)
    StructField("block_number", LongType(), False),
    StructField("block_timestamp", TimestampType(), False),

    # Th√¥ng tin Token
    StructField("token_address", StringType(), True),  # ƒê·ªãa ch·ªâ Smart Contract c·ªßa Token
    StructField("token_symbol", StringType(), True),  # K√Ω hi·ªáu (VD: USDT, SHIB) - L·∫•y t·ª´ b·∫£ng Dim Token
    StructField("token_type", StringType(), True),  # 'ERC20', 'ERC721', 'OTHER'

    # Th√¥ng tin chuy·ªÉn nh·∫≠n
    StructField("sender_address", StringType(), True),  # Ng∆∞·ªùi chuy·ªÉn token
    StructField("receiver_address", StringType(), True),  # Ng∆∞·ªùi nh·∫≠n token

    # Gi√° tr·ªã
    StructField("amount_raw", DecimalType(38, 0), True),  # Gi√° tr·ªã g·ªëc tr√™n Blockchain (VD: 1000000)
    StructField("token_decimals", IntegerType(), True),  # S·ªë th·∫≠p ph√¢n (VD: 6)
    StructField("amount_display", DecimalType(38, 18), True)  # Gi√° tr·ªã th·ª±c t·∫ø (VD: 1.0) = Raw / 10^Decimals
])

## Processing ETH Native Transactions

In [48]:
from pyspark.sql.functions import col

# 1. X√°c ƒë·ªãnh c√°c c·ªôt c·∫ßn lo·∫°i b·ªè t·ª´ transactions_df (tx)
cols_to_drop_from_tx = [
    "transaction_index", 
    "max_fee_per_gas", 
    "max_priority_fee_per_gas", 
    "max_fee_per_blob_gas",
    "blob_versioned_hashes",
    "blob_versioned_hashes"
]

# 2. X√¢y d·ª±ng danh s√°ch c√°c c·ªôt C·∫¶N GI·ªÆ L·∫†I t·ª´ tx
tx_all_cols = [
    col(f"tx.{c}") for c in transactions_df.columns if c not in cols_to_drop_from_tx
]

# 3. Th√™m c√°c c·ªôt t·ª´ receipts_df (rcpt)
rcpt_cols = [
    col("rcpt.status").alias("tx_status"),
    col("rcpt.gas_used"),
]

# 4. Th·ª±c hi·ªán Join v√† Select v·ªõi danh s√°ch ƒë√£ x√¢y d·ª±ng
tx_receipt_df = transactions_df.alias("tx").join(
    receipts_df.alias("rcpt"),
    on=col("tx.hash") == col("rcpt.transaction_hash"), 
    how='inner'
).select(
    *tx_all_cols, # M·ªü r·ªông danh s√°ch c√°c c·ªôt t·ª´ tx
    *rcpt_cols   # M·ªü r·ªông danh s√°ch c√°c c·ªôt t·ª´ rcpt
)

# 5. ƒê·ªïi t√™n c√°c c·ªôt
native_tx_df = tx_receipt_df \
    .withColumnRenamed("hash", "tx_hash") \
    .withColumnRenamed("nonce", "tx_nonce") \
    .withColumnRenamed("from_address", "sender_address") \
    .withColumnRenamed("to_address", "receiver_address") \
    .withColumnRenamed("value", "value_wei") \
    .withColumnRenamed("gas_price", "gas_price_wei") \
    .withColumnRenamed("gas", "gas_limit") \
    .withColumnRenamed("transaction_type", "tx_type") \
    .withColumnRenamed("input", "tx_input") \
    .withColumnRenamed("block_timestamp", "block_timestamp_unix")

# 6. S·∫Øp x·∫øp l·∫°i th·ª© t·ª± c√°c c·ªôt
native_tx_column_order = [
    # 1. Kh√≥a ch√≠nh
    "tx_hash", 
    
    # 2. Th√¥ng tin kh·ªëi
    "block_number",
    "block_timestamp_unix",
    "block_hash",
    
    # 3. Th√¥ng tin v√≠
    "sender_address",
    "receiver_address",
    
    # 4. Th√¥ng tin Gas v√† Status
    "tx_status",
    "tx_type",
    "gas_limit",
    "gas_used", # Gas Used (th·ª±c t·∫ø) n√™n n·∫±m tr∆∞·ªõc Gas Price/Value
    "gas_price_wei",
    
    # 5. Gi√° tr·ªã
    "value_wei", 
    
    # 6. Metadata kh√°c
    "tx_nonce",
    "tx_input"
]

# 7. S·∫Øp x·∫øp l·∫°i c·ªôt b·∫±ng .select()
native_tx_df = native_tx_df.select(native_tx_column_order)

# 8. Hi·ªÉn th·ªã k·∫øt qu·∫£ v·ªõi th·ª© t·ª± m·ªõi
native_tx_df.show(n=1, truncate=True, vertical=True)

-RECORD 0------------------------------------
 tx_hash              | 0x33f773891e244c0... 
 block_number         | 20659158             
 block_timestamp_unix | 1725235211           
 block_hash           | 0x8899ffe1704a0f5... 
 sender_address       | 0x7c8b9874f7be10b... 
 receiver_address     | 0x1105e8d2bb5a7a6... 
 tx_status            | 1                    
 tx_type              | 2                    
 gas_limit            | 349980               
 gas_used             | 279984               
 gas_price_wei        | 368431329852         
 value_wei            | 377684438            
 tx_nonce             | 61                   
 tx_input             | 0x016f90f6bd4d8560   
only showing top 1 row


In [60]:
from pyspark.sql.functions import col, to_timestamp, lit, substring, regexp_replace
from pyspark.sql.types import DecimalType, LongType, StringType, IntegerType

# S·ª≠ d·ª•ng l·∫°i ki·ªÉu Decimal an to√†n trong gi·ªõi h·∫°n c·ªßa Spark
DECIMAL_TYPE = DecimalType(38, 18)
# H·∫±ng s·ªë: 1 Ether = 10^18 Wei
WEI_TO_ETHER = 1000000000000000000.0 # D√πng s·ªë float ƒë·ªÉ ƒë·∫£m b·∫£o ph√©p chia float

fact_native_transactions_df = native_tx_df.select(
    # 1. Kh√≥a ch√≠nh v√† th√¥ng tin kh·ªëi
    col("tx_hash"),
    col("block_number").cast(LongType()),
    to_timestamp(col("block_timestamp_unix")).alias("block_timestamp"), # CHUY·ªÇN ƒê·ªîI: Integer (Unix) -> Timestamp
    col("block_hash"),
    
    # 2. Th√¥ng tin v√≠
    col("sender_address"),
    col("receiver_address"),
    
    # 3. Gi√° tr·ªã (T√çNH TO√ÅN: value_wei -> value_eth)
    (col("value_wei") / lit(WEI_TO_ETHER)).alias("value_eth").cast(DECIMAL_TYPE),    
    
    # 4. Th√¥ng tin Gas v√† Ph√≠
    col("tx_status").cast(IntegerType()),
    col("tx_type").cast(IntegerType()),
    col("gas_price_wei").alias("gas_price").cast(LongType()), # ƒê·ªïi t√™n v√† Cast
    col("gas_used").cast(LongType()),
    
    # T√çNH TO√ÅN tx_fee_eth
    ((col("gas_used") * col("gas_price_wei")) / lit(WEI_TO_ETHER))
        .alias("tx_fee_eth").cast(DECIMAL_TYPE), 
    
    # 5. Metadata b·ªï sung
    # T√çNH TO√ÅN tx_input_method_id: L·∫•y 8 k√Ω t·ª± hex ƒë·∫ßu ti√™n (b·ªè '0x')
    substring(regexp_replace(col("tx_input"), "0x", ""), 1, 8).alias("tx_input_method_id"),
    col("tx_nonce").cast(LongType()),
)

# Ki·ªÉm tra schema cu·ªëi c√πng
fact_native_transactions_df.printSchema()
fact_native_transactions_df.show(n=3, vertical=True, truncate=False)

root
 |-- tx_hash: string (nullable = true)
 |-- block_number: long (nullable = true)
 |-- block_timestamp: timestamp (nullable = true)
 |-- block_hash: string (nullable = true)
 |-- sender_address: string (nullable = true)
 |-- receiver_address: string (nullable = true)
 |-- value_eth: decimal(38,18) (nullable = true)
 |-- tx_status: integer (nullable = true)
 |-- tx_type: integer (nullable = true)
 |-- gas_price: long (nullable = true)
 |-- gas_used: long (nullable = true)
 |-- tx_fee_eth: decimal(38,18) (nullable = true)
 |-- tx_input_method_id: string (nullable = true)
 |-- tx_nonce: long (nullable = true)

-RECORD 0--------------------------------------------------------------------------------
 tx_hash            | 0x33f773891e244c0baa88e9a5498f15d10b6c5cc02976369c272b210eeaffa84d 
 block_number       | 20659158                                                           
 block_timestamp    | 2024-09-02 07:00:11                                                
 block_hash         |

## Handling Missing Data

In [131]:
tokens_df.describe().show(truncate=False)

+-------+------------------------------------------+--------+----------+------------------+---------------------+------------+
|summary|address                                   |symbol  |name      |decimals          |total_supply         |block_number|
+-------+------------------------------------------+--------+----------+------------------+---------------------+------------+
|count  |1145                                      |1135    |1136      |1036              |1126                 |0           |
|mean   |NULL                                      |Infinity|NULL      |15.443050193050192|8.88867204986648E37  |NULL        |
|stddev |NULL                                      |NaN     |NULL      |4.522796787824062 |2.9800984566230816E39|NULL        |
|min    |0x000000000002c0b7dfdf4e6a7d53aeb8889b80bf|       |         |0                 |1.0                  |NULL        |
|max    |0xffff894cb5c3d63ac2e021c68b21bab29fc1d154|üò∫      |üê∏MAGAPEPE|18                |1.000000010001E4

In [144]:
# Count null token decimals
null_count_decimals = tokens_df.select(
    sum(
        when(col("decimals").isNull(), 1).otherwise(0)
    ).alias("null_count_decimals")
)

print("Number of null record...")
null_count_decimals.show()


# Count null token symbols
null_count_symbols = tokens_df.select(
    sum(
        when(col("symbol").isNull(), 1).otherwise(0)
    ).alias("null_count_symbols")
)

null_count_symbols.show()

# Count null token names
null_count_names = tokens_df.select(
    sum(
        when(col("name").isNull(), 1).otherwise(0)
    ).alias("null_count_names")
)

null_count_names.show()

Number of null record...
+-------------------+
|null_count_decimals|
+-------------------+
|                109|
+-------------------+

+------------------+
|null_count_symbols|
+------------------+
|                10|
+------------------+

+----------------+
|null_count_names|
+----------------+
|               9|
+----------------+



In [170]:
from pyspark.sql.functions import col, lit, coalesce, when, isnan
from pyspark.sql.types import IntegerType, StringType

cleaned_tokens_df = tokens_df.alias("t")

cleaned_tokens_df = cleaned_tokens_df.withColumn(
    "token_decimals",
    coalesce(col("decimals"), lit(18)).cast(IntegerType())
)

cleaned_tokens_df = cleaned_tokens_df.withColumn(
    "token_symbol",
    # Lo·∫°i b·ªè chu·ªói r·ªóng ('') ho·∫∑c kho·∫£ng tr·∫Øng, sau ƒë√≥ ƒëi·ªÅn '[UNKNOWN]' n·∫øu v·∫´n l√† NULL
    coalesce(col("symbol"), lit("")).cast(StringType()) # Cast v·ªÅ String tr∆∞·ªõc khi thay th·∫ø
).withColumn(
    "token_name",
    coalesce(col("name"), lit("[UNKNOWN_NAME]"))
)

cleaned_tokens_df = cleaned_tokens_df.select(
    col("address").alias("token_address"),  # Kh√≥a ch√≠nh (C·∫ßn thi·∫øt cho JOIN)
    col("token_symbol"),
    col("token_name"),
    col("token_decimals")
)

cleaned_tokens_df.printSchema()
cleaned_tokens_df.show(3, truncate=False, vertical=True)

root
 |-- token_address: string (nullable = true)
 |-- token_symbol: string (nullable = false)
 |-- token_name: string (nullable = false)
 |-- token_decimals: integer (nullable = false)

-RECORD 0----------------------------------------------------
 token_address  | 0x000000000002c0b7dfdf4e6a7d53aeb8889b80bf 
 token_symbol   | XTREME                                     
 token_name     | Xtremeverse                                
 token_decimals | 18                                         
-RECORD 1----------------------------------------------------
 token_address  | 0x0000000000085d4780b73119b644ae5ecd22b376 
 token_symbol   | TUSD                                       
 token_name     | TrueUSD                                    
 token_decimals | 18                                         
-RECORD 2----------------------------------------------------
 token_address  | 0x0000000000a39bb272e79075ade125fd351887ac 
 token_symbol   | BLURPOOL                                   
 token_

## Processing Token Transfer Transactions

In [171]:
from pyspark.sql.functions import col, lit, when, cast, pow
from pyspark.sql.types import DecimalType, LongType, StringType, IntegerType, DoubleType

DECIMAL_38_0 = DecimalType(38, 0)
DECIMAL_38_18 = DecimalType(38, 18)

# --- B∆Ø·ªöC 1: L√ÄM S·∫†CH V√Ä JOIN V·ªöI DIM TOKEN V√Ä CONTRACTS ---
token_data_df = token_transfers_df.alias("tt").join(
    cleaned_tokens_df.alias("t"),
    on=col("tt.token_address") == col("t.token_address"),
    how="left"
).join(
    contracts_df.alias("c"),
    on=col("tt.token_address") == col("c.address"),
    how="left"
).select(
    # ... (C·ªôt kh√≥a v√† ch·ªâ m·ª•c) ...
    col("tt.transaction_hash").alias("tx_hash"),
    col("tt.log_index").cast(LongType()),
    col("tt.block_number").cast(LongType()),

    # ... (Th√¥ng tin chuy·ªÉn nh·∫≠n, token_address, symbol, decimals) ...
    col("tt.from_address").alias("sender_address"),
    col("tt.to_address").alias("receiver_address"),
    col("tt.token_address"),
    col("t.token_name"),
    col("t.token_symbol"),
    col("t.token_decimals").cast(IntegerType()),
    
    # FIX: L∆∞u tr·ªØ gi√° tr·ªã RAW l·ªõn d∆∞·ªõi d·∫°ng STRING (thay v√¨ Decimal(38, 0))
    col("tt.value").cast(StringType()).alias("amount_raw"), 
    
    # T√çNH TO√ÅN token_type
    when(col("c.is_erc20") == True, lit("ERC20"))
        .when(col("c.is_erc721") == True, lit("ERC721"))
        .otherwise(lit("OTHER"))
        .alias("token_type")
)

token_data_df.printSchema()
token_data_df.show(n=1, truncate=True, vertical=True)

root
 |-- tx_hash: string (nullable = true)
 |-- log_index: long (nullable = true)
 |-- block_number: long (nullable = true)
 |-- sender_address: string (nullable = true)
 |-- receiver_address: string (nullable = true)
 |-- token_address: string (nullable = true)
 |-- token_name: string (nullable = true)
 |-- token_symbol: string (nullable = true)
 |-- token_decimals: integer (nullable = true)
 |-- amount_raw: string (nullable = true)
 |-- token_type: string (nullable = false)

-RECORD 0--------------------------------
 tx_hash          | 0x33f773891e244c0... 
 log_index        | 4                    
 block_number     | 20659158             
 sender_address   | 0x000000000000000... 
 receiver_address | 0x1105e8d2bb5a7a6... 
 token_address    | 0xa3d58c4e56fedca... 
 token_name       | Metronome            
 token_symbol     | MET                  
 token_decimals   | 18                   
 amount_raw       | 2.8815E21            
 token_type       | OTHER                
only showing 

In [172]:
from pyspark.sql.functions import col, lit, pow, to_timestamp, cast
from pyspark.sql.types import DecimalType, LongType, DoubleType, StringType

df_temp_join = token_data_df.alias("td").join(
    df_fact_native_transactions.alias("nt"),
    on=col("td.tx_hash") == col("nt.tx_hash"),
    how="inner" 
)

# Danh s√°ch c·ªôt theo th·ª© t·ª± chu·∫©n Fact Table
token_fact_column_order = [
    # 1. Kh√≥a Ch√≠nh v√† Kh√≥a Ngo·∫°i
    "tx_hash", 
    "log_index",
    
    # 2. Metadata Block/Th·ªùi gian
    "block_number",
    "block_timestamp",
    "tx_nonce",
    
    # 3. Th√¥ng tin token
    "token_address",
    "token_symbol",
    "token_type", 
    
    # 4. Th√¥ng tin v√≠
    "sender_address",
    "receiver_address",
    
    # 5. Gi√° tr·ªã v√† Decimals
    "amount_raw", 
    "amount_display",
    "token_decimals",
]


fact_token_transactions_df = df_temp_join.select(
    # Kh√≥a v√† Index
    col("td.tx_hash"),
    col("td.log_index"),
    
    # Metadata Block
    col("nt.block_number"),
    col("nt.block_timestamp"),
    col("nt.tx_nonce"),
    
    # Th√¥ng tin Token
    col("td.token_address"),
    col("td.token_symbol"),
    col("td.token_type"),
    
    # Th√¥ng tin V√≠
    col("td.sender_address"),
    col("td.receiver_address"),
    
    # Gi√° tr·ªã Raw (B·∫ÆT BU·ªòC: S·ª≠ d·ª•ng cast(DECIMAL_38_0) - C√°c gi√° tr·ªã > 10^38 s·∫Ω l√† NULL)
    col("td.amount_raw").cast(DECIMAL_38_0).alias("amount_raw"), 
    col("td.token_decimals"),

    # T√çNH TO√ÅN amount_display (D√πng DoubleType cho ph√©p t√≠nh pow an to√†n):
    (col("td.amount_raw").cast(DoubleType()) / (pow(lit(10.0), col("td.token_decimals")))) 
        .alias("amount_display").cast(DECIMAL_38_18)
).select(
    # 3. S·∫Øp x·∫øp l·∫°i th·ª© t·ª± c·ªôt
    *token_fact_column_order 
)

# Ki·ªÉm tra schema cu·ªëi c√πng v√† hi·ªÉn th·ªã
fact_token_transactions_df.printSchema()
fact_token_transactions_df.show(n=1, vertical=True, truncate=False)

root
 |-- tx_hash: string (nullable = true)
 |-- log_index: long (nullable = true)
 |-- block_number: long (nullable = true)
 |-- block_timestamp: timestamp (nullable = true)
 |-- tx_nonce: long (nullable = true)
 |-- token_address: string (nullable = true)
 |-- token_symbol: string (nullable = true)
 |-- token_type: string (nullable = false)
 |-- sender_address: string (nullable = true)
 |-- receiver_address: string (nullable = true)
 |-- amount_raw: decimal(38,0) (nullable = true)
 |-- amount_display: decimal(38,18) (nullable = true)
 |-- token_decimals: integer (nullable = true)

-RECORD 0------------------------------------------------------------------------------
 tx_hash          | 0x66b74900336f2995517e561e15935623e59059f3479e38eac4576dc6aa275466 
 log_index        | 405                                                                
 block_number     | 20659158                                                           
 block_timestamp  | 2024-09-02 07:00:11                   

## Export CSV

In [173]:
# ƒê·∫∑t t√™n th∆∞ m·ª•c ƒë·∫ßu ra
output_dir = "/Users/cuongct090_04/PycharmProjects/spark-ethereum-etl/output"
native_tx_output_path = f"{output_dir}/native_transactions"

fact_native_transactions_df.repartition(1).write.csv(
    path=native_tx_output_path,
    mode="overwrite",      # Ch·∫ø ƒë·ªô ghi: Ghi ƒë√® th∆∞ m·ª•c n·∫øu n√≥ ƒë√£ t·ªìn t·∫°i
    header=True,           # Th√™m d√≤ng ti√™u ƒë·ªÅ (t√™n c·ªôt) v√†o file CSV
    sep=",",               # D√πng d·∫•u ph·∫©y (,) l√†m k√Ω t·ª± ph√¢n t√°ch
)


print(f"D·ªØ li·ªáu native transactions ƒë√£ ƒë∆∞·ª£c ghi th√†nh c√¥ng v√†o th∆∞ m·ª•c: {native_tx_output_path}")

D·ªØ li·ªáu native transactions ƒë√£ ƒë∆∞·ª£c ghi th√†nh c√¥ng v√†o th∆∞ m·ª•c: /Users/cuongct090_04/PycharmProjects/spark-ethereum-etl/output/native_transactions


In [174]:
token_tx_output_path = f"{output_dir}/token_transactions"

fact_token_transactions_df.repartition(1).write.csv(
    path=token_tx_output_path,
    mode="overwrite",
    header=True,
    sep=","
)

print(f"D·ªØ li·ªáu token transactions ƒë√£ ƒë∆∞·ª£c ghi th√†nh c√¥ng v√†o th∆∞ m·ª•c: {token_tx_output_path}")

D·ªØ li·ªáu token transactions ƒë√£ ƒë∆∞·ª£c ghi th√†nh c√¥ng v√†o th∆∞ m·ª•c: /Users/cuongct090_04/PycharmProjects/spark-ethereum-etl/output/token_transactions
