In [1]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, avg, window, row_number, sum, hour, from_unixtime, date_format, to_timestamp, cast, col
from pyspark.sql import Window


In [2]:
# Create a SparkSession
spark = SparkSession.builder.appName("EthereumETL").getOrCreate()

23/01/08 20:21:55 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.29.63 instead (on interface wlo1)
23/01/08 20:21:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/08 20:21:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Real-time analytics

The moving average of the number of transactions in a block (for a window of 5 blocks)

just to make it simpler, we're reading data from a parquet file instead of kafka topic.
1. POC for writing to kafka topic is in send-data-to-kafka.py
2. POC for reading from kafka topic is in receive-data-on-kafka.py
3. Refer to the comments in those files for more details

In [3]:
# Read data from Kafka
df = spark.read.parquet("transactions.parquet")

# group by block number and count the number of transactions
df = df.groupBy("block_number").count()

# window for row_number 
windowSpec = Window.partitionBy("block_number").orderBy("block_number")
df = df.withColumn("row_number", row_number().over(windowSpec))

# window for rolling average
windowSpec = Window.partitionBy("row_number").orderBy("row_number").rowsBetween(-4, 0)
df = df.withColumn("rolling_avg", avg("count").over(windowSpec))

df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+------------+-----+----------+------------------+
|block_number|count|row_number|       rolling_avg|
+------------+-----+----------+------------------+
|      480002|    2|         1|               2.0|
|      480004|    2|         1|               2.0|
|      480006|    4|         1|2.6666666666666665|
|      480008|    1|         1|              2.25|
|      480012|    3|         1|               2.4|
|      480025|    2|         1|               2.4|
|      480033|    1|         1|               2.2|
|      480036|    1|         1|               1.6|
|      480038|    1|         1|               1.6|
|      480039|    2|         1|               1.4|
|      480041|    1|         1|               1.2|
|      480043|    1|         1|               1.2|
|      480051|    1|         1|               1.2|
|      480052|    1|         1|               1.2|
|      480071|    2|         1|               1.2|
|      480074|    2|         1|               1.4|
|      480080|    1|         1|

                                                                                

Total value of gas every hour

In [4]:
df = spark.read.parquet("transactions.parquet")

df = df.withColumn("block_timestamp", from_unixtime(df["block_timestamp"]))

# Extract the hour from the block_timestamp column
df = df.withColumn("date", date_format(df["block_timestamp"], "yyyy-MM-dd"))
df = df.withColumn("hour", hour(df["block_timestamp"]))

# Group the data by hour and sum the gas column
df = df.groupBy("date", "hour").sum("gas")

df = df.sort("date", "hour")
df.show(20)

+----------+----+--------+
|      date|hour|sum(gas)|
+----------+----+--------+
|2015-11-03|   1|10643000|
|2015-11-03|   2|52934000|
|2015-11-03|   3|36920244|
|2015-11-03|   4|12912000|
|2015-11-03|   5|21428000|
|2015-11-03|   6|19175000|
|2015-11-03|   7|22663000|
|2015-11-03|   8|18651000|
|2015-11-03|   9|21208000|
|2015-11-03|  10|20266000|
|2015-11-03|  11|47825318|
|2015-11-03|  12|21020000|
|2015-11-03|  13|24316000|
|2015-11-03|  14|37869000|
|2015-11-03|  15|38480327|
|2015-11-03|  16|34198425|
|2015-11-03|  17|35333904|
|2015-11-03|  18|37797318|
|2015-11-03|  19|37711000|
|2015-11-03|  20|23465201|
+----------+----+--------+
only showing top 20 rows



Running count of number of transfers sent and received by addresses

In [5]:
df = spark.read.parquet("transactions.parquet")

# Group the data by 'from_address' and 'to_address', and count the number of occurrences
df_grouped = df.groupBy('from_address', 'to_address').count()

# Add a column with the running count of transfers
windowSpec = Window.orderBy("from_address", "to_address")
df_grouped = df_grouped.withColumn('running_count', sum('count').over(windowSpec))

# Display the resulting DataFrame
df_grouped.show(20)

23/01/08 20:22:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/08 20:22:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/08 20:22:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/08 20:22:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/08 20:22:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/08 20:22:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/08 2

### Setup Clickhouse

If you are on linux system:
```shell
make get-clickhouse-docker-image
```

and then 
```shell
make start-clickhouse-docker-container 
```

This will download the clickhouse docker-image and start the container. 

