In [None]:
import json
import logging
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, collect_set, udf
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType, LongType


# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Start time")

# Constants
# JSON-RPC endpoints used round-robin by get_next_infura_url(). Infura endpoint URLs typically
# embed the project ID (an API credential), so they are read from the comma-separated
# INFURA_URLS environment variable instead of being hardcoded in the notebook.
# If the variable is unset, the original placeholder [""] is kept; requests to it will fail
# (and be logged) until real endpoints are configured.
INFURA_URLS = [
    url.strip() for url in os.environ.get("INFURA_URLS", "").split(",") if url.strip()
] or [""]

# Guards the round-robin cursor: get_next_infura_url() is called from ThreadPoolExecutor
# workers, so reading the current URL and advancing the cursor must happen atomically.
_infura_url_lock = threading.Lock()


def get_next_infura_url():
    """Return the next endpoint from INFURA_URLS in round-robin order.

    Advances the module-level ``infura_url_index`` cursor, which must be defined
    (``infura_url_index = 0``) before the first call.

    Raises:
        RuntimeError: if INFURA_URLS is empty.
    """
    global infura_url_index
    if not INFURA_URLS:
        raise RuntimeError("INFURA_URLS is empty; configure at least one JSON-RPC endpoint")
    with _infura_url_lock:
        # Modulo on read as well, in case INFURA_URLS was shortened after the cursor advanced.
        url = INFURA_URLS[infura_url_index % len(INFURA_URLS)]
        infura_url_index = (infura_url_index + 1) % len(INFURA_URLS)
    return url

def get_latest_block_number():
    """Return the latest block number via ``eth_blockNumber``, or None on any failure.

    The endpoint comes from the shared round-robin pool (get_next_infura_url), like every
    other RPC call in this notebook. Transport errors, non-JSON bodies, JSON-RPC errors
    and malformed results are logged and reported as None.
    """
    infura_url = get_next_infura_url()
    payload = {
        "jsonrpc": "2.0",
        "method": "eth_blockNumber",
        "params": [],
        "id": 1
    }

    try:
        response = requests.post(
            infura_url,
            headers={"Content-Type": "application/json"},
            data=json.dumps(payload),
            timeout=30,  # never hang the notebook on a stuck connection
        )
        response_json = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError: the body was not JSON (not a RequestException on older requests versions)
        logging.error("Error making request to fetch latest block number: {}".format(e))
        return None

    if 'error' in response_json:
        logging.error("Error fetching latest block number: {}".format(response_json['error']))
        return None
    try:
        return int(response_json['result'], 16)  # Convert hex to int
    except (KeyError, TypeError, ValueError) as e:
        logging.error("Error fetching latest block number: unexpected response {!r} ({})".format(response_json, e))
        return None


# Round-robin cursor for get_next_infura_url(); it must exist before the first RPC call below.
infura_url_index = 0

# keccak256("Transfer(address,address,uint256)"): topic0 of every ERC-20 Transfer log.
TRANSFER_EVENT_SIGNATURE = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"

# BLOCK RANGE
LATEST_BLOCK_NUMBER = get_latest_block_number()
print(LATEST_BLOCK_NUMBER)
if LATEST_BLOCK_NUMBER is None:
    # Fail fast with a clear message instead of a TypeError on the subtraction below.
    raise RuntimeError("Could not fetch the latest block number; check INFURA_URLS and the log output above")

# Look back 3000 blocks: roughly 10 hours at ~300 blocks/hour.
BLOCK_LOOKBACK = 3000
STARTING_BLOCK_NUMBER = LATEST_BLOCK_NUMBER - BLOCK_LOOKBACK

# Schema definition

