# Transaction Data Collection from the Ethereum Blockchain

## EIP-1559
- Date: August 5, 2021
- Block number: 12,965,000
- [Ethereum JSON-RPC Specification](https://ethereum.github.io/execution-apis/api-documentation/)
- [JSON RPC API](https://ethereum.org/en/developers/docs/apis/json-rpc/)
- [EIP-1559 Analysis Arxiv](https://github.com/SciEcon/EIP1559)

## Layer 2 Solutions Launch Dates
Source: [L2BEAT](https://l2beat.com/scaling/tvl)
1. Optimism is live on: January 16, 2021
2. Arbitrum is live on: August 31, 2021

In [1]:
import datetime
import csv
import os
import re
import glob
from datetime import datetime
import time

import pandas as pd
import dask.dataframe as dd
from collections import defaultdict

In [2]:
def timestamp(date_string):
    """
    Convert a date string to a Unix timestamp.
    
    Args:
        date_string (str): The date string in 'YYYY-MM-DD' format.
    
    Returns:
        int: The Unix timestamp corresponding to the date string.
    """
    dt = datetime.strptime(date_string, "%Y-%m-%d")
    return int(dt.timestamp())


### Merge the eth tx data

In [None]:
# Directory where your files are stored
data_dir = "../data/"

# Filename pattern
filename_pattern = "eth_transaction_data_{}.csv"

# Find all filenames in the directory
all_files = os.listdir(data_dir)

# Extract dates from filenames and convert to datetime
dates = [datetime.strptime(re.search(r'\d{4}-\d{2}-\d{2}', file).group(), "%Y-%m-%d") for file in all_files if re.search(r'\d{4}-\d{2}-\d{2}', file)]

# Find start and end dates
start_date = min(dates)
end_date = max(dates)

print(start_date, end_date)

In this script above:

1. The `os.listdir` function is used to retrieve all the filenames in the directory.
2. The `re.search` function is used to extract the date strings from the filenames using a regular expression that matches the date format (yyyy-mm-dd).
3. The `datetime.strptime` function is used to convert the date strings to datetime objects.
4. The `min` and `max` functions are used to find the start and end dates.

In [None]:
# Flag to indicate whether it's the first file
first_file = True

# Create or open the final CSV file in append mode
with open('../data/merged_eth_transaction_data.csv', 'a') as singleFile:
    for single_date in pd.date_range(start_date, end_date):
        filename = os.path.join(data_dir, filename_pattern.format(single_date.strftime("%Y-%m-%d")))
        
        if os.path.isfile(filename):  # if the file exists
            df = pd.read_csv(filename, dtype={5: float})
            # Write data to file
            if first_file:  # If it's the first file
                df.to_csv(singleFile, header=True)  # Write with header
                first_file = False  # After the first file, set this flag to False
            else:
                df.to_csv(singleFile, header=False, mode='a')  # If not the first file, write without header

In this script for merging transtion data:
1. It iterates over the date range, reads the data for each date into a DataFrame, and appends it to the final CSV file.
2. a flag `first_file` is used to check if the current file is the first one. If it is, the script writes the DataFrame to the CSV file with headers. For subsequent files, the DataFrame is written without headers. The `mode='a'` argument to `to_csv` is used to append the data to the existing file.

### Data preprocessing
Given the large size of your data, traditional Python tools like Pandas might not be able to handle it efficiently due to memory constraints. Fortunately, there are tools and libraries built specifically for handling larger-than-memory datasets, such as Dask and Vaex.

1. **Dask**: Dask is a flexible library for parallel computing in Python that's built on top of existing Python APIs and data structures, like NumPy arrays and Pandas DataFrames. It can handle larger-than-memory computations by breaking them down into smaller tasks, executing these tasks in parallel and combining the results.


2. **Vaex**: Vaex is another library for handling large datasets. Like Dask, it performs lazy evaluations and only reads in data when necessary. Its API is also similar to Pandas, which makes it easy to use if you're familiar with Pandas:

In either case, you should ensure that your machine has enough storage to hold the intermediate results of the computations. Also, both Dask and Vaex make use of multiple cores, so having a multi-core machine can speed up the computations.

In Dask, the way to read and modify columns is similar to pandas. You can use Dask's `read_csv` function to read the CSV file and then apply the string operations to the column names using the `rename` method. Here's how you can adapt your code to use Dask:

In [4]:
# Specify the data types
dtypes = {
    'Transaction Identifier': str,
    'Transaction Status': str,
    'Sender\'s Address': str,
    'Transaction Type': str,
    'Transaction EIP-1559 Type': str
}

# Read the CSV file into a DataFrame
df = dd.read_csv('../data/merged_eth_transaction_data.csv', dtype=dtypes)
print(df.columns)
print(df.dtypes)

# If 'unnamed: 0' column exists, drop it
if 'Unnamed: 0' in df.columns:
    df = df.drop('Unnamed: 0', axis=1)

# Format column names to replace spaces with '_' and replace single quotes with "".
df = df.rename(columns={col: col.lower().replace(' ', '_').replace("'", "") for col in df.columns})

print(df.columns)
print(df.dtypes)

Index(['Unnamed: 0', 'Transaction Identifier', 'Block Number',
       'Transaction Timestamp', 'Transaction Status', 'Gas Price',
       'Transaction Fee', 'Sender's Address', 'Transaction Type',
       'Transaction EIP-1559 Type'],
      dtype='object')
Unnamed: 0                     int64
Transaction Identifier        object
Block Number                   int64
Transaction Timestamp          int64
Transaction Status            object
Gas Price                      int64
Transaction Fee              float64
Sender's Address              object
Transaction Type              object
Transaction EIP-1559 Type     object
dtype: object
Index(['transaction_identifier', 'block_number', 'transaction_timestamp',
       'transaction_status', 'gas_price', 'transaction_fee', 'senders_address',
       'transaction_type', 'transaction_eip-1559_type'],
      dtype='object')
transaction_identifier        object
block_number                   int64
transaction_timestamp          int64
transaction_statu

Dask's `read_csv` function returns a Dask DataFrame, which is a large parallel DataFrame composed of smaller pandas DataFrames, split along the index. These pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster.

One important thing to remember is that Dask uses lazy execution, which means it only executes tasks when absolutely necessary. So if you're not seeing any output or the code seems to be running very quickly, it's likely because Dask hasn't actually performed the computations yet.

To actually perform the computations and get the result, you need to call the `compute` method. However, be careful when using `compute` with large datasets, because it loads the result into memory, which could cause your program to run out of memory if the result is too large. In your case, you won't need to use `compute` until you actually need to see the modified DataFrame.

In [5]:
df.head(10)

Unnamed: 0,transaction_identifier,block_number,transaction_timestamp,transaction_status,gas_price,transaction_fee,senders_address,transaction_type,transaction_eip-1559_type
0,0x804bfa175a2a32b3ed330c7c642ac2210784e1646e39...,11794239,1612501201,Success,269000000000,2.8245e+16,0xb0da6794da4E6f7244B96256AdB8973D07428a20,ERC20 Transfer,Legacy
1,0x2e9c29c7b00fedbf30d0e3f4c4d4039da468b46bdb27...,11794239,1612501201,Success,246015000000,1.833599e+16,0x274F3c32C90517975e29Dfc209a23f315c1e5Fc7,ERC20 Transfer,Legacy
2,0xb78c319c2e68e82be6a0fb34cd49c17e65f1304ea7dc...,11794239,1612501201,Success,209000000000,4389000000000000.0,0xf6292B422EaA44165519480Ebc232eEfB9F0a47f,Simple Ether Transfer,Legacy
3,0x9e26892415c6643b1728a5c6048a2521346092287276...,11794239,1612501201,Success,209000000000,1.463e+17,0xB630D90e3EBE206d2C2A36C060D14812E320862e,Interaction with a Contract,Legacy
4,0xa6798928631b5e101b5dd27d123122b125e8ce5f9a6b...,11794239,1612501201,Success,207000000000,4347000000000000.0,0x741c8EE1d80738dBff66cb914cF74763cc7a0f89,Simple Ether Transfer,Legacy
5,0x16286165e0637f5acbe2cbb2c565e3d54aa463431616...,11794239,1612501201,Success,209000000000,4389000000000000.0,0x741c8EE1d80738dBff66cb914cF74763cc7a0f89,Simple Ether Transfer,Legacy
6,0x9cd1cf12a60507743a89f2c3d367333bc5a89bc51adb...,11794239,1612501201,Success,207000000000,5.175e+16,0x1A579A8FeCE2dcA0E74A12Cc350220178A353D25,ERC20 Transfer,Legacy
7,0x665ff9b7c7b89fe8793eda31aecb7ff01b9723e4b7d3...,11794239,1612501201,Success,207000000000,5.175e+16,0x662F7fEaC9401d403Ddb0972ff85270071009b9F,ERC20 Transfer,Legacy
8,0x20f2c1f815341f6fa822242dc7b2aad5dbe92d1ef474...,11794239,1612501201,Success,207000000000,5.175e+16,0x55ac3377c5B65d03866895f81B241eaF18221A8D,ERC20 Transfer,Legacy
9,0xa9eec9ad774dce95e9395f9787b080437bc081396af2...,11794239,1612501201,Success,207000000000,4347000000000000.0,0x6fAf75112F9Ee5e7A40328F8e64041c4399a2Dc8,Simple Ether Transfer,Legacy


### An alternative approach to merge data from multiple csv files
The script below is more suitable for a smaller number of files or if your machine has a large amount of memory available. We will use this script to merge eth net supply data for each block.

In [18]:
# Set the directory you want to start from
rootDir = '../data/'  # current directory; adjust it if needed

# Find all eth net supply CSV files in the directory
all_files = glob.glob(os.path.join(rootDir, "eth_net_supply_*.csv"))

# Use sort to ensure files are concatenated in the correct order
all_files.sort()

# Use a generator expression to lazily read and concatenate files
merged_data = pd.concat((pd.read_csv(file) for file in all_files))

dtypes = {
    'block_number': 'int',
    'static_reward': 'float',
    'uncle_reward': 'float',
    'inclusion_reward': 'float',
    'base_fee': 'int',
    'gas_used': 'int',
    'burned_fee': 'int',
    'net_supply_increase': 'float'
}

# Convert types for each column
for column, dtype in dtypes.items():
    # Convert the column to numeric values, making non-convertible values NaN
    merged_data[column] = pd.to_numeric(merged_data[column], errors='coerce')

    # Now, drop the rows where we got NaN values due to failed conversion
    merged_data = merged_data[merged_data[column].notna()]

    # And finally, convert the column to the target type
    merged_data[column] = merged_data[column].astype(dtype)

merged_data.to_csv('merged_eth_net_supply.csv', index=False)


  merged_data = pd.concat((pd.read_csv(file) for file in all_files))
  merged_data = pd.concat((pd.read_csv(file) for file in all_files))
  merged_data = pd.concat((pd.read_csv(file) for file in all_files))
  merged_data = pd.concat((pd.read_csv(file) for file in all_files))
  merged_data = pd.concat((pd.read_csv(file) for file in all_files))
  merged_data = pd.concat((pd.read_csv(file) for file in all_files))
  merged_data = pd.concat((pd.read_csv(file) for file in all_files))
  merged_data = pd.concat((pd.read_csv(file) for file in all_files))
  merged_data = pd.concat((pd.read_csv(file) for file in all_files))
  merged_data = pd.concat((pd.read_csv(file) for file in all_files))


In [19]:
print(merged_data.columns)
print(merged_data.dtypes)

Index(['block_number', 'static_reward', 'uncle_reward', 'inclusion_reward',
       'base_fee', 'gas_used', 'burned_fee', 'net_supply_increase'],
      dtype='object')
block_number             int64
static_reward          float64
uncle_reward           float64
inclusion_reward       float64
base_fee                 int64
gas_used                 int64
burned_fee               int64
net_supply_increase    float64
dtype: object


In [20]:
def extract_unique_senders(input_file, output_file, sender_column):
    """
    Function to extract unique sender addresses from a large CSV file of transaction data using Dask.

    Parameters:
    input_file (str): Path to the input CSV file representing transaction data.
    output_file (str): Path to the output CSV file where unique sender addresses will be saved.
    sender_column (str): The name of the column in the input file that contains the sender's addresses.

    Returns:
    None
    """

    # Read in data using Dask's read_csv function
    # Dask is a parallel computing library that allows us to work with large datasets
    # The read_csv function works similarly to pandas' read_csv, but it performs the operations lazily
    df = dd.read_csv(input_file)

    # Get unique sender's addresses
    # drop_duplicates returns the unique values in the sender_column
    unique_senders = df[sender_column].drop_duplicates()

    # Compute the result and save to a new CSV file
    # compute() performs the actual computation and returns a pandas DataFrame
    # to_csv writes the DataFrame to a CSV file
    unique_senders.compute().to_csv(output_file, index=False)

In [23]:
# Usage
extract_unique_senders('../data/merged_eth_transaction_data.csv', 'unique_senders.csv', 'Sender\'s Address')

In [None]:
def get_unique_senders(data):
    """
    Collect all unique sender addresses from a dataframe of transaction data.
    
    Args:
        data (DataFrame): A dataframe representing transaction data.
    
    Returns:
        DataFrame: A dataframe of unique sender addresses.
    """
    unique_senders = data['Sender\'s Address'].unique()
    return pd.DataFrame(unique_senders, columns=['Sender\'s Address'])

In [None]:
# Get unique sender addresses
unique_senders = get_unique_senders(df)

In [None]:
# If you want to save the unique senders to a CSV file
unique_senders.to_csv('unique_senders.csv', index=False)

In [None]:
# Convert Unix timestamp to pandas Timestamp
df['transaction_timestamp'] = pd.to_datetime(df['transaction_timestamp'], unit='s')

# Sort by user address and timestamp
df.sort_values(['senders_address', 'transaction_timestamp'], inplace=True)

# Add a column for transaction count within the last 12 hours, initialized with 0
df['trans_freq'] = 0

# Initialize the dictionary to store transaction counts
transaction_counts = defaultdict(list)

# Loop over each row in the DataFrame
for i in range(len(df)):
    # Get the current user and transaction timestamp
    current_user = df.iloc[i]['senders_address']
    current_time = df.iloc[i]['transaction_timestamp']

    # Define the 12-hour window start time
    window_start = current_time - pd.Timedelta(hours=12)

    # Store transaction timestamps for each user
    transaction_counts[current_user].append(current_time)

    # Keep only transactions within the 12-hour window
    transaction_counts[current_user] = [ts for ts in transaction_counts[current_user] if ts >= window_start]

    # Set the transaction frequency for the current row to the number of transactions within the time window
    df.at[i, 'trans_freq'] = len(transaction_counts[current_user])


In [None]:
df[:10]

In [None]:
# Convert string to datetime and create a temporary column
df['temp_timestamp'] = pd.to_datetime(df['transaction_timestamp'])

# Set the launch dates for Layer 2 solutions
optimism_launch = datetime(2021, 1, 16)
arbitrum_launch = datetime(2021, 8, 31)

# Add the 'layer2_availability' column
df['layer2_availability'] = ((df['temp_timestamp'] >= arbitrum_launch) | (df['temp_timestamp'] >= optimism_launch)).astype(int)

# Add the 'post_eip1559' column. 
# EIP-1559 went live on August 5, 2021
eip1559_launch_date = datetime(2021, 8, 5)
df['post_eip1559'] = (df['temp_timestamp'] >= eip1559_launch_date).astype(int)

# Drop the temporary column
df = df.drop('temp_timestamp', axis=1)

In [None]:
df[:10]

---
1. **User transaction count**: The cumulative number of transactions made by the same user up to the current transaction.
2. **Failed transactions**: the cumulative number of failed transactions by the same user up to the current transaction

In [None]:
# Create a new column that is 1 if the transaction failed and 0 otherwise
df['is_failed'] = (df['transaction_status'] == 'False').astype(int)

# Group by user address and calculate the cumulative count of transactions and failed transactions
df = df.sort_values('transaction_timestamp')
df['user_transaction_count'] = df.groupby('senders_address').cumcount() + 1
df['failed_transactions'] = df.groupby('senders_address')['is_failed'].cumsum()

# You can drop the 'is_failed' column if you no longer need it
df = df.drop('is_failed', axis=1)
df[:10]

In this script, we first created a new column `is_failed` that is 1 if the transaction failed and 0 otherwise. We then sorted the dataframe by the transaction timestamp to ensure transactions are processed in the order they occurred. 

Next, we grouped the dataframe by the sender's address and used `cumcount` to get the cumulative count of transactions for each user (we added 1 because `cumcount` starts from 0). We also used `cumsum` on the `is_failed` column to get the cumulative count of failed transactions for each user. 

Finally, we dropped the `is_failed` column as it's no longer needed.

---

In [None]:
df.columns

---
Derive User Address Age

We are using 'eth.getTransactionCount' function from web3.py, which retrieves the number of transactions sent from an address up until a certain block. Using this function, we can find the block at which the address was first used.


In the script below, for each transaction in the dataframe, the script scans blocks from the genesis block to the block of the transaction. If the address has made any transactions up to a block, it means that this block is the first usage block of the address, so it's saved to the `first_usage` dictionary. The `get_transaction_count` function is used to get the number of transactions sent from an address up to a certain block. After finding the first usage block of each address, the script adds a new column 'address_age' to the data, which represents the age of the address at the time of each transaction.

The above solution is likely to be slow if you have a large number of unique addresses and transactions. It's because for each address, it scans blocks from the genesis block to the block of each transaction of the address. If you have a large number of unique addresses, the total number of blocks to scan can be enormous.

To optimize the calculation of user address age and utilize a high-performance computing cluster, we could use parallel computing techniques. This involves dividing the computation task into smaller jobs that can be run simultaneously across multiple processors or nodes in the cluster.

In Python, several libraries allow you to use parallel computing, such as `multiprocessing`, `concurrent.futures`, and `joblib`. Here's an example of how you could use the `concurrent.futures` library to parallelize the block scanning task. This script uses a `ThreadPoolExecutor` to run multiple block scanning tasks simultaneously, which should significantly speed up the process if you're running it on a machine with multiple CPU cores. However, the maximum number of concurrent tasks is limited by the number of CPU cores in your machine.

In [None]:
# Function to find the first usage block of an address
def find_first_usage(address, transaction_block):
    # Scan blocks from the start block to the transaction block
    for block in range(start_block, transaction_block):
        # If the address has made any transactions up to the block
        if web3.eth.get_transaction_count(address, block):
            return block
    return None

In [None]:
# Sort by block number
df.sort_values(['block_number'], inplace=True)

# Prepare a dictionary to store the first usage block of each address
first_usage = {}

# Define the range of blocks to scan
start_block = 0  # This should be set to the genesis block

# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
    # For each unique address in the data
    for address in df['senders_address'].unique():
        transaction_block = df[df['senders_address'] == address]['block_number'].min()
        
        # If the first usage block of the address has already been found, skip this address
        if address in first_usage:
            continue
        
        # Submit a new task to the executor
        future = executor.submit(find_first_usage, address, transaction_block)
        
        # Store the Future object in the dictionary
        first_usage[address] = future

# Retrieve the results from the Future objects
for address, future in first_usage.items():
    first_usage[address] = future.result()

# Add a new column 'address_age' to the data
df['address_age'] = df['senders_address'].map(first_usage)

df[:10]

---

**Time of the day variable**

Due to the global nature of Ethereum transactions, the "time of day" variable could have different implications for users in different time zones, and this could indeed introduce complexity and potentially confounding effects into the analysis.

However, there may still be value in including a "time of day" variable, even in a global context, for several reasons:

1. **Network Effects**: Blockchain networks like Ethereum may experience periods of higher and lower congestion, which could align with certain times of day, despite global usage. For example, if a substantial proportion of Ethereum users are based in a particular region (say, North America or East Asia), then the network might be busier during the waking hours of that region.

2. **Market Activity**: Cryptocurrency markets operate 24/7 and market activity (trading volume, price volatility, etc.) can vary significantly across different times of the day, potentially influencing user behavior. For example, users might be more likely to engage in DeFi transactions during periods of high market activity.

3. **Behavioral Patterns**: Regardless of the global nature of Ethereum, there might be common daily behavioral patterns. For example, users might be more active during their daytime and less active during their nighttime, and these patterns could aggregate up to observable patterns in the data.

However, given the global nature, it may be advisable to construct the "time of day" variable in a way that captures potential global effects. For example, you could split the day into fewer, larger chunks (like morning, afternoon, evening, and night), or analyze this variable carefully in your exploratory analysis to understand its distribution and potential impacts.

Another approach might be to construct a variable that captures the "local time of day" for each transaction, assuming you can infer or have information about the geographic location of each user (which could introduce privacy issues and may not be feasible). But again, these are more complex and may not necessarily offer a better representation.

Finally, including "time of day" in your initial model does not obligate you to keep it in your final model. If exploratory analysis or preliminary model results suggest it's not meaningful or is causing issues, you can always exclude it in later iterations of your modeling process.

In conclusion, the "time of day" could potentially be a meaningful variable in your analysis, but its use and interpretation require careful consideration due to the global nature of Ethereum usage.

Given the global nature of Ethereum, a simple division of a day into distinct periods based on a specific time zone may not be the most representative. However, a potential solution could be to divide the day into a number of periods that are likely to capture significant changes in activity. For instance:

1. **Daytime**: 06:00 to 17:59
2. **Evening**: 18:00 to 21:59
3. **Night**: 22:00 to 05:59

This division attempts to capture typical working hours (daytime), after-work hours (evening), and sleeping hours (night). Of course, given the global nature of the network, these periods won't align perfectly with these times for all users, but they might serve as a useful approximation.

If it's possible to incorporate additional information, such as the geographic distribution of Ethereum users or the times of day that tend to see the most network activity, this could be used to refine these periods further.

You can then create a new variable, "TimeOfDay", in your dataset by mapping each transaction timestamp to one of these periods.

Here's a Python script to do that assuming your timestamp is in the form 'YYYY-MM-DD HH:MM:SS' and in UTC:

In [None]:
def assign_time_of_day(timestamp):
    hour = timestamp.hour
    if 6 <= hour < 18:
        return 'Daytime'
    elif 18 <= hour < 22:
        return 'Evening'
    else:
        return 'Night'

df['time_of_day'] = df['transaction_timestamp'].apply(assign_time_of_day)