If you're not on Linux environment
```shell
docker pull clickhouse/clickhouse-server
```
followed by 
```shell
docker run -d -p 18123:8123 -p19000:9000 --name clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
```

In [6]:
import clickhouse_connect

# Change the following variables to match your ClickHouse setup
clickhouse_host = 'localhost'
clickhouse_port = 18123

clickhouse_client = clickhouse_connect.get_client(host=clickhouse_host, port=clickhouse_port)

def get_create_sql_stmt_from_df(df, table_name):
    columns = []
    for column, dtype in df.dtypes.items():
        if dtype == 'int64':
            clickhouse_dtype = 'Int64'
        elif dtype == 'float64':
            clickhouse_dtype = 'Float64'
        elif dtype == 'object':
            clickhouse_dtype = 'String'
        elif dtype == 'bool':
            clickhouse_dtype = 'UInt8'  # ClickHouse uses UInt8 to represent boolean values
        else:
            clickhouse_dtype = '???'  # Unknown dtype
        columns.append(f'{column} {clickhouse_dtype}')
    columns_str = ', '.join(columns)
    create_table_stmt = f'CREATE TABLE {table_name} ({columns_str}) ENGINE = MergeTree order by {df.dtypes.index[0]}'
    return create_table_stmt

def execute_sql_stmt(sql_stmt):
    clickhouse_client.command(sql_stmt)


### Prepare the data for ClickHouse Ad-hoc analysis

Create Table in clickhouse using the schema from the dataframe, and insert the dataframe into the table

In [7]:
execute_sql_stmt("""
    DROP TABLE IF EXISTS token_transfers
""")

execute_sql_stmt("""
    DROP TABLE IF EXISTS contracts
""")

In [8]:
# Read the token_transfers.csv file into a DataFrame
token_transfers_df = pd.read_csv('token_transfers.csv')

# Create the table in ClickHouse from the token_transfers DataFrame
create_sql = get_create_sql_stmt_from_df(token_transfers_df, 'token_transfers')
execute_sql_stmt(create_sql)

# Insert the data into ClickHouse
clickhouse_client.insert_df('token_transfers', token_transfers_df)

In [9]:
# Read the token_transfers.csv file into a DataFrame
contracts_df = pd.read_csv('contracts.csv')

# Create the table in ClickHouse from the token_transfers DataFrame
create_sql = get_create_sql_stmt_from_df(contracts_df, 'contracts')
execute_sql_stmt(create_sql)

# Insert the data into ClickHouse
clickhouse_client.insert_df('contracts', contracts_df)

## Ad-hoc analysis

How many ERC-20 token contracts were found? (token_address gives the address of the ERC-20 contract)

In [10]:
# Currently there is no erc20 token_address in the token_transfers.csv
# Joining the token_transfers table with the contracts table on the token_address column will return all the erc20 token addresses
erc20_sql = """
    SELECT tt.*, c.is_erc20
    FROM token_transfers tt
    INNER JOIN contracts c ON tt.token_address = c.address
    WHERE c.is_erc20 = True
"""

df = clickhouse_client.query_df(erc20_sql)
df

Current balance for any token address (difference between value in transfers involving the address in to_address and from_address)

In [11]:
token_address = '0xf4eced2f682ce333f96f2d8966c613ded8fc95dd'

current_balance_sql = f"""
    SELECT
        token_address,
        SUM(CASE WHEN from_address = '{token_address}' THEN -value ELSE value END) as balance
    FROM token_transfers
    WHERE token_address = '{token_address}'
        OR from_address = '{token_address}'
        OR to_address = '{token_address}'
    GROUP BY token_address
"""

clickhouse_client.query_df(current_balance_sql)

Unnamed: 0,token_address,balance
0,0xf4eced2f682ce333f96f2d8966c613ded8fc95dd,89400000


Highest transaction in a block

In [12]:
highest_transaction_in_block_sql = """
    SELECT block_number, MAX(value) as highest_transaction
    FROM token_transfers
    GROUP BY block_number
    ORDER BY highest_transaction DESC
"""

clickhouse_client.query_df(highest_transaction_in_block_sql)

Unnamed: 0,block_number,highest_transaction
0,494065,100000000000000
1,492472,100000000000000
2,492475,100000000000000
3,488872,100000000000000
4,487629,100000000000000
5,494937,100000000000000
6,494938,100000000000000
7,488686,100000000000000
8,491355,10100000000000
9,484795,100000000000