# Spark schema for the eth_getLogs log dicts collected by fetch_transactions_for_address.
# Per the Ethereum JSON-RPC spec, a log object carries blockHash, blockNumber, transactionHash,
# transactionIndex, logIndex, address, data and topics. The transaction-level fields below
# (from, gas, gasPrice, input, nonce, to, value, type, v, r, s) are presumably always null
# for these rows; verify before relying on them. JSON-RPC quantities arrive as hex strings
# and are kept as strings.
erc20_transaction_schema = StructType([
    StructField("blockHash", StringType(), True),
    StructField("blockNumber", StringType(), True),
    StructField("from", StringType(), True),
    StructField("gas", StringType(), True),
    StructField("gasPrice", StringType(), True),
    StructField("transactionHash", StringType(), True),
    StructField("input", StringType(), True),
    StructField("nonce", StringType(), True),
    StructField("to", StringType(), True),
    StructField("transactionIndex", StringType(), True),
    StructField("value", StringType(), True),
    StructField("type", StringType(), True),
    StructField("v", StringType(), True),
    StructField("r", StringType(), True),
    StructField("s", StringType(), True),
    StructField("address", StringType(), True),
    StructField("data", StringType(), True),
    StructField("topics", ArrayType(StringType()), True),
    # Position of the log within its block. Together with transactionHash it identifies a log
    # uniquely, so duplicates can be dropped without collapsing several transfers of one tx.
    StructField("logIndex", StringType(), True),
])


def ensure_string_integers(data):
    """Recursively convert every ``int`` in a JSON-like structure to ``str``.

    Dicts and lists, at any nesting depth, are updated in place, and the (same) top-level
    object is returned so callers can write ``payload = ensure_string_integers(payload)``.

    ``bool`` values are left untouched. ``bool`` subclasses ``int`` in Python, but JSON
    ``true``/``false`` must not be turned into the strings ``"True"``/``"False"``.
    """
    def _convert(value):
        # Order matters: bool must be checked before int because isinstance(True, int) is True.
        if isinstance(value, bool):
            return value
        if isinstance(value, int):
            return str(value)
        if isinstance(value, dict):
            for key, item in value.items():
                value[key] = _convert(item)  # reassigning existing keys is safe while iterating
            return value
        if isinstance(value, list):
            for idx, item in enumerate(value):
                value[idx] = _convert(item)
            return value
        return value

    return _convert(data)


def left_pad_address(address):
    """Left-pad a hex address with zeros to a 32-byte topic value (``0x`` + 64 hex chars).

    The address may be given with or without a ``0x``/``0X`` prefix. Letter case is preserved.
    """
    # Strip the prefix only if it is really there; blindly dropping two characters would
    # corrupt an unprefixed address.
    hex_part = address[2:] if address[:2].lower() == "0x" else address
    return "0x" + hex_part.rjust(64, '0')


def is_potential_dusting_attack(transaction, unique_recipient_threshold=2, event_signature=None):
    """Heuristically flag a transfer, or a group of transfers, as a possible dusting attack.

    Counts the distinct recipients across the matching Transfer logs and returns True when
    there are more than ``unique_recipient_threshold`` of them.

    Args:
        transaction: a single log dict, or an iterable of log dicts (for example all Transfer
            logs of one transaction hash). Each log's ``topics`` list is read as
            [event signature, sender, recipient, ...].
        unique_recipient_threshold: the recipient count that must be exceeded (default 2).
        event_signature: the topic0 to match. Defaults to TRANSFER_EVENT_SIGNATURE.

    Note: a single log has at most one recipient, so a lone log never exceeds the default
    threshold. The check only becomes meaningful when all logs of one transaction are passed.
    """
    if event_signature is None:
        event_signature = TRANSFER_EVENT_SIGNATURE
    logs = [transaction] if isinstance(transaction, dict) else list(transaction)

    unique_recipients = set()
    for log in logs:
        topics = log.get("topics") or []
        # Index positions in the topics list (0 = event signature, 2 = recipient),
        # not characters of an individual topic string.
        if len(topics) > 2 and topics[0] == event_signature:
            unique_recipients.add(topics[2])

    return len(unique_recipients) > unique_recipient_threshold

 # Helper function to extract additional fields
def add_additional_fields(transactions):
    for txn in transactions:
        txn["address"] = txn.get("address", None)
        txn["topics"] = txn.get("topics", [])
        txn["data"] = txn.get("data", None)
        txn["transactionHash"] = txn.get("transactionHash", None)
    return transactions

