In [3]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DecimalType, DoubleType
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession \
    .builder \
    .config("fs.s3a.access.key", "AKIASPVPI24RDI7QZUPB") \
    .config("fs.s3a.secret.key", "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") \
    .appName("Preprocessdata") \
    .getOrCreate()

In [None]:
token_transfers_schema = StructType([ \
    StructField("token_address", StringType(), True), \
    StructField("from_address", StringType(), True), \
    StructField("to_address", StringType(), True), \
    StructField("value", DecimalType(38, 0), True), \
    StructField("transaction_hash", StringType(), True), \
    StructField("log_index", LongType(), True), \
    StructField("block_number", LongType(), True) \
  ])

In [None]:
transactions_schema = StructType([ \
    StructField("hash", StringType(), True), \
    StructField("nonce", LongType(), True), \
    StructField("block_hash", StringType(), True), \
    StructField("block_number", LongType(), True), \
    StructField("transaction_index", LongType(), True), \
    StructField("from_address", StringType(), True), \
    StructField("to_address", StringType(), True), \
    StructField("value", DecimalType(38, 0), True), \
    StructField("gas", LongType(), True), \
    StructField("gas_price", LongType(), True), \
    StructField("input", StringType(), True), \
    StructField("block_timestamp", LongType(), True), \
    StructField("max_fee_per_gas", LongType(), True), \
    StructField("max_priority_fee_per_gas", LongType(), True), \
    StructField("transaction_type", LongType(), True) \
  ])

In [None]:
receipts_schema = StructType([ \
    StructField("transaction_hash", StringType(), True), \
    StructField("transaction_index", LongType(), True), \
    StructField("block_hash", StringType(), True), \
    StructField("block_number", LongType(), True), \
    StructField("cumulative_gas_used", LongType(), True), \
    StructField("gas_used", LongType(), True), \
    StructField("contract_address", StringType(), True), \
    StructField("root", StringType(), True), \
    StructField("status", LongType(), True), \
    StructField("effective_gas_price", LongType(), True) \
  ])

In [None]:
tokens_schema = StructType([ \
    StructField("address", StringType(), True), \
    StructField("symbol", StringType(), True), \
    StructField("name", StringType(), True), \
    StructField("decimals", LongType(), True), \
    StructField("total_supply", LongType(), True), \
    StructField("block_number", LongType(), True), \
  ])

In [None]:
cmc_historical_schema = StructType([ \
    StructField("id", LongType(), True), \
    StructField("rank", LongType(), True), \
    StructField("name", StringType(), True), \
    StructField("symbol", StringType(), True), \
    StructField("open", DoubleType(), True), \
    StructField("high", DoubleType(), True), \
    StructField("low", DoubleType(), True), \
    StructField("close", DoubleType(), True), \
    StructField("volume", DoubleType(), True), \
    StructField("marketCap", DoubleType(), True), \
    StructField("timestamp", LongType(), True), \
    StructField("address", StringType(), True), \
  ])

In [None]:
cmc_address_schema = StructType([ \
    StructField("rank", LongType(), True), \
    StructField("bsc", StringType(), True), \
    StructField("eth", StringType(), True), \
    StructField("polygon", StringType(), True), \
  ])

In [None]:
base_path = "s3a://octan-labs-ethereum/export"

In [None]:
token_transfers_df = spark.read.format("csv") \
    .option("header", True) \
    .schema(token_transfers_schema) \
    .load(base_path + "/token_transfers/*/*/*.csv")

In [None]:
transactions_df = spark.read.format("csv") \
    .option("header", True) \
    .schema(transactions_schema) \
    .load(base_path + "/transactions/*/*/*.csv")

In [None]:
receipts_df = spark.read.format("csv") \
    .option("header", True) \
    .schema(receipts_schema) \
    .load(base_path + "/receipts/*/*/*.csv")

In [None]:
tokens_df = spark.read.format("csv") \
    .option("header", True) \
    .schema(tokens_schema) \
    .load(base_path + "/tokens/*.csv")

In [None]:
cmc_historicals_df = spark.read.format("csv") \
    .option("header", True) \
    .schema(cmc_historical_schema) \
    .load(base_path + "/cmc_historicals/*.csv")

