In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, unix_timestamp, avg, min, max, median
from pyspark.sql.window import Window


In [2]:
spark = SparkSession.builder \
                    .appName("Eth block analysis") \
                    .getOrCreate()

25/05/27 17:33:30 WARN Utils: Your hostname, dyscarnate-B550M-HDV-AR resolves to a loopback address: 127.0.1.1; using 192.168.0.105 instead (on interface enp4s0)
25/05/27 17:33:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/27 17:33:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/27 17:33:30 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [3]:
data_path = "/samsung/data/transactions"
df = spark.read.parquet(data_path)

In [4]:
df.printSchema()  

root
 |-- block_number: long (nullable = true)
 |-- transaction_index: decimal(20,0) (nullable = true)
 |-- transaction_hash: binary (nullable = true)
 |-- nonce: decimal(20,0) (nullable = true)
 |-- from_address: binary (nullable = true)
 |-- to_address: binary (nullable = true)
 |-- value_binary: binary (nullable = true)
 |-- value_string: string (nullable = true)
 |-- value_f64: double (nullable = true)
 |-- input: binary (nullable = true)
 |-- gas_limit: decimal(20,0) (nullable = true)
 |-- gas_used: decimal(20,0) (nullable = true)
 |-- gas_price: decimal(20,0) (nullable = true)
 |-- transaction_type: long (nullable = true)
 |-- max_priority_fee_per_gas: decimal(20,0) (nullable = true)
 |-- max_fee_per_gas: decimal(20,0) (nullable = true)
 |-- success: boolean (nullable = true)
 |-- n_input_bytes: long (nullable = true)
 |-- n_input_zero_bytes: long (nullable = true)
 |-- n_input_nonzero_bytes: long (nullable = true)
 |-- chain_id: decimal(20,0) (nullable = true)



In [5]:
from pyspark.sql.functions import count
df.count()

13577002

In [6]:
#Преобразование данных
from pyspark.sql.functions import col, sha2, hex

df = df.withColumn("from_address", hex(col("from_address"))) \
       .withColumn("to_address", hex(col("to_address"))) \
       .withColumn("transaction_hash", hex(col("transaction_hash")))

In [7]:
#Число транзакций в блоке
df.groupBy("block_number") \
  .count() \
  .orderBy("count", ascending=False) \
  .limit(10) \
  .show(truncate=False)

                                                                                

+------------+-----+
|block_number|count|
+------------+-----+
|21289505    |1179 |
|21313704    |1089 |
|21268920    |1064 |
|21298241    |941  |
|21298407    |911  |
|21275775    |905  |
|21195831    |884  |
|21289528    |852  |
|21298229    |842  |
|21261819    |782  |
+------------+-----+



In [8]:
# Топ отправителей по объему (value_f64)
from pyspark.sql.functions import sum, avg, max, min

df.groupBy("from_address") \
  .agg(sum("value_f64").alias("total_sent"), 
       avg("value_f64").alias("avg_sent"),
       count("*").alias("transaction_count")) \
  .orderBy("transaction_count", ascending=False) \
  .limit(10) \
  .show(truncate=False)

25/05/27 17:33:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:33:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:33:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:33:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:33:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:33:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:33:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:33:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:33:39 WARN RowBasedKeyValueBatch: Calling spill() on

+----------------------------------------+---------------------+----------------------+-----------------+
|from_address                            |total_sent           |avg_sent              |transaction_count|
+----------------------------------------+---------------------+----------------------+-----------------+
|B5D85CBF7CB3EE0D56B3BB207D5FC4B82F43F511|1.1884602194080315E24|9.34257968703497E18   |127209           |
|46340B20830761EFD32832A74D7169B29FEB9758|1.3843707956797246E23|1.10871178465976678E18|124863           |
|F89D7B9C864F589BBF53A82105107622B35EAA40|3.035837175996922E23 |2.7497528857622205E18 |110404           |
|28C6C06298D514DB089934071355E5743BF21D60|2.1174667398334908E24|2.000516542744639E19  |105846           |
|DFD5293D8E347DFE59E90EFD55B2956A1343963D|3.141719632204998E23 |3.1700599683217951E18 |99106            |
|21A31EE1AFC51D94C2EFCCAA2092AD1028285549|3.433369601589801E23 |3.4944170677636316E18 |98253            |
|0D0707963952F2FBA59DD06F2B425ACE40B492FE|6.99

                                                                                