def _eth_get_logs(infura_url, start_block, end_block, topics, address, direction):
    """Run one ``eth_getLogs`` query and return its list of log dicts ([] on any failure).

    Args:
        infura_url: JSON-RPC endpoint to POST to.
        start_block, end_block: block range, sent as hex quantities.
        topics: topic filter list, e.g. [TRANSFER_EVENT_SIGNATURE, sender, None].
        address: padded address, used only in log messages.
        direction: "FROM" or "TO", used only in log messages.
    """
    payload = ensure_string_integers({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_getLogs",
        "params": [{
            "fromBlock": hex(start_block),
            "toBlock": hex(end_block),
            "topics": topics,
        }],
    })
    try:
        response = requests.post(
            infura_url,
            headers={"Content-Type": "application/json"},
            data=json.dumps(payload),
            timeout=60,  # a stuck request must not block a worker thread forever
        )
        response_json = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers non-JSON bodies (e.g. an error page) on every requests version
        logging.warning(f"Error fetching transactions {direction} address {address}. Error: {e}")
        return []

    if 'error' in response_json:
        logging.error(f"API Error for address {address}: {response_json['error']}")
        return []
    return response_json.get('result', [])


def fetch_transactions_for_address(address):
    """Fetch ERC-20 Transfer logs sent by or to ``address`` within the configured block range.

    For each block range, two eth_getLogs queries go to one endpoint from the round-robin
    pool. The first uses ``address`` as topic1 (sender), the second as topic2 (recipient).
    Recipient-side logs flagged by is_potential_dusting_attack are dropped. Request and API
    errors are logged, and the failed query contributes no logs.

    Args:
        address: 0x-prefixed holder address. It is left-padded to a 32-byte topic.

    Returns:
        A list of log dicts, normalised with add_additional_fields.
    """
    transactions = []
    infura_url = get_next_infura_url()
    padded_address = left_pad_address(address)
    # A single range for now. Split it into smaller windows if one address has too many logs
    # for a single eth_getLogs call (providers typically cap the result size).
    block_ranges = [(STARTING_BLOCK_NUMBER, LATEST_BLOCK_NUMBER)]

    for start_block, end_block in block_ranges:
        # Address as the sender
        sent = _eth_get_logs(infura_url, start_block, end_block,
                             [TRANSFER_EVENT_SIGNATURE, padded_address, None],
                             padded_address, "FROM")
        transactions.extend(add_additional_fields(sent))

        # Address as the recipient
        received = _eth_get_logs(infura_url, start_block, end_block,
                                 [TRANSFER_EVENT_SIGNATURE, None, padded_address],
                                 padded_address, "TO")
        received = add_additional_fields(received)
        transactions.extend(txn for txn in received if not is_potential_dusting_attack(txn))

    return transactions


# Initialize Spark
spark = SparkSession.builder.appName("Ethereum Token Interaction Aggregator").getOrCreate()

# Number of rows of the holder export to analyse. NOTE: the name top_100_holders is
# historical; the list holds up to TOP_HOLDER_LIMIT addresses.
TOP_HOLDER_LIMIT = 250

# Load CSV and process top holders. Addresses are lowercased; rows without an address are skipped.
df_addresses = spark.read.csv("export-tokenholders.csv", header=True, inferSchema=True)
top_100_holders = [
    row["HolderAddress"].lower()
    for row in df_addresses.limit(TOP_HOLDER_LIMIT).collect()
    if row["HolderAddress"]
]

# Filter out blacklisted addresses from the top holders.
# Blacklist set "BT" (active):
blacklisted_addresses = [""]
# Alternative blacklist set "J":
#blacklisted_addresses = [""]
# Holders were lowercased above, so normalise the blacklist the same way. Checksummed
# (mixed-case) entries would otherwise never match.
_blacklist = {address.lower() for address in blacklisted_addresses}
top_100_holders = [address for address in top_100_holders if address not in _blacklist]

# Fetch Transfer logs for every holder concurrently and flatten them into one list
all_transactions = []
with ThreadPoolExecutor(max_workers=10) as executor:
    for transactions in executor.map(fetch_transactions_for_address, top_100_holders):
        all_transactions.extend(transactions)
print(f"Total number of transactions fetched: {len(all_transactions)}")

df_transactions = spark.createDataFrame(all_transactions, schema=erc20_transaction_schema)

# Extract sender and recipient from topics, and the amount from data

def extract_sender(topics):
    """Return the sender address from a log's topics, or None when topics[1] is absent.

    The sender is the last 40 hex characters of topics[1], with the zero padding and
    ``0x`` dropped.
    """
    if len(topics) <= 1:
        return None
    return topics[1][-40:]

def extract_recipient(topics):
    """Return the recipient address from a log's topics, or None when topics[2] is absent.

    The recipient is the last 40 hex characters of topics[2], with the zero padding and
    ``0x`` dropped.
    """
    if len(topics) <= 2:
        return None
    return topics[2][-40:]

