In [1]:
from web3 import Web3
from concurrent.futures import ThreadPoolExecutor
import json

In [2]:
class Web3Util:
    def __init__(self, provider):
        self.web3_instance = Web3(provider)

    def get_block(self, block_number):
        try:
            block = self.web3_instance.eth.get_block(block_number, True)
            return block
        except Exception as e:
            raise Exception("Could not get information of getBlock") from e
web3Util = Web3Util(Web3.HTTPProvider('https://eth.llamarpc.com'))

In [15]:
#load db.json
with open('db.json') as f:
    db = json.load(f)
last_block = db.get('last_block', 0)
block_step = db.get('block_step', 20)

In [16]:
with ThreadPoolExecutor() as executor:
    blocks = list(executor.map(web3Util.get_block, range(last_block, last_block+ block_step)))
db['last_block'] = last_block + block_step
#save db.json

In [18]:
trx =[]
for block in blocks:
    for transaction in block.transactions:
        trx.append(dict(transaction))
for transaction in trx:
    if "accessList" in transaction:
        del transaction["accessList"]
    transaction['blockHash'] = transaction['blockHash'].hex()
    transaction['hash'] = transaction['hash'].hex()
    transaction['input'] = transaction['input'].hex()
    transaction['r'] = transaction['r'].hex()
    transaction['s'] = transaction['s'].hex()

In [19]:
filename = f'transaction-{last_block}.csv'
import csv
if len(trx):
    with open(f'../hadoop_namenode/{filename}', 'w', newline='') as csvfile:
        fieldnames = list(trx[0].keys())
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for transaction in trx:
            writer.writerow(transaction)
#save db.json
with open('db.json', 'w') as f:
    json.dump(db, f)

In [20]:
import subprocess
container_id = '2a7e7322b681990ff358eb6b5e2d9c51e10bbb89d0110ff4774800ac0057bd4a'
command = f'docker exec {container_id} hdfs dfs -put -f /hadoop/dfs/name/{filename} /'
subprocess.run(command.split(' '))

2024-01-03 17:20:43,508 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false


CompletedProcess(args=['docker', 'exec', '2a7e7322b681990ff358eb6b5e2d9c51e10bbb89d0110ff4774800ac0057bd4a', 'hdfs', 'dfs', '-put', '-f', '/hadoop/dfs/name/transaction-18925040.csv', '/'], returncode=0)

In [33]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('app').getOrCreate()

df = spark.read.csv('hdfs://localhost:9000/transaction*.csv', header=True, inferSchema=True).cache()

df_fee = df.groupBy('from').sum('gasPrice').withColumnRenamed('sum(gasPrice)', 'fee').orderBy('fee', ascending=False)
df_fee.show(10)
df_fee.repartition(1).write.csv('hdfs://localhost:9000/fee.csv', header=True)
df = spark.read.csv('hdfs://localhost:9000/fee.csv', header=True, inferSchema=True).cache()
df.show(5)


+--------------------+-------------+
|                from|          fee|
+--------------------+-------------+
|0xae2Fc483527B8EF...|3766611205311|
|0x80C1969588bD9a0...|3035136662465|
|0x75e89d5979E4f6F...|1899832305883|
|0x0D0707963952f2f...| 982647695730|
|0x16D5783a96ab20c...| 672470959598|
+--------------------+-------------+
only showing top 5 rows