In [9]:
# Топ получателей
df.groupBy("to_address") \
  .agg(sum("value_f64").alias("total_received")) \
  .orderBy("total_received", ascending=False) \
  .limit(10) \
  .show(truncate=False)



+----------------------------------------+---------------------+
|to_address                              |total_received       |
+----------------------------------------+---------------------+
|28C6C06298D514DB089934071355E5743BF21D60|2.211588073311635E24 |
|A9D1E08C7793AF67E9D92FE308D5697FB81D3E43|1.1973427220088355E24|
|B5D85CBF7CB3EE0D56B3BB207D5FC4B82F43F511|1.1521026947300787E24|
|EAE7380DD4CEF6FBD1144F49E4D1E6964258A4F4|4.2478679083094E23   |
|CD531AE9EFCCE479654C4926DEC5F6209531CA7B|4.072977473848594E23 |
|DFD5293D8E347DFE59E90EFD55B2956A1343963D|3.926530268196257E23 |
|DBF5E9C5206D0DB70A90108BF936DA60221DC080|3.8410999939865993E23|
|21A31EE1AFC51D94C2EFCCAA2092AD1028285549|3.754475770492716E23 |
|77134CBC06CB00B66F4C7E623D5FDBF6777635EC|3.6982387204511E23   |
|CEB69F6342ECE283B2F5C9088FF249B5D0AE66EA|3.684533194714802E23 |
+----------------------------------------+---------------------+



                                                                                

In [31]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col, hex, when, countDistinct
# 1. Агрегация по адресам
address_features = df.groupBy("from_address").agg(
    F.count("*").alias("outgoing_tx_count"),
    F.mean("value_f64").alias("avg_outgoing_value"),
    F.sum(when(col("success") == False, 1).otherwise(0)).alias("failed_tx_count"),
    countDistinct("to_address").alias("unique_recipients"),
    F.sum("n_input_nonzero_bytes").alias("contract_interactions")
)


In [32]:
# 2. Временные признаки
window_spec = Window.partitionBy("from_address").orderBy("block_number")

time_features = df.withColumn("prev_block", F.lag("block_number").over(window_spec)) \
                 .withColumn("block_diff", col("block_number") - col("prev_block")) \
                 .groupBy("from_address").agg(
                     F.expr("percentile_approx(block_diff, 0.5)").alias("median_time_between_tx"),
                     F.min("block_number").alias("first_block"),
                     F.max("block_number").alias("last_block")
                 )

In [33]:
# 3. Признаки активности по скользящим окнам
block_window_1h = Window.partitionBy("from_address") \
                       .orderBy("block_number") \
                       .rangeBetween(-60, 0)  # Предположим 1 блок = 1 минута


activity_features = df.withColumn("tx_last_hour", F.count("*").over(block_window_1h)) \
                     .groupBy("from_address").agg(
                         F.mean("tx_last_hour").alias("avg_hourly_activity"),
                         F.stddev("tx_last_hour").alias("std_hourly_activity")
                     )

In [34]:
# 4. Объединение всех признаков
final_features = address_features.join(time_features, "from_address", "left") \
                                .join(activity_features, "from_address", "left") \
                                .withColumn("activity_ratio", col("avg_hourly_activity") / col("outgoing_tx_count"))

In [35]:
# 5. Газовые метрики
gas_metrics = df.withColumn("tx_fee", col("gas_used") * col("gas_price")) \
               .groupBy("from_address").agg(
                   F.mean("tx_fee").alias("avg_tx_fee"),
                   F.sum("tx_fee").alias("total_tx_fee")
               )

