In [1]:
import os
import sys

curr_dir = %pwd

root_path = os.path.dirname(curr_dir)
if root_path not in sys.path:
    print(f"Adding {root_path} to PYTHONPATH")
    sys.path.append(root_path)

assert len(os.environ['PYTHONPATH']) > 0, \
            "Set PYTHONPATH to include this repository prior to use, see README.md"
# assert len(os.environ['GOOGLE_APPLICATION_CREDENTIALS']) > 0, \
#             "Set GOOGLE_APPLICATION_CREDENTIALS prior to use, see README.md"

# Complete Data Proccessing Pipeline

In [2]:
from typing import Optional, List

BQ_TABLE_NAME: str = 'traces'
BUCKET_NAME: str = f'indago'
DOWNLOAD_DIR: str = f'/home/ponbac/dev/indago/data/raw/{BQ_TABLE_NAME}'
# COLUMNS_TO_SAVE: Optional[List[str]] = None 
COLUMNS_TO_SAVE: Optional[List[str]] = ['block_number', 'transaction_index', 'trace_address', 'from_address', 'to_address', 'value', 'gas_used', 'status']
MAX_BLOBS: Optional[int] = 25 # 'None' = Download all blobs to local storage.
# MAX_BLOBS: Optional[int] = None

### BiqQuery Ethereum Table &rarr; Google Cloud Storage Bucket

In [3]:
from utils.bigquery import EthereumBigQuery
from pprint import pprint

query: EthereumBigQuery = EthereumBigQuery()

# Tables available for download:
pprint(query.get_table_names())

['bigquery-public-data.crypto_ethereum.amended_tokens',
 'bigquery-public-data.crypto_ethereum.balances',
 'bigquery-public-data.crypto_ethereum.blocks',
 'bigquery-public-data.crypto_ethereum.contracts',
 'bigquery-public-data.crypto_ethereum.load_metadata',
 'bigquery-public-data.crypto_ethereum.logs',
 'bigquery-public-data.crypto_ethereum.sessions',
 'bigquery-public-data.crypto_ethereum.token_transfers',
 'bigquery-public-data.crypto_ethereum.tokens',
 'bigquery-public-data.crypto_ethereum.traces',
 'bigquery-public-data.crypto_ethereum.transactions']


In [12]:
# Possible to export to json (as_json=True) or parquet (as_parquet=True), csv by default.
query.export_to_bucket(BQ_TABLE_NAME, f'{BUCKET_NAME}/{BQ_TABLE_NAME}')

BadRequest: 400 Operation cannot be performed on a nested schema. Field: withdrawals; reason: invalid, message: Operation cannot be performed on a nested schema. Field: withdrawals

In [24]:
from google.cloud.bigquery import Client, DatasetReference, DestinationFormat, job, QueryJobConfig, ExtractJobConfig
from google.api_core.page_iterator import HTTPIterator

client: Client = Client(
    location="US",
)
dataset_id = "bigquery-public-data.crypto_ethereum"

# Download the "blocks" table
table_id = "blocks"
# bucket: gs://indago/blocks
destination_uri = f"gs://{BUCKET_NAME}/{table_id}"

query_string = f"""SELECT
    timestamp,
    number,
    `hash`,
    parent_hash,
    nonce,
    sha3_uncles,
    logs_bloom,
    transactions_root,
    state_root,
    receipts_root,
    miner,
    difficulty,
    total_difficulty,
    size,
    extra_data,
    gas_limit,
    gas_used,
    transaction_count,
    base_fee_per_gas,
    withdrawals_root
FROM {dataset_id}.{table_id}"""

job_config = QueryJobConfig(
    destination="cryptic-lattice-239701.indago.blocks",
    write_disposition=job.WriteDisposition.WRITE_TRUNCATE,
    # destination_format=DestinationFormat.CSV,
)

query_job = client.query(query_string, job_config=job_config)
query_job.result()

print(f"Exported {table_id} to {destination_uri}")

Exported blocks to gs://indago/blocks


In [30]:
from google.cloud.bigquery import Client, DatasetReference, DestinationFormat, job, QueryJobConfig, ExtractJobConfig

table_id = "cryptic-lattice-239701.indago.blocks"

# Extract the table to GCS
destination_uri = f"gs://{BUCKET_NAME}/blocks/blocks-*.csv"
job_config = ExtractJobConfig()
job_config.destination_format = DestinationFormat.CSV
extract_job = client.extract_table(
    table_id,
    destination_uri,
    job_config=job_config,
)  # Make an API request.
extract_job.result()  # Wait for the job to complete.

print(f"Exported {table_id} to {destination_uri}")


Exported cryptic-lattice-239701.indago.blocks to gs://indago/blocks/blocks-*.csv


### Google Cloud Storage Bucket &rarr; Local Storage

In [3]:
from utils.storage.base import EthereumStorage
from utils.storage.google_cloud_storage import GoogleCloudStorage

storage: EthereumStorage = GoogleCloudStorage()

In [4]:
storage.download(BUCKET_NAME, f'{BQ_TABLE_NAME}/', DOWNLOAD_DIR, MAX_BLOBS, use_cols=COLUMNS_TO_SAVE)

  pd.read_csv(os.path.join(out_dir, blob.name.split('/')[-1]), usecols=use_cols, dtype=dtypes)[use_cols].to_csv(os.path.join(out_dir, blob.name.split('/')[-1]), index=False)
  pd.read_csv(os.path.join(out_dir, blob.name.split('/')[-1]), usecols=use_cols, dtype=dtypes)[use_cols].to_csv(os.path.join(out_dir, blob.name.split('/')[-1]), index=False)
  pd.read_csv(os.path.join(out_dir, blob.name.split('/')[-1]), usecols=use_cols, dtype=dtypes)[use_cols].to_csv(os.path.join(out_dir, blob.name.split('/')[-1]), index=False)
  pd.read_csv(os.path.join(out_dir, blob.name.split('/')[-1]), usecols=use_cols, dtype=dtypes)[use_cols].to_csv(os.path.join(out_dir, blob.name.split('/')[-1]), index=False)
  pd.read_csv(os.path.join(out_dir, blob.name.split('/')[-1]), usecols=use_cols, dtype=dtypes)[use_cols].to_csv(os.path.join(out_dir, blob.name.split('/')[-1]), index=False)
  pd.read_csv(os.path.join(out_dir, blob.name.split('/')[-1]), usecols=use_cols, dtype=dtypes)[use_cols].to_csv(os.path.join(out_d

### Sort & Merge Downloaded Files
Sorted and merged csv will be inside ```DOWNLOAD_DIR/processed/```

In [None]:
import scripts.sort_big_csv as sort_big_csv
import argparse

#args = argparse.Namespace(csv_dir=DOWNLOAD_DIR, merge_only=False, sort_only=False, sort_column='block_number', out_filename=f'{BQ_TABLE_NAME}-sorted.csv')
args = argparse.Namespace(csv_dir=DOWNLOAD_DIR,
    merge_only=False,
    sort_only=False,
    sort_columns=['block_number','transaction_index'],
    out_filename=f'{BQ_TABLE_NAME}-sorted.csv')
#,'trace_address'
sort_big_csv.main(args)