# Extract and save Ethereum raw transactions

In [1]:
import os
import json
import requests
import datetime as dt

from web3 import Web3
import pandas as pd

In [2]:
# Relative directory holding the raw JSON transaction extracts and block_info_log.json
PATH = '../data/raw/'

### Alchemy as the remote node provider

In [3]:
# The Alchemy API key is read from the environment variable 'KEY' so that it
# never has to be written into the notebook.
ALCHEMY_KEY = os.environ.get('KEY')
if not ALCHEMY_KEY:
    # Fail early with an explicit message instead of an opaque
    # "can only concatenate str" TypeError when the variable is unset.
    raise RuntimeError("Environment variable 'KEY' must be set to the Alchemy API key")

# Build the endpoint once and share it between the raw JSON-RPC calls (url)
# and the web3 client (w3).
url = 'https://eth-mainnet.alchemyapi.io/v2/' + ALCHEMY_KEY
w3 = Web3(Web3.HTTPProvider(url))

In [4]:
# latest_block_int = w3.eth.blockNumber
# up_to_block_int = latest_block_int - 5

In [5]:
def extract_first_hist_txns(no_of_blocks, up_to_block_int):
    """Fetch a run of consecutive blocks, newest first, without saving anything.

    Blocks are requested one at a time with ``eth_getBlockByNumber`` against
    the module-level ``url``. The walk starts at ``up_to_block_int - 1`` and
    steps down to ``up_to_block_int - no_of_blocks``; ``up_to_block_int``
    itself is never fetched. The first block is always requested, even when
    ``no_of_blocks`` is smaller than 1.

    Args:
        no_of_blocks: Number of blocks to walk back through.
        up_to_block_int: Exclusive upper bound, as an integer block number.

    Returns:
        A ``(block_info, transactions)`` tuple. ``block_info`` is a dict of
        three parallel lists (``block_number``, ``block_transactions`` and
        ``block_timestamp``), and ``transactions`` is the flat list of
        transaction entries from every fetched block, in fetch order.
    """
    def fetch_block(height):
        # The trailing ``True`` asks the node for full transaction objects
        payload = {"jsonrpc": "2.0", "id": 0, "method": "eth_getBlockByNumber", "params": [hex(height), True]}
        return requests.post(url, json=payload).json()['result']

    def as_text(hex_timestamp):
        # Hex Unix timestamp -> naive datetime in the machine's local timezone
        moment = dt.datetime.fromtimestamp(int(hex_timestamp, 16))
        return moment.strftime('%Y-%m-%d %H:%M:%S')

    lowest_wanted = up_to_block_int - no_of_blocks

    heights, txn_counts, timestamps, all_txns = [], [], [], []
    height = up_to_block_int - 1
    while True:
        block = fetch_block(height)

        # Header fields come from the block itself, not from its transactions
        heights.append(int(block['number'], 16))
        txn_counts.append(len(block['transactions']))
        timestamps.append(as_text(block['timestamp']))
        all_txns.extend(block['transactions'])

        if height <= lowest_wanted:
            break
        height -= 1

    summary = {
        'block_number': heights,
        'block_transactions': txn_counts,
        'block_timestamp': timestamps,
    }
    return summary, all_txns

In [6]:
def extract_backward_hist_txns(no_of_blocks):
    """Extend the extraction further into the past and persist the results.

    The oldest block already logged is taken from the last ``block_number``
    entry of the module-level ``block_info``. Blocks from one below that
    number down to ``oldest - no_of_blocks`` are then fetched, newest first,
    via ``eth_getBlockByNumber`` on the module-level ``url``. The first block
    is always requested, even when ``no_of_blocks`` is smaller than 1.

    Side effects, in order:
        1. Writes the batch's transactions to
           ``{PATH}{newest}_{oldest}_eth_transactions.json``.
        2. Appends the new entries to the lists in ``block_info`` in place.
        3. Rewrites ``{PATH}block_info_log.json`` with the updated log.

    Args:
        no_of_blocks: Number of blocks to walk back through.

    Returns:
        None.
    """
    def fetch_block(height):
        # The trailing ``True`` asks the node for full transaction objects
        payload = {"jsonrpc": "2.0", "id": 0, "method": "eth_getBlockByNumber", "params": [hex(height), True]}
        return requests.post(url, json=payload).json()['result']

    def as_text(hex_timestamp):
        # Hex Unix timestamp -> naive datetime in the machine's local timezone
        moment = dt.datetime.fromtimestamp(int(hex_timestamp, 16))
        return moment.strftime('%Y-%m-%d %H:%M:%S')

    # The last entry of the log is the oldest block extracted so far; it is
    # excluded from this batch.
    ceiling = block_info['block_number'][-1]
    lowest_wanted = ceiling - no_of_blocks

    heights, txn_counts, timestamps, all_txns = [], [], [], []
    height = ceiling - 1
    while True:
        block = fetch_block(height)

        # Header fields come from the block itself, not from its transactions
        heights.append(int(block['number'], 16))
        txn_counts.append(len(block['transactions']))
        timestamps.append(as_text(block['timestamp']))
        all_txns.extend(block['transactions'])

        if height <= lowest_wanted:
            break
        height -= 1

    # Serialise before opening so a failed dump never truncates a file
    serialized_txns = json.dumps(all_txns)
    with open(f'{PATH}{heights[0]}_{heights[-1]}_eth_transactions.json', 'w') as f:
        f.write(serialized_txns)

    # Older blocks go at the end, keeping the log ordered newest -> oldest
    additions = (
        ('block_number', heights),
        ('block_transactions', txn_counts),
        ('block_timestamp', timestamps),
    )
    for key, fresh_values in additions:
        block_info[key].extend(fresh_values)

    serialized_log = json.dumps(block_info)
    with open(f'{PATH}block_info_log.json', 'w') as f:
        f.write(serialized_log)

In [7]:
def extract_forward_hist_txns(no_of_blocks):
    """Extract the blocks that follow the newest logged block and save them.

    The newest block already logged is taken from the first ``block_number``
    entry of the module-level ``block_info``. Blocks ``newest + 1`` through
    ``newest + no_of_blocks`` are fetched in ascending order via
    ``eth_getBlockByNumber`` on the module-level ``url``. The first block is
    always requested, even when ``no_of_blocks`` is smaller than 1.

    Side effects, in order:
        1. Writes the batch's transactions to
           ``{PATH}{newest}_{oldest}_eth_transactions.json``.
        2. Rewrites ``{PATH}block_info_log.json`` with the new entries placed
           in front of the existing ones (newest first).

    The in-memory ``block_info`` is NOT modified, so callers must reload the
    log from disk before calling this function again.

    Args:
        no_of_blocks: Number of blocks to fetch above the newest logged block.

    Returns:
        None.
    """
    def fetch_block(block_number):
        # The trailing ``True`` asks the node for full transaction objects
        payload = {"jsonrpc": "2.0", "id": 0, "method": "eth_getBlockByNumber", "params": [hex(block_number), True]}
        return requests.post(url, json=payload).json()['result']

    # Identify the newest block that was last extracted from block_info_log.json
    from_block_int = block_info['block_number'][0]
    last_block_int = from_block_int + no_of_blocks

    block_numbers_list = []
    block_transactions_list = []
    block_datetime_list = []
    block_details_list = []

    # Walk forward from the block right after the newest logged one
    block_int = from_block_int + 1
    while True:
        block = fetch_block(block_int)

        # Header fields come from the block itself, not from its transactions
        block_numbers_list.append(int(block['number'], 16))
        block_transactions_list.append(len(block['transactions']))

        # Hex Unix timestamp -> naive datetime in the machine's local timezone
        block_timestamp = dt.datetime.fromtimestamp(int(block['timestamp'], 16))
        block_datetime_list.append(block_timestamp.strftime('%Y-%m-%d %H:%M:%S'))

        block_details_list.extend(block['transactions'])

        if block_int >= last_block_int:
            break
        block_int += 1

    # Save the block transactions; the file name is "<newest>_<oldest>"
    start_block = block_numbers_list[-1]
    end_block = block_numbers_list[0]

    block_details_str = json.dumps(block_details_list)
    with open(f'{PATH}{start_block}_{end_block}_eth_transactions.json', 'w') as f:
        f.write(block_details_str)

    # The three lists are parallel (one entry per block, ascending order).
    # Reverse them together to get newest-first order. Sorting each list on
    # its own values would reorder the transaction counts by size and break
    # their pairing with the block numbers.
    new_block_info = {
        'block_number': block_numbers_list[::-1],
        'block_transactions': block_transactions_list[::-1],
        'block_timestamp': block_datetime_list[::-1],
    }

    # Add existing block info after the new entries so that the newest block
    # info is always at the beginning of each list.
    new_block_info['block_number'].extend(block_info['block_number'])
    new_block_info['block_transactions'].extend(block_info['block_transactions'])
    new_block_info['block_timestamp'].extend(block_info['block_timestamp'])

    # Save the block info log as json file
    block_info_str = json.dumps(new_block_info)
    with open(f'{PATH}block_info_log.json', 'w') as f:
        f.write(block_info_str)

### Read the block_info_log

In [8]:
# Load the log of already-extracted blocks; the context manager closes the
# file handle as soon as the JSON has been parsed.
with open(f'{PATH}block_info_log.json') as f:
    block_info = json.load(f)

In [9]:
# Tabulate the block log and parse the timestamp strings as datetimes so the
# frame can be resampled by day.
tmp_df = pd.DataFrame(block_info).astype({'block_timestamp': 'datetime64[s]'})

In [10]:
# Total number of transactions per calendar day across the logged blocks
tmp_df.resample('D', on='block_timestamp').agg({'block_transactions':'sum'})

Unnamed: 0_level_0,block_transactions
block_timestamp,Unnamed: 1_level_1
2022-06-29,698486
2022-06-30,1054433
2022-07-01,1178620
2022-07-02,1189382
2022-07-03,1157052
2022-07-04,1123019
2022-07-05,1157774
2022-07-06,1192050
2022-07-07,1160880
2022-07-08,129959


In [11]:
# Number of distinct logged block numbers per calendar day
tmp_df.resample('D', on='block_timestamp').agg({'block_number':'nunique'})

Unnamed: 0_level_0,block_number
block_timestamp,Unnamed: 1_level_1
2022-06-29,3648
2022-06-30,5628
2022-07-01,6455
2022-07-02,6488
2022-07-03,6540
2022-07-04,6429
2022-07-05,6471
2022-07-06,6480
2022-07-07,6419
2022-07-08,442


### While loop to batch extract the BACKWARD historical raw transactions using the Alchemy API

In [None]:
# n = 0
# while n < 10:
#     extract_backward_hist_txns(no_of_blocks=1000)
#     print('Data extracted and saved, batch =', n)
#     n += 1

Data extracted and saved, batch = 0


### While loop to batch extract the FORWARD historical raw transactions using the Alchemy API

In [None]:
# Run ten forward batches of 1000 blocks each.
# The log is re-read before every batch because extract_forward_hist_txns()
# writes the updated log to disk but leaves the in-memory block_info
# untouched; the next batch has to pick up its starting block from the file.
for n in range(10):
    with open(f'{PATH}block_info_log.json') as f:
        block_info = json.load(f)
    extract_forward_hist_txns(no_of_blocks=1000)
    print('Data extracted and saved, batch =', n)

Data extracted and saved, batch = 0
Data extracted and saved, batch = 1
Data extracted and saved, batch = 2
Data extracted and saved, batch = 3
Data extracted and saved, batch = 4
Data extracted and saved, batch = 5
Data extracted and saved, batch = 6


### Read the raw extracts into a dataframe

In [4]:
# Collect the full path of every raw transaction extract found in PATH
file_dir = os.listdir(PATH)
extract_names = (name for name in file_dir if name.endswith('transactions.json'))
file_path_list = [os.path.join(PATH, name) for name in extract_names]

In [5]:
# Same paths ordered by file modification time, oldest first
# NOTE(review): file_list_sorted is not referenced again in the cells shown here — confirm it is still needed
file_list_sorted = sorted(file_path_list, key=os.path.getmtime)

In [6]:
# Sort the paths in place, lexicographically by path string
file_path_list.sort()

In [7]:
# Position of one specific extract in the sorted list; raises ValueError if that path is absent
idx = file_path_list.index('../data/raw/15053949_15052950_eth_transactions.json')

In [8]:
# Count the files from position 9 onwards
# NOTE(review): 9 is hard-coded; presumably it equals idx from the previous cell — confirm
len(file_path_list[9:])

46

#### Manually read multiple json files in batches to save them as separate parquet files

In [None]:
# Load one manual batch of raw extracts; switch the active slice to process
# another batch.
# batch_paths = file_path_list[9:24]
batch_paths = file_path_list[24:39]
# batch_paths = file_path_list[39:]

# Open each file in a ``with`` block so its handle is closed right after
# parsing instead of staying open for the lifetime of the kernel.
raw_json_list = []
for file_path in batch_paths:
    with open(file_path) as f:
        raw_json_list.append(json.load(f))

In [None]:
# Build one dataframe per raw extract, then stack them row-wise
frames = [pd.DataFrame(records) for records in raw_json_list]
df_raw = pd.concat(frames)

In [None]:
# Inspect the (rows, columns) size of the combined dataframe
df_raw.shape

In [None]:
# Work on a copy so df_raw keeps the values exactly as they were loaded
df = df_raw.copy()

### Convert all hexadecimal columns to decimal

In [None]:
%%time
# Columns holding hexadecimal strings that should become integers. Each name
# is listed once so that no column is scanned twice.
col_list = ['blockNumber','chainId','gas','gasPrice','nonce','transactionIndex','type','v','value','maxFeePerGas','maxPriorityFeePerGas']

for col in col_list:
    # Only string cells are parsed; anything else is passed through unchanged
    df[col] = df[col].apply(lambda x: int(x, base=16) if isinstance(x, str) else x)

### Convert `type` into category data type

In [None]:
# Store the transaction type as a pandas categorical
df['type'] = df['type'].astype('category')

### Convert `value` into float64 data type
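This allows the dataframe to be saved as parquet; otherwise we get the following error:  
"Python int too large to convert to C long"  
Note that float64 cannot represent every integer above 2^53 exactly, so very large `value` entries may be rounded.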

In [None]:
# Cast to float64 so the column can be written to parquet
# NOTE(review): float64 cannot hold every large integer exactly — confirm the rounding is acceptable
df['value'] = df['value'].astype('float64')

### Add block timestamp to dataframe

In [None]:
# Reload the block log; the context manager closes the file handle as soon as
# the JSON has been parsed.
with open(f'{PATH}block_info_log.json') as f:
    block_info = json.load(f)
df_block_info = pd.DataFrame(block_info)

# Convert data type to datetime64[s]
df_block_info['block_timestamp'] = df_block_info['block_timestamp'].astype('datetime64[s]')

In [None]:
# Attach each transaction's block timestamp by matching on the block number,
# then discard the duplicated key column that the merge brings in.
timestamp_lookup = df_block_info[['block_number','block_timestamp']]
df = df.merge(timestamp_lookup, left_on='blockNumber', right_on='block_number').drop(columns='block_number')

### Save the pandas dataframe as parquet file

In [None]:
# Lowest and highest block number present in the dataframe
block_numbers = df['blockNumber']
start_block, end_block = block_numbers.min(), block_numbers.max()

In [None]:
# Relative directory that receives the processed parquet files
OUT_PATH = '../data/'

In [None]:
# The file name records the lowest and highest block number in the dataframe
df.to_parquet(f'{OUT_PATH}eth_transactions_{start_block}_{end_block}.parquet')