def extract_amount(data):
    """Parse a log's hex ``data`` field into an unsigned Python int.

    Returns None for non-string input. Unparseable hex (for example ``"0x"``) is logged
    and also returns None.
    """
    if not isinstance(data, str):
        return None
    try:
        return int(data, 16)
    except ValueError as ve:
        logging.error(f"Error in extracting amount. Data: {data}, Error: {ve}")
        return None

    
def _extract_amount_as_string(data):
    """Spark-facing wrapper around extract_amount: the amount as a decimal string, or None.

    ERC-20 amounts are uint256 and routinely exceed Spark's signed 64-bit LongType
    (2**63 is only ~9.2 tokens at 18 decimals). Such values cannot be returned correctly
    from a LongType UDF. A string carries the exact value, and it is cast to double below.
    """
    amount = extract_amount(data)
    return None if amount is None else str(amount)


extract_amount_udf = udf(_extract_amount_as_string, StringType())

extract_sender_udf = udf(extract_sender, StringType())

extract_recipient_udf = udf(extract_recipient, StringType())


# Add columns for sender, recipient, and amount. The double cast loses precision beyond
# ~15-17 significant digits, which is acceptable for the sums and percentages computed later.
df_transactions = (
    df_transactions
    .withColumn("sender", extract_sender_udf(col("topics")))
    .withColumn("recipient", extract_recipient_udf(col("topics")))
    .withColumn("amount", extract_amount_udf(col("data")).cast("double"))
)

# Remove duplicate logs. A transfer between two tracked holders is fetched twice (once by the
# sender query, once by the recipient query). One transaction can also emit several distinct
# Transfer logs, so deduplicating on transactionHash alone would drop real transfers.
# (transactionHash, logIndex) identifies a log. Without logIndex, fall back to the log's content.
_dedup_columns = (
    ["transactionHash", "logIndex"]
    if "logIndex" in df_transactions.columns
    else ["transactionHash", "address", "sender", "recipient", "data"]
)
df_transactions = df_transactions.dropDuplicates(_dedup_columns)


# Extract token addresses from 'address' field and group by them to get counts
df_tokens = df_transactions.withColumn("token_address", col("address"))

df_token_counts = df_tokens.groupby("token_address").count().sort(col("count").desc())

# Per token contract: the sets of distinct senders and recipients
address_aggregation = df_transactions.groupBy("address").agg(
    collect_set("sender").alias("unique_senders"),
    collect_set("recipient").alias("unique_recipients"),
)

# merge unique senders and recipients and count them

def merge_and_count(unique_senders, unique_recipients):
    """Return how many distinct addresses appear across the two address collections."""
    return len({*unique_senders, *unique_recipients})

merge_and_count_udf = udf(merge_and_count, IntegerType())

address_aggregation = address_aggregation.withColumn(
    "address_count",
    merge_and_count_udf(col("unique_senders"), col("unique_recipients")),
)

# Join per-token transfer counts with per-token distinct-address counts, busiest tokens first
final_df = (
    df_token_counts
    .join(address_aggregation, df_token_counts.token_address == address_aggregation.address)
    .select("token_address", "count", "address_count")
    .orderBy("count", ascending=False)
)

In [None]:
def _decode_abi_string(hex_result):
    """Decode the hex return value of an ERC-20 ``name()`` call into text, or None.

    Standard tokens return an ABI-encoded dynamic ``string``: a 32-byte offset, then, at
    that offset, a 32-byte length followed by the UTF-8 bytes. Some older tokens return a
    fixed ``bytes32`` padded with NUL bytes instead; that layout is the fallback.
    Invalid UTF-8 is replaced rather than raised.
    """
    if not isinstance(hex_result, str) or not hex_result.startswith("0x"):
        return None
    try:
        raw = bytes.fromhex(hex_result[2:])
    except ValueError:
        return None

    if len(raw) >= 64:
        offset = int.from_bytes(raw[:32], "big")
        if offset + 32 <= len(raw):
            length = int.from_bytes(raw[offset:offset + 32], "big")
            start = offset + 32
            if start + length <= len(raw):
                return raw[start:start + length].decode("utf-8", errors="replace")
    if len(raw) == 32:
        return raw.rstrip(b"\0").decode("utf-8", errors="replace")
    # "0x" (e.g. an address without contract code) or an unrecognised layout
    return None