final_features = final_features.join(gas_metrics, "from_address", "left")

In [36]:
# 8. Временные аномалии
spike_window = Window.partitionBy("from_address") \
                   .orderBy("block_number") \
                   .rowsBetween(-10, 0)

spike_detection = df.withColumn("rolling_avg", F.avg("value_f64").over(spike_window)) \
                   .withColumn("value_spike", 
                              when(col("value_f64") > 2 * col("rolling_avg"), 1).otherwise(0)) \
                   .groupBy("from_address").agg(
                       F.sum("value_spike").alias("value_spikes")
                   )

final_features = final_features.join(spike_detection, "from_address", "left")

In [37]:
final_features.printSchema()

root
 |-- from_address: string (nullable = true)
 |-- outgoing_tx_count: long (nullable = false)
 |-- avg_outgoing_value: double (nullable = true)
 |-- failed_tx_count: long (nullable = true)
 |-- unique_recipients: long (nullable = false)
 |-- contract_interactions: long (nullable = true)
 |-- median_time_between_tx: long (nullable = true)
 |-- first_block: long (nullable = true)
 |-- last_block: long (nullable = true)
 |-- avg_hourly_activity: double (nullable = true)
 |-- std_hourly_activity: double (nullable = true)
 |-- activity_ratio: double (nullable = true)
 |-- avg_tx_fee: decimal(38,4) (nullable = true)
 |-- total_tx_fee: decimal(38,0) (nullable = true)
 |-- value_spikes: long (nullable = true)



In [38]:
final_features.head(5)

25/05/27 17:56:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:56:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:56:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:56:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:56:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:56:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:56:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:56:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 17:56:53 WARN RowBasedKeyValueBatch: Calling spill() on

[Row(from_address='0340E6C62D8545E1098A73086A285FD03B4E0400', outgoing_tx_count=1, avg_outgoing_value=0.0, failed_tx_count=0, unique_recipients=1, contract_interactions=32, median_time_between_tx=None, first_block=21321355, last_block=21321355, avg_hourly_activity=1.0, std_hourly_activity=None, activity_ratio=1.0, avg_tx_fee=Decimal('684993560537530.0000'), total_tx_fee=Decimal('684993560537530'), value_spikes=0),
 Row(from_address='2BC9A3F047D4C974BB76CC3FB7751347FC6E3814', outgoing_tx_count=5, avg_outgoing_value=3.72e+16, failed_tx_count=0, unique_recipients=2, contract_interactions=403, median_time_between_tx=13, first_block=21246325, last_block=21253599, avg_hourly_activity=1.6, std_hourly_activity=0.8944271909999159, activity_ratio=0.32, avg_tx_fee=Decimal('1618369271430997.4000'), total_tx_fee=Decimal('8091846357154987'), value_spikes=1),
 Row(from_address='3165650B186B3BBB43562B7AD14D11596F9BB3F5', outgoing_tx_count=2, avg_outgoing_value=7215302485294712.0, failed_tx_count=0, un

In [58]:
from pyspark.sql.functions import col, when, lit, count, least, greatest

def apply_fraud_rules(df, tx_df):
    # --- Шаг 1: нормализуем пары [A, B] чтобы не дублировались [A→B] и [B→A] ---
    tx_pairs = tx_df.withColumn("addr1", least("from_address", "to_address")) \
                    .withColumn("addr2", greatest("from_address", "to_address"))

    # --- Шаг 2: считаем количество транзакций между addr1 и addr2 ---
    pingpong_counts = tx_pairs.groupBy("addr1", "addr2").agg(count("*").alias("tx_between_pair"))

    # --- Шаг 3: фильтруем только пары с >10 взаимных транзакций ---
    pingpong_pairs = pingpong_counts.filter(col("tx_between_pair") > 10)

    # --- Шаг 4: превращаем пары в список адресов, которые участвовали в ping-pong ---
    pingpong_addresses = pingpong_pairs.select("addr1").union(
        pingpong_pairs.select("addr2").withColumnRenamed("addr2", "addr1")
    ).dropDuplicates().withColumnRenamed("addr1", "from_address") \
     .withColumn("pingpong", lit(1))

    # --- Шаг 5: добавляем колонку pingpong к признаковому DataFrame ---
    df = df.join(pingpong_addresses, on="from_address", how="left").fillna({"pingpong": 0})

    # --- Шаг 6: добавляем бинарные признаки по каждому правилу ---
    df = df.withColumn("contracts_and_low_activity", 
                       when((col("contract_interactions") > 500) & (col("avg_hourly_activity") < 1.0), 1).otherwise(0)) \
           .withColumn("many_recipients_few_txs", 
                       when((col("unique_recipients") > 5) & (col("outgoing_tx_count") < 3), 1).otherwise(0)) \
           .withColumn("high_fee", 
                       when(col("avg_tx_fee") > 1e15, 1).otherwise(0)) \
           .withColumn("short_time_between_tx", 
                       when(col("median_time_between_tx").isNotNull() & (col("median_time_between_tx") < 10), 1).otherwise(0)) \
           .withColumn("value_spikes", 
                       when(col("value_spikes") > 0, 1).otherwise(0))

    # --- Шаг 7: итоговая метка is_fraud как OR всех правил ---
    df = df.withColumn(
        "is_fraud",
        when(
            (col("contracts_and_low_activity") == 1) |
            (col("many_recipients_few_txs") == 1) |
            (col("high_fee") == 1) |
            (col("short_time_between_tx") == 1) |
            (col("value_spikes") == 1) |
            (col("pingpong") == 1),
            1
        ).otherwise(0)
    )

    # --- Шаг 8: оставляем только нужные колонки ---
    df = df.select(
        "from_address",
        "contracts_and_low_activity",
        "many_recipients_few_txs",
        "high_fee",
        "short_time_between_tx",
        "value_spikes",
        "pingpong",
        "is_fraud"
    )

    return df


In [59]:
final_with_flags = apply_fraud_rules(final_features, df)
final_with_flags.select("from_address", "is_fraud").show(truncate=False)

25/05/27 18:22:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:22:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:22:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:22:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:22:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:22:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:22:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:22:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:22:17 WARN RowBasedKeyValueBatch: Calling spill() on

+----------------------------------------+--------+
|from_address                            |is_fraud|
+----------------------------------------+--------+
|0340E6C62D8545E1098A73086A285FD03B4E0400|0       |
|076AAE891F617CC61FFD54F0920A02FB27367CD5|0       |
|12469989C0F3B38F0F230F94B58E852249CFC9DD|0       |
|2BC9A3F047D4C974BB76CC3FB7751347FC6E3814|1       |
|3165650B186B3BBB43562B7AD14D11596F9BB3F5|0       |
|44C4B72ADBCB3102797D821B9EDDA255AAE048A1|1       |
|49E60022969E3EDD88250051646EDA5F6561ED38|1       |
|4BBA26212F843144A8DCA39C63803A1BD2ECAC39|1       |
|54AA64D7554DFA91D9559A75AE437D571E3713A0|1       |
|56691E56F68A610897EF1790ACCF7C882ABEC581|1       |
|69F332511F24BA91060E34D7617AB58C5A7E318B|0       |
|8063D50C1B0AA617D03B1E1BFAC1929C0182F9A2|0       |
|843F3C2D1E678F763E17DFA5F18B510ECD1B9145|1       |
|88FE5FB5F74753C84350F33F9BA755782585BC13|1       |
|89A88EE6C7D3D5F48E868BBF09D25E7AA0488649|1       |
|92C5F3597BD728ED4C5386FF0AD61B24875C9A79|1       |
|958BE7C639C

                                                                                

In [66]:
final_with_flags.filter(col("pingpong") == 1).count()

25/05/27 18:27:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:27:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:27:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:27:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:27:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:27:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:27:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:27:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/27 18:27:10 WARN RowBasedKeyValueBatch: Calling spill() on

53744