In [None]:
cmc_addresses_df = spark.read.format("csv") \
    .option("header", True) \
    .schema(cmc_address_schema) \
    .load(base_path + "/cmc_addresses/*.csv")

In [None]:
from pyspark.sql.functions import col

In [None]:
transactions_df = transactions_df.drop(col("input"))

In [None]:
token_transfers_df.createOrReplaceTempView("token_transfers")
transactions_df.createOrReplaceTempView("transactions")
receipts_df.createOrReplaceTempView("receipts")
tokens_df.createOrReplaceTempView("tokens")
cmc_historicals_df.createOrReplaceTempView("cmc_historicals")
cmc_addresses_df.createOrReplaceTempView("cmc_addresses")

In [None]:
# change ETH, eth if other networks

import time

start_time = time.time()

result_df = spark.sql("""
SELECT 
    nt.block_number,
    nt.from_address,
    nt.to_address,
    nt.value,
    null as token_transfer,
    null as token_contract,
    ((nt.value / POWER(10, 18)) * cmc_h.open) AS volume,
    (r.gas_used * nt.gas_price) / POWER(10,18) as gas_spent,
    ((r.gas_used * nt.gas_price) / POWER(10,18)) * cmc_h.open as gas_spent_usd
FROM (
    SELECT tx.block_number,
          tx.from_address,
          tx.to_address,
          tx.gas,
          tx.gas_price,
          tx.value,
          tx.hash,
          tx.block_timestamp
    FROM transactions tx
) nt
LEFT JOIN receipts r ON nt.hash = r.transaction_hash
CROSS JOIN cmc_addresses cmc_addr
LEFT JOIN cmc_historicals cmc_h ON cmc_addr.rank = cmc_h.rank
WHERE nt.block_timestamp < cmc_h.timestamp AND nt.block_timestamp >  cmc_h.timestamp - 86400 
    AND cmc_h.symbol = 'ETH'
UNION ALL
SELECT 
    tt.block_number,
    tt.from_address,
    tt.to_address,
    null as value,
    tt.value as token_transfer,
    tt.token_address,
    ((tt.value / POWER(10, t.decimals)) * cmc_h.open) AS volume,
    (r.gas_used * t.gas_price) / POWER(10,18) as gas_spent,
    ((r.gas_used * t.gas_price) / POWER(10,18)) * native_cmc_h.open as gas_spent_usd
FROM token_transfers tt
    LEFT JOIN transactions t ON tt.transaction_hash = t.hash
    LEFT JOIN receipts r ON tt.transaction_hash = r.transaction_hash
    LEFT JOIN tokens t ON LOWER(tt.token_address) = LOWER(t.address)
    LEFT JOIN (SELECT eth, rank FROM cmc_addresses GROUP BY eth, rank LIMIT 1) cmc_addr 
        ON tt.token_address = cmc_addr.eth
    LEFT JOIN cmc_historicals cmc_h 
        ON (
            cmc_addr.rank = cmc_h.rank AND 
            t.block_timestamp < cmc_h.timestamp AND 
            t.block_timestamp >  cmc_h.timestamp - 8640
        )
    LEFT JOIN (
        SELECT * FROM cmc_addresses
        LEFT JOIN cmc_historicals ON cmc_addresses.rank = cmc_historicals.rank
        ) native_cmc_h 
            ON native_cmc_h.symbol = 'ETH'
                AND t.block_timestamp < native_cmc_h.timestamp 
                AND t.block_timestamp >  native_cmc_h.timestamp - 86400
""").withColumn('volume', col('volume').cast(DecimalType(38, 10))) \
    .withColumn('gas_spent', col('gas_spent').cast(DecimalType(38, 10))) \
    .withColumn('gas_spent_usd', col('gas_spent_usd').cast(DecimalType(38, 10))) 


time.time() - start_time

In [None]:
result_df.show(10, False)

In [None]:
result_df.count()

In [None]:
start_time = time.time()

result_df.repartition(1) \
    .write \
    .option("header",True) \
    .csv("s3a://octan-labs-ethereum/pre_transaction/ethereum_16000000_17059999")

time.time() - start_time