# Call data for the ERC-20 `name()` function: its 4-byte selector 0x06fdde03
def fetch_token_name(token_address):
    """Return the ERC-20 ``name()`` of ``token_address`` via ``eth_call``, or None on failure.

    Transport errors, non-JSON bodies, JSON-RPC errors and undecodable results are logged
    (where applicable) and reported as None.
    """
    call_data = "0x06fdde03"

    infura_url = get_next_infura_url()

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_call",
        "params": [{
            "to": token_address,
            "data": call_data
        }, "latest"]
    }

    try:
        response = requests.post(
            infura_url,
            headers={"Content-Type": "application/json"},
            data=json.dumps(payload),
            timeout=30,
        )
        response_json = response.json()
    except (requests.RequestException, ValueError) as e:
        logging.warning(f"Error fetching name for token {token_address}. Error: {e}")
        return None

    if "error" in response_json or "result" not in response_json:
        logging.warning(f"Error fetching name for token {token_address}. Error: {response_json.get('error')}")
        return None

    # The result is ABI-encoded, not raw text; decode it properly.
    return _decode_abi_string(response_json["result"])
    
    
def fetch_name(address):
    """Pair a token address with its on-chain name: ``(address, name_or_None)``."""
    name = fetch_token_name(address)
    return address, name
    
    
def fetch_names_concurrently(token_addresses):
    """Resolve token names for many addresses using a pool of 10 worker threads.

    Returns a dict mapping each address to its name. An address whose lookup raised is
    logged and mapped to None.
    """
    names_by_address = {}

    with ThreadPoolExecutor(max_workers=10) as pool:
        pending = {pool.submit(fetch_name, token): token for token in token_addresses}

        for done in as_completed(pending):
            address = pending[done]
            try:
                name = done.result()[1]
            except Exception as e:
                logging.warning(f"Error fetching name for token {address}. Error: {e}")
                name = None
            names_by_address[address] = name

    return names_by_address

# Resolve a display name for every token contract that appears in final_df
unique_token_addresses = [row[0] for row in final_df.select("token_address").distinct().collect()]
token_names_dict = fetch_names_concurrently(unique_token_addresses)


# UDF to map token addresses to their names
def map_address_to_name(token_address):
    """Look up a token's name in token_names_dict (None when unknown)."""
    return token_names_dict.get(token_address)


map_address_to_name_udf = udf(map_address_to_name, StringType())
final_df = final_df.withColumn("token_name", map_address_to_name_udf(col("token_address")))

# Show the updated DataFrame
#final_df.show(truncate=False)

In [None]:
# Holder addresses without the 0x prefix, matching the 40-hex-char sender/recipient columns
top_100_holders = [holder[2:] if holder.startswith("0x") else holder for holder in top_100_holders]

# Keep only transfers where at least one side is a tracked holder
filtered_df = df_transactions.filter(
    col("sender").isin(top_100_holders) | col("recipient").isin(top_100_holders)
)

# Per contract: distinct senders and recipients
result = filtered_df.groupBy("address").agg(
    F.countDistinct("sender").alias("unique_sender_count"),
    F.countDistinct("recipient").alias("unique_recipient_count"),
)

# Most recipients first, then attach the token name
sorted_result = result.sort(F.desc("unique_recipient_count"))
sorted_result_with_names = sorted_result.withColumn("token_name", map_address_to_name_udf(col("address")))

# Display 
#sorted_result_with_names.show(truncate=False)

In [None]:
# ABI-encoded function selector for `totalSupply()`: "0x18160ddd"

def fetch_total_supply(token_address):
    """Return ``(token_address, total_supply)`` from an ``eth_call`` to ``totalSupply()``.

    ``total_supply`` is the raw integer in the token's base units. It is None when the
    request fails, the node returns a JSON-RPC error, or the result is not a parseable hex
    quantity. A failure is never reported as a supply of 0, which would later turn into a
    silent division by zero.
    """
    total_supply_signature = "0x18160ddd"

    data_payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_call",
        "params": [{
            "to": token_address,
            "data": total_supply_signature
        }, "latest"]
    }

    infura_url = get_next_infura_url()

    try:
        response = requests.post(
            infura_url,
            headers={"Content-Type": "application/json"},
            data=json.dumps(data_payload),
            timeout=30,
        )
        response_json = response.json()
    except (requests.RequestException, ValueError) as e:
        logging.warning(f"Error fetching total supply for token {token_address}. Error: {e}")
        return token_address, None

    result = response_json.get('result')
    if 'error' in response_json or not isinstance(result, str):
        logging.warning(f"Error fetching total supply for token {token_address}. Error: {response_json.get('error')}")
        return token_address, None

    try:
        # Decode to get the total supply. "0x" (e.g. an address without contract code) is rejected here.
        return token_address, int(result, 16)
    except ValueError:
        logging.warning(f"Unexpected totalSupply result for token {token_address}: {result!r}")
        return token_address, None


def fetch_all_total_supplies(token_addresses):
    """Fetch total supplies for many token addresses using a pool of 10 worker threads.

    Returns a dict mapping each address to its supply (or None). A lookup that raises is
    logged and mapped to None instead of aborting the whole batch, matching
    fetch_names_concurrently.
    """
    total_supplies = {}

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(fetch_total_supply, address): address for address in token_addresses}

        for future in as_completed(futures):
            address = futures[future]
            try:
                _, total_supply = future.result()
            except Exception as e:
                logging.warning(f"Error fetching total supply for token {address}. Error: {e}")
                total_supply = None
            total_supplies[address] = total_supply

    return total_supplies

# Fetch total supplies once, only for the contracts in sorted_result_with_names
# (the frame the total_supply column is attached to below).
unique_token_addresses = [row[0] for row in sorted_result_with_names.select("address").distinct().collect()]
total_supplies_dict = fetch_all_total_supplies(unique_token_addresses)


# Map token addresses to their supplies
def map_address_to_supply(token_address):
    """Return a token's total supply as a decimal string, or None when unknown.

    Converted with str() explicitly because the UDF below is declared StringType, and a
    uint256 supply can exceed Spark's 64-bit integer types.
    """
    total_supply = total_supplies_dict.get(token_address)
    return None if total_supply is None else str(total_supply)


map_address_to_supply_udf = udf(map_address_to_supply, StringType())

# Add the total_supply column to the sorted_result_with_names DataFrame
sorted_result_with_names = sorted_result_with_names.withColumn(
    "total_supply", map_address_to_supply_udf(col("address"))
)

#Show the updated DataFrame with total_supply included
#sorted_result_with_names.show(truncate=False)

In [None]:
# Per token contract ("address"): total amount sent by tracked holders (outgoing) and received by them (incoming)
_holder_is_sender = col("sender").isin(top_100_holders)
_holder_is_recipient = col("recipient").isin(top_100_holders)
agg_sum_df = df_transactions.groupBy("address").agg(
    F.sum(F.when(_holder_is_sender, col("amount")).otherwise(0)).alias("outgoing_sum"),
    F.sum(F.when(_holder_is_recipient, col("amount")).otherwise(0)).alias("incoming_sum"),
)

In [None]:
# Attach holder outgoing/incoming sums to the per-contract summary. The left join keeps
# contracts without sums; the duplicated join key is dropped and missing sums become 0.
final_df = (
    sorted_result_with_names
    .join(agg_sum_df, sorted_result_with_names.address == agg_sum_df.address, how="left")
    .drop(agg_sum_df.address)
    .na.fill({'outgoing_sum': 0, 'incoming_sum': 0})
)

In [None]:
# Holder flows as a percentage of total supply, keep contracts with more than one distinct
# sender, most recipients first, then project the reporting columns
final_df = (
    final_df
    .withColumn("percent_outgoing", col("outgoing_sum") / col("total_supply") * 100)
    .withColumn("percent_incoming", col("incoming_sum") / col("total_supply") * 100)
    .filter(col("unique_sender_count") > 1)
    .sort(F.desc("unique_recipient_count"))
    .select('unique_sender_count', 'unique_recipient_count', 'address', 'token_name',
            'outgoing_sum', 'incoming_sum', 'percent_outgoing', 'percent_incoming')
)

In [None]:
# Materialise the report as pandas and render its first 50 rows
pd_final_df = final_df.toPandas()
display(pd_final_df.iloc[:50])

In [None]:
# Display the first 50 rows (untruncated) of the key columns. They go into a new name so that
# final_df keeps all of its columns and the cells above still work when re-run.
final_summary_df = final_df.select('unique_sender_count', 'unique_recipient_count', 'token_name', 'address')
final_summary_df.show(50, truncate=False)
