In [None]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the active SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext; the cells below use it for all reads.
spark = glueContext.spark_session
# Glue Job handle bound to the same context (not referenced again in the visible cells).
job = Job(glueContext)


In [None]:
# Settings applied to the s3a connector: anonymous credentials, the public S3
# endpoint, TLS enabled, and the S3AFileSystem implementation for s3a:// URIs.
_S3A_SETTINGS = (
    ("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"),
    ("fs.s3a.endpoint", "s3.amazonaws.com"),
    ("fs.s3a.connection.ssl.enabled", "true"),
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
)

# Hadoop configuration shared by the running SparkContext.
hadoop_conf = sc._jsc.hadoopConfiguration()
for _conf_key, _conf_value in _S3A_SETTINGS:
    hadoop_conf.set(_conf_key, _conf_value)


In [None]:
# Base S3 prefix under which the result CSVs are written by the export cell.
OUTPUT_BASE = "s3://rwa-xyz-interview-ethan/results"

In [None]:
# Decoded Ethereum event logs stored as Parquet, read through the s3a connector.
eth_logs = "s3a://rwa-xyz-recruiting-data/eth_logs_decoded/"
eth_logs_df = spark.read.parquet(eth_logs)
eth_logs_df.printSchema()

# Token reference list. The first CSV row is treated as the header; no schema
# is supplied or inferred here, so verify column types before doing arithmetic.
tokens_path = "s3://rwa-xyz-interview-ethan/tokens.csv"
tokens_df = spark.read.csv(tokens_path, header=True)
tokens_df.show(n=5)

In [None]:
from pyspark.sql import functions as F, Window
from pyspark.sql.types import DecimalType, StringType
import re

def safe_hex_to_dec(hex_str):
    """Convert a hexadecimal string to its base-10 string representation.

    The value may carry an optional ``0x``/``0X`` prefix and surrounding
    whitespace. Only the characters ``0-9a-fA-F`` are accepted as digits, so
    inputs that ``int(..., 16)`` would tolerate but that are not plain hex
    (a sign, ``_`` separators, a repeated prefix) are rejected instead of
    being decoded into a misleading amount.

    Args:
        hex_str: Hex text such as ``"0x1a"`` or ``"1a"``; may be ``None``.

    Returns:
        ``"0"`` for falsy input or a bare prefix (``"0x"``), the decimal
        string for valid hex, and ``None`` for anything that is not valid hex
        (including non-string values).
    """
    if not hex_str:
        return "0"
    if not isinstance(hex_str, str):
        return None

    match = re.fullmatch(r"(?:0[xX])?([0-9a-fA-F]*)", hex_str.strip())
    if match is None:
        return None

    digits = match.group(1)
    if not digits:
        # Bare "0x" (or whitespace only): treat as zero, like the empty string.
        return "0"
    try:
        return str(int(digits, 16))
    except ValueError:
        # str() of an extremely large int can exceed the interpreter's
        # int-to-str digit limit; report it as unparseable rather than raising.
        return None

# Spark UDF wrapper around safe_hex_to_dec; yields a string column.
safe_hex_to_dec_udf = F.udf(safe_hex_to_dec, returnType=StringType())

# TOPIC0 value used to select transfer events.
TRANSFER_SIG = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"

# Keep transfer logs whose DATA is present and at most 66 characters long
# ("0x" plus up to 64 hex digits), then inner-join each log to the token list
# on the emitting contract address.
erc20_df = (
    eth_logs_df
    .where(F.col("TOPIC0") == TRANSFER_SIG)
    .where(F.col("DATA").isNotNull())
    .where(F.length("DATA") <= 66)
    .alias("e")
    .join(
        tokens_df.alias("t"),
        on=F.col("e.ADDRESS") == F.col("t.address"),
        how="inner",
    )
)


def scale_unscaled_amount(unscaled_str, decimals):
    """Shift a base-10 integer string right by ``decimals`` places, exactly.

    Replaces ``amount / pow(10, decimals)``: Spark's ``pow`` returns a double,
    so that division runs in floating point and drops digits beyond roughly
    15-17 significant figures. Python's ``decimal`` keeps every digit.

    Args:
        unscaled_str: Integer amount as a decimal string; may be ``None``.
        decimals: Number of decimal places of the token, as a number or a
            numeric string. ``None`` falls back to 18.

    Returns:
        The scaled amount as a plain (non-scientific) decimal string, or
        ``None`` when either input is missing/unparseable or ``decimals`` is
        not a whole number.
    """
    # Function-scope import keeps the UDF self-contained when shipped to executors.
    from decimal import Decimal, InvalidOperation, localcontext

    if unscaled_str is None:
        return None
    try:
        places = 18.0 if decimals is None else float(decimals)
        if places != int(places):
            return None
        with localcontext() as ctx:
            # A uint256 has at most 78 decimal digits; 200 leaves ample
            # headroom so scaleb never rounds.
            ctx.prec = 200
            return format(Decimal(unscaled_str).scaleb(-int(places)), "f")
    except (InvalidOperation, OverflowError, TypeError, ValueError):
        return None


scale_unscaled_amount_udf = F.udf(scale_unscaled_amount, StringType())

