# Batch (Spark) — Google BigQuery

## 1. Ethereum 5 Days transaction analysis 

In [7]:
import os
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg, desc

# Set GCP credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "spark-access-459420-f8b1f097831d.json"

# Start Spark session with a fixed UI port for monitoring
print("Initializing Spark session...")
try:
    spark = SparkSession.builder \
        .appName("Ethereum5DaysAnalysis") \
        .config("spark.ui.port", "4041") \
        .config("spark.jars", "spark-bigquery-with-dependencies_2.12-0.32.0.jar") \
        .getOrCreate() 
    print("Spark session initialized successfully.")
    print("Go to -> http://localhost:4041/jobs/")
except Exception as e:
    print(f"Error initializing Spark session: {e}")
    exit(1)

# Load data from BigQuery for the period May 5-9, 2025
print("Starting data load from BigQuery...")
start_time = time.time()
try:
    df = spark.read.format("bigquery") \
        .option("table", "bigquery-public-data.crypto_ethereum.transactions") \
        .option("filter", "block_timestamp >= TIMESTAMP('2025-05-05') AND block_timestamp < TIMESTAMP('2025-05-10')") \
        .load()
    print(f"Data loaded in {round(time.time() - start_time, 1)}s — Rows: {df.count():,}")
except Exception as e:
    print(f"Error loading data from BigQuery: {e}")
    spark.stop()
    exit(1)

# Data selection and filtering
print("Processing and cleaning data...")
df_cleaned = df.select("from_address", "to_address", "value", "gas_price", "block_timestamp") \
              .filter(col("from_address").isNotNull())

# Analysis: 1. Top 10 senders by transaction count
print("Performing analysis: Top 10 senders by transaction count...")
top_senders_count = df_cleaned.groupBy("from_address") \
    .agg(count("*").alias("tx_count")) \
    .orderBy(desc("tx_count")) \
    .limit(10)

# Analysis: 2. Top 10 by total value
print("Performing analysis: Top 10 senders by total value...")
top_senders_value = df_cleaned.groupBy("from_address") \
    .agg(sum("value").alias("total_value")) \
    .orderBy(desc("total_value")) \
    .limit(10)

# Analysis: 3. Average gas price per sender
print("Performing analysis: Top 10 senders by average gas price...")
avg_gas_price = df_cleaned.groupBy("from_address") \
    .agg(avg("gas_price").alias("avg_gas_price")) \
    .orderBy(desc("avg_gas_price")) \
    .limit(10)

# Analysis: 4. Count of "expensive" transactions (gas_price > 10^9)
print("Counting expensive transactions...")
expensive_tx = df_cleaned.filter(col("gas_price") > 10**9).count()

# Output results
print("\nAnalysis Results:")
print("Top 10 senders by transaction count:")
top_senders_count.show(truncate=False)

print("Top 10 senders by total value:")
top_senders_value.show(truncate=False)

print("Top 10 senders by average gas price:")
avg_gas_price.show(truncate=False)

print(f"Expensive transactions (gas_price > 10^9): {expensive_tx:,}")

# Stop Spark session
print("\nStopping Spark session...")
time.sleep(100)
spark.stop()
print("Process completed. Spark session closed.")

Initializing Spark session...
Spark session initialized successfully.
Go to -> http://localhost:4041/jobs/
Starting data load from BigQuery...
Error loading data from BigQuery: An error occurred while calling o112.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: bigquery. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAcce

25/05/11 02:52:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


NameError: name 'df' is not defined

In [3]:
import os
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg, desc

# Set GCP credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "spark-access-459420-f8b1f097831d.json"

# Start Spark session with a fixed UI port for monitoring
print("Initializing Spark session...")
try:
    spark = SparkSession.builder \
        .appName("Ethereum5DaysAnalysis") \
        .config("spark.ui.port", "4041") \
        .getOrCreate()  
    print("Spark session initialized successfully.")
    print("Go to -> http://localhost:4041/jobs/")
except Exception as e:
    print(f"Error initializing Spark session: {e}")
    exit(1)

# Load data from BigQuery for the period May 5-9, 2025
print("Starting data load from BigQuery...")
start_time = time.time()
try:
    df = spark.read.format("bigquery") \
        .option("table", "bigquery-public-data.crypto_ethereum.transactions") \
        .option("filter", "block_timestamp >= TIMESTAMP('2025-05-05') AND block_timestamp < TIMESTAMP('2025-05-10')") \
        .load()
    print(f"Data loaded in {round(time.time() - start_time, 1)}s — Rows: {df.count():,}")
except Exception as e:
    print(f"Error loading data from BigQuery: {e}")
    spark.stop()
    exit(1)

# Data selection and filtering
print("Processing and cleaning data...")
df_cleaned = df.select("from_address", "to_address", "value", "gas_price", "block_timestamp") \
              .filter(col("from_address").isNotNull())

# Analysis: 1. Top 10 senders by transaction count
print("Performing analysis: Top 10 senders by transaction count...")
top_senders_count = df_cleaned.groupBy("from_address") \
    .agg(count("*").alias("tx_count")) \
    .orderBy(desc("tx_count")) \
    .limit(10)

# Analysis: 2. Top 10 by total value
print("Performing analysis: Top 10 senders by total value...")
top_senders_value = df_cleaned.groupBy("from_address") \
    .agg(sum("value").alias("total_value")) \
    .orderBy(desc("total_value")) \
    .limit(10)