erc20_df = (
    erc20_df
    # Raw hex payload -> base-10 integer string ("0x"/"0x0" become "0").
    .withColumn("transfer_amount_unscaled_str", safe_hex_to_dec_udf(F.col("DATA")))
    # Exact scaling by the token's decimals (18 when missing). The string is
    # cast to decimal(38,18) only at the end, so no double arithmetic is used.
    .withColumn(
        "transfer_amount",
        scale_unscaled_amount_udf(
            F.col("transfer_amount_unscaled_str"),
            F.col("t.decimals"),
        ).cast("decimal(38,18)"),
    )
    .select(
        F.col("e.LOG_INDEX").alias("log_index"),
        F.col("e.TRANSACTION_HASH").alias("tx_hash"),
        F.col("e.TRANSACTION_INDEX").alias("tx_index"),
        F.col("e.BLOCK_NUMBER").alias("block_no"),
        F.col("e.BLOCK_HASH").alias("block_hash"),
        F.col("e.ADDRESS").alias("address"),
        F.col("e.BLOCK_TIMESTAMP").alias("block_ts"),
        # Sender/receiver are the last 40 hex characters of TOPIC1/TOPIC2,
        # re-prefixed with "0x".
        F.concat(F.lit("0x"), F.substring(F.col("e.TOPIC1"), -40, 40)).alias("sender"),
        F.concat(F.lit("0x"), F.substring(F.col("e.TOPIC2"), -40, 40)).alias("receiver"),
        F.col("transfer_amount"),
        F.col("transfer_amount_unscaled_str"),
        F.col("e.DATA").alias("transfer_amount_raw"),
        F.col("t.symbol").alias("token_symbol"),
        F.col("t.name").alias("token_name"),
    )
    # Cached because the aggregation cells below both read this frame.
    .persist()
)

erc20_df.select("tx_hash", "address", "sender", "receiver", "transfer_amount_raw", "transfer_amount_unscaled_str", "transfer_amount").show(50,truncate=False)


In [None]:
# Compute the most transferred token for each period.
# `granularity` is passed to date_trunc to choose the bucket size.
granularity = 'week'
time_df = (
    erc20_df
    .withColumn("period_start", F.date_trunc(granularity, F.col("block_ts")))
)

# Total scaled amount moved per token within each period.
token_time_agg = (
    time_df
    .groupBy("period_start", "token_symbol", "token_name", "address")
    .agg(F.sum("transfer_amount").alias("total_transfer_amount"))
)

# Rank tokens inside each period by total amount, largest first. The contract
# address is a secondary sort key so that ties resolve the same way on every
# run; without it row_number picks an arbitrary winner among equal totals.
token_time_ranked = (
    token_time_agg
    .withColumn(
        "rank",
        F.row_number().over(
            Window.partitionBy("period_start").orderBy(
                F.desc("total_transfer_amount"),
                F.asc("address"),
            )
        )
    )
)

# Keep only the top-ranked token per period.
most_transferred_per_period = (
    token_time_ranked
    .filter(F.col("rank") == 1)
    .select("period_start", "token_symbol", "token_name", "address", "total_transfer_amount")
)

# Cached because the result is shown here and written out by the export cell.
most_transferred_per_period.persist()
most_transferred_per_period.show(50, truncate=False)

In [None]:
# Compute the most active sender for each day, measured over a trailing
# 7-day window that ends on (and includes) that day.
SECONDS_PER_DAY = 86400
TRAILING_WINDOW_DAYS = 7

daily_df = (
    erc20_df
    .withColumn("event_date", F.to_date("block_ts"))
)

# Transfers sent per sender per calendar day.
daily_counts = (
    daily_df
    .groupBy("event_date", "sender")
    .agg(F.count("*").alias("daily_transfer_count"))
)

# Midnight of each event_date as epoch seconds, used as the range-window key.
daily_counts = daily_counts.withColumn(
    "event_ts", F.col("event_date").cast("timestamp").cast("long")
)

# Use offset from epoch to account for gaps in transfer activity.
# The frame reaches back (TRAILING_WINDOW_DAYS - 1) days: both bounds are
# inclusive, so a 7-day look-back would cover 8 calendar days (d-7 .. d).
# NOTE(review): assumes the Spark session time zone has no DST shifts (e.g.
# UTC), so consecutive midnights are exactly 86400 seconds apart - confirm.
window_spec = Window.partitionBy("sender") \
                    .orderBy(F.col("event_ts")) \
                    .rangeBetween(-(TRAILING_WINDOW_DAYS - 1) * SECONDS_PER_DAY, 0)

rolling_7day = (
    daily_counts
    .withColumn("rolling_7day_count", F.sum("daily_transfer_count").over(window_spec))
)

# Rank senders within each day by trailing count, highest first. The sender
# address is a secondary sort key so ties resolve identically on every run.
ranked_senders = (
    rolling_7day
    .withColumn(
        "rank",
        F.row_number().over(
            Window.partitionBy("event_date").orderBy(
                F.desc("rolling_7day_count"),
                F.asc("sender"),
            )
        )
    )
)

# Keep only the top-ranked sender per day.
most_active_sender_per_day_trailing7d = (
    ranked_senders
    .filter(F.col("rank") == 1)
    .select("event_date", "sender", "rolling_7day_count")
)

# Cached because the result is shown here and written out by the export cell.
most_active_sender_per_day_trailing7d.persist()

most_active_sender_per_day_trailing7d.show(50, truncate=False)


In [23]:
# Write each result set as a single CSV file with a header row, replacing any
# previous output under the same prefix.
most_transferred_per_period.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{OUTPUT_BASE}/most_transferred_per_week")
most_active_sender_per_day_trailing7d.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{OUTPUT_BASE}/most_active_sender_per_day")

# Release the cached transfers only after both writes have finished. The
# result frames were materialised by show(50) alone, which may cache only some
# partitions, so the writes can still need to read from erc20_df.
erc20_df.unpersist()