# Analysis: 3. Average gas price per sender
print("Performing analysis: Top 10 senders by average gas price...")
avg_gas_price = df_cleaned.groupBy("from_address") \
    .agg(avg("gas_price").alias("avg_gas_price")) \
    .orderBy(desc("avg_gas_price")) \
    .limit(10)

# Analysis: 4. Count of "expensive" transactions (gas_price > 10^9)
print("Counting expensive transactions...")
expensive_tx = df_cleaned.filter(col("gas_price") > 10**9).count()

# Output results
print("\nAnalysis Results:")
print("Top 10 senders by transaction count:")
top_senders_count.show(truncate=False)

print("Top 10 senders by total value:")
top_senders_value.show(truncate=False)

print("Top 10 senders by average gas price:")
avg_gas_price.show(truncate=False)

print(f"Expensive transactions (gas_price > 10^9): {expensive_tx:,}")

# Stop Spark session
print("\nStopping Spark session...")
time.sleep(100)
spark.stop()
print("Process completed. Spark session closed.")

Initializing Spark session...
Spark session initialized successfully.
Go to -> http://localhost:4041/jobs/
Starting data load from BigQuery...
Error loading data from BigQuery: An error occurred while calling o63.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: bigquery. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAcces

NameError: name 'df' is not defined

ConnectionRefusedError: [Errno 61] Connection refused

# 2. Thief wallet transaction analysis (one year period)

In [5]:
import os
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import bround, col, count, sum, desc, when
from pyspark.sql.window import Window
from collections import deque

# Set GCP credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "spark-access-459420-f8b1f097831d.json"

# Start Spark session with optimization and parallelism settings
spark = SparkSession.builder \
    .appName("EthereumThiefAnalysisOptimized") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .config("spark.ui.port", "4041") \
    .config("spark.driver.extraJavaOptions", "-Dlog4j.logLevel=ERROR") \
    .config("spark.executor.extraJavaOptions", "-Dlog4j.logLevel=ERROR") \
    .getOrCreate()
print("Spark session initialized successfully.")
print("Go to -> http://localhost:4041/jobs")

print("Spark session initialized.")

# Load data from BigQuery for the period May 2024 to May 2025
start_time = time.time()
df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.crypto_ethereum.transactions") \
    .option("filter", "block_timestamp >= TIMESTAMP('2024-05-01')") \
    .load()
print(f"Data loaded in {round(time.time() - start_time, 1)}s — Rows: {df.count():,}")

# Clean data
thief_wallet = "0x974caa59e49682cda0ad2bbe82983419a2ecc400"

df_cleaned = df.select("from_address", "to_address", "value", "gas_price", "block_timestamp") \
    .filter(col("from_address").isNotNull() & col("value").isNotNull() & (col("value") > 0))

df_thief_sent = df_cleaned.filter(col("from_address") == thief_wallet)
df_thief_received = df_cleaned.filter(col("to_address") == thief_wallet)

# Top 5 wallets by funds sent (by number of transactions and volume)
top_sent_wallets_count = df_thief_sent.groupBy("to_address") \
    .agg(count("*").alias("tx_count"), bround(sum("value"), 8).alias("total_value")) \
    .orderBy(desc("tx_count")) \
    .limit(5)

top_sent_wallets_value = df_thief_sent.groupBy("to_address") \
    .agg(bround(sum("value"), 8).alias("total_value"), count("*").alias("tx_count")) \
    .orderBy(desc("total_value")) \
    .limit(5)

# Top 5 wallets for receiving funds
top_received_wallets_count = df_thief_received.groupBy("from_address") \
    .agg(count("*").alias("tx_count"), bround(sum("value"), 8).alias("total_value")) \
    .orderBy(desc("tx_count")) \
    .limit(5)

top_received_wallets_value = df_thief_received.groupBy("from_address") \
    .agg(bround(sum("value"), 8).alias("total_value"), count("*").alias("tx_count")) \
    .orderBy(desc("total_value")) \
    .limit(5)

# General information
total_sent = df_thief_sent.agg(bround(sum("value"), 8).alias("total_sent"), count("*").alias("sent_count")).first()
total_received = df_thief_received.agg(bround(sum("value"), 8).alias("total_received"), count("*").alias("received_count")).first()

print(f"Total sent: {total_sent['total_sent']}, Total sent transactions: {total_sent['sent_count']}")
print(f"Total received: {total_received['total_received']}, Total received transactions: {total_received['received_count']}")

print("Top 5 wallets sent funds to (by count of transactions):")
top_sent_wallets_count.show(truncate=False)

print("Top 5 wallets sent funds to (by total value):")
top_sent_wallets_value.show(truncate=False)

print("Top 5 wallets received funds from (by count of transactions):")
top_received_wallets_count.show(truncate=False)

print("Top 5 wallets received funds from (by total value):")
top_received_wallets_value.show(truncate=False)

print("\nStopping Spark session...")
time.sleep(100)
spark.stop()
print("Process completed. Spark session closed.")

Spark session initialized successfully.
Go to -> http://localhost:4041/jobs
Spark session initialized.


Py4JJavaError: An error occurred while calling o104.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: bigquery. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 15 more
