
Merge pull request #290 from blockchain-etl/feature/pubsub_message_ordering

GCS exporter plus Pub/Sub message ordering
medvedev1088 committed Dec 19, 2021
2 parents e0ca8f9 + 289b900 commit c4c9207
Showing 12 changed files with 254 additions and 38 deletions.
111 changes: 111 additions & 0 deletions blockchainetl/jobs/exporters/gcs_item_exporter.py
@@ -0,0 +1,111 @@
# MIT License
#
# Copyright (c) 2020 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import json
import logging
from collections import defaultdict

from google.cloud import storage


def build_block_bundles(items):
    blocks = defaultdict(list)
    transactions = defaultdict(list)
    logs = defaultdict(list)
    token_transfers = defaultdict(list)
    traces = defaultdict(list)
    for item in items:
        item_type = item.get('type')
        if item_type == 'block':
            blocks[item.get('number')].append(item)
        elif item_type == 'transaction':
            transactions[item.get('block_number')].append(item)
        elif item_type == 'log':
            logs[item.get('block_number')].append(item)
        elif item_type == 'token_transfer':
            token_transfers[item.get('block_number')].append(item)
        elif item_type == 'trace':
            traces[item.get('block_number')].append(item)
        else:
            logging.info(f'Skipping item with type {item_type}')

    block_bundles = []
    for block_number in sorted(blocks.keys()):
        if len(blocks[block_number]) != 1:
            raise ValueError(f'There must be a single block for a given block number, was {len(blocks[block_number])} for block number {block_number}')
        block_bundles.append({
            'block': blocks[block_number][0],
            'transactions': transactions[block_number],
            'logs': logs[block_number],
            'token_transfers': token_transfers[block_number],
            'traces': traces[block_number],
        })

    return block_bundles


class GcsItemExporter:

    def __init__(
            self,
            bucket,
            path='blocks',
            build_block_bundles_func=build_block_bundles):
        self.bucket = bucket
        self.path = normalize_path(path)
        self.build_block_bundles_func = build_block_bundles_func
        self.storage_client = storage.Client()

    def open(self):
        pass

    def export_items(self, items):
        block_bundles = self.build_block_bundles_func(items)

        for block_bundle in block_bundles:
            block = block_bundle.get('block')
            if block is None:
                raise ValueError('block_bundle must include the block field')
            block_number = block.get('number')
            if block_number is None:
                raise ValueError('block_bundle must include the block.number field')

            destination_blob_name = f'{self.path}/{block_number}.json'

            bucket = self.storage_client.bucket(self.bucket)
            blob = bucket.blob(destination_blob_name)
            blob.upload_from_string(json.dumps(block_bundle))
            logging.info(f'Uploaded file gs://{self.bucket}/{destination_blob_name}')

    def close(self):
        pass


def normalize_path(p):
    if p is None:
        p = ''
    if p.startswith('/'):
        p = p[1:]
    if p.endswith('/'):
        p = p[:len(p) - 1]

    return p
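
Usage note: a minimal sketch of driving the new exporter directly. The bucket name and items below are placeholders, and `storage.Client()` requires working GCP credentials; in the CLI this exporter is wired up via `--output=gs://<bucket_name>`.

from blockchainetl.jobs.exporters.gcs_item_exporter import GcsItemExporter

# Illustrative stream items shaped like the streamer's output dicts.
items = [
    {'type': 'block', 'number': 14000000},
    {'type': 'transaction', 'block_number': 14000000, 'hash': '0x...'},
    {'type': 'log', 'block_number': 14000000, 'log_index': 0},
]

exporter = GcsItemExporter(bucket='my-etl-bucket')  # bucket name is a placeholder
exporter.open()
exporter.export_items(items)  # uploads gs://my-etl-bucket/blocks/14000000.json
exporter.close()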
39 changes: 25 additions & 14 deletions blockchainetl/jobs/exporters/google_pubsub_item_exporter.py
@@ -29,9 +29,19 @@

 class GooglePubSubItemExporter:

-    def __init__(self, item_type_to_topic_mapping, message_attributes=('item_id', 'item_timestamp')):
+    def __init__(self, item_type_to_topic_mapping, message_attributes=(),
+                 batch_max_bytes=1024 * 5, batch_max_latency=1, batch_max_messages=1000,
+                 enable_message_ordering=False):
         self.item_type_to_topic_mapping = item_type_to_topic_mapping
-        self.publisher = create_publisher()
+
+        self.batch_max_bytes = batch_max_bytes
+        self.batch_max_latency = batch_max_latency
+        self.batch_max_messages = batch_max_messages
+
+        self.enable_message_ordering = enable_message_ordering
+
+        self.publisher = self.create_publisher()
+
         self.message_attributes = message_attributes

     def open(self):
Expand All @@ -46,7 +56,7 @@ def export_items(self, items):
# details = "channel is in state TRANSIENT_FAILURE"
# https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606
logging.info('Recreating Pub/Sub publisher.')
self.publisher = create_publisher()
self.publisher = self.create_publisher()
raise e

@timeout_decorator.timeout(300)
Expand All @@ -66,7 +76,8 @@ def export_item(self, item):
topic_path = self.item_type_to_topic_mapping.get(item_type)
data = json.dumps(item).encode('utf-8')

message_future = self.publisher.publish(topic_path, data=data, **self.get_message_attributes(item))
ordering_key = 'all' if self.enable_message_ordering else ''
message_future = self.publisher.publish(topic_path, data=data, ordering_key=ordering_key, **self.get_message_attributes(item))
return message_future
else:
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
Expand All @@ -80,15 +91,15 @@ def get_message_attributes(self, item):

return attributes

def close(self):
pass

def create_publisher(self):
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=self.batch_max_bytes,
max_latency=self.batch_max_latency,
max_messages=self.batch_max_messages,
)

def create_publisher():
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024 * 5, # 5 kilobytes
max_latency=1, # 1 second
max_messages=1000,
)
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=self.enable_message_ordering)
return pubsub_v1.PublisherClient(batch_settings=batch_settings, publisher_options=publisher_options)

return pubsub_v1.PublisherClient(batch_settings)
def close(self):
pass
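
Usage note: the new constructor arguments make batching tunable and turn on ordered publishing. A sketch of enabling ordering follows; the project name and topic paths are placeholders. Note that Pub/Sub delivers in publish order only when the subscription also has message ordering enabled.

from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter

exporter = GooglePubSubItemExporter(
    item_type_to_topic_mapping={
        'block': 'projects/my-project/topics/crypto_ethereum.blocks',
        'transaction': 'projects/my-project/topics/crypto_ethereum.transactions',
    },
    enable_message_ordering=True,  # every message is published with ordering_key='all'
)

Because a single ordering key serializes publishing per topic, enabling this trades throughput for strict ordering, which is presumably why it defaults to off.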
42 changes: 42 additions & 0 deletions blockchainetl/jobs/exporters/multi_item_exporter.py
@@ -0,0 +1,42 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


class MultiItemExporter:
    def __init__(self, item_exporters):
        self.item_exporters = item_exporters

    def open(self):
        for exporter in self.item_exporters:
            exporter.open()

    def export_items(self, items):
        for exporter in self.item_exporters:
            exporter.export_items(items)

    def export_item(self, item):
        for exporter in self.item_exporters:
            exporter.export_item(item)

    def close(self):
        for exporter in self.item_exporters:
            exporter.close()
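
Usage note: the wrapper simply fans every call out to each wrapped exporter. A sketch of composing it with the exporters above; the `ConsoleItemExporter` import path is an assumption, and the bucket name is a placeholder.

from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter  # path assumed
from blockchainetl.jobs.exporters.gcs_item_exporter import GcsItemExporter
from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter

# Mirror every batch to GCS and to the console.
exporter = MultiItemExporter([
    GcsItemExporter(bucket='my-etl-bucket'),
    ConsoleItemExporter(),
])
exporter.open()
exporter.export_items(items)  # each wrapped exporter receives the same batch
exporter.close()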
6 changes: 4 additions & 2 deletions docs/commands.md
@@ -207,11 +207,13 @@ You can tune `--batch-size`, `--max-workers` for performance.
 - This command outputs blocks, transactions, logs, token_transfers to the console by default.
 - Entity types can be specified with the `-e` option,
   e.g. `-e block,transaction,log,token_transfer,trace,contract,token`.
-- Use `--output` option to specify the Google Pub/Sub topic or Postgres database where to publish blockchain data,
+- Use `--output` option to specify the Google Pub/Sub topic, Postgres database or GCS bucket where to publish blockchain data,
   - For Google PubSub: `--output=projects/<your-project>/topics/crypto_ethereum`.
     Data will be pushed to `projects/<your-project>/topics/crypto_ethereum.blocks`, `projects/<your-project>/topics/crypto_ethereum.transactions` etc. topics.
   - For Postgres: `--output=postgresql+pg8000://<user>:<password>@<host>:<port>/<database_name>`,
-    e.g. `--output=postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum`.
+    e.g. `--output=postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum`.
+  - For GCS: `--output=gs://<bucket_name>`. Make sure to install and initialize `gcloud` cli.
+  - Those output types can be combined with a comma e.g. `--output=gs://<bucket_name>,projects/<your-project>/topics/crypto_ethereum`
 The [schema](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/schema)
 and [indexes](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/indexes) can be found in this
 repo [ethereum-etl-postgres](https://github.com/blockchain-etl/ethereum-etl-postgres).
7 changes: 3 additions & 4 deletions ethereumetl/cli/stream.py
Expand Up @@ -27,6 +27,7 @@
from ethereumetl.enumeration.entity_type import EntityType

from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.streaming.item_exporter_creator import create_item_exporters
from ethereumetl.thread_local_proxy import ThreadLocalProxy


Expand Down Expand Up @@ -67,7 +68,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit

streamer_adapter = EthStreamerAdapter(
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
item_exporter=create_item_exporter(output),
item_exporter=create_item_exporters(output),
batch_size=batch_size,
max_workers=max_workers,
entity_types=entity_types
Expand Down Expand Up @@ -98,9 +99,7 @@ def parse_entity_types(entity_types):


def validate_entity_types(entity_types, output):
from ethereumetl.streaming.item_exporter_creator import determine_item_exporter_type, ItemExporterType
item_exporter_type = determine_item_exporter_type(output)
if item_exporter_type == ItemExporterType.POSTGRES \
if output is not None and 'postgres' in output \
and (EntityType.CONTRACT in entity_types or EntityType.TOKEN in entity_types):
raise ValueError('contract and token are not yet supported entity types for postgres item exporter.')

Expand Down
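
Note: the implementation of `create_item_exporters` is not shown in this diff. Judging from its plural name, the comma-separated `--output` syntax documented above, and the new `MultiItemExporter`, it plausibly looks something like the following sketch; the helper names and the console fallback are assumptions.

# Hypothetical reconstruction, not part of this diff.
def create_item_exporters(outputs):
    split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console']
    item_exporters = [create_item_exporter(output) for output in split_outputs]
    return MultiItemExporter(item_exporters) if len(item_exporters) > 1 else item_exporters[0]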
1 change: 1 addition & 0 deletions ethereumetl/domain/trace.py
@@ -41,3 +41,4 @@ def __init__(self):
         self.error = None
         self.status = None
         self.trace_id = None
+        self.trace_index = None
7 changes: 7 additions & 0 deletions ethereumetl/jobs/export_traces_job.py
@@ -96,10 +96,17 @@ def _export_batch(self, block_number_batch):

         calculate_trace_statuses(all_traces)
         calculate_trace_ids(all_traces)
+        calculate_trace_indexes(all_traces)

         for trace in all_traces:
             self.item_exporter.export_item(self.trace_mapper.trace_to_dict(trace))

     def _end(self):
         self.batch_work_executor.shutdown()
         self.item_exporter.close()
+
+
+def calculate_trace_indexes(traces):
+    # Only works if traces were originally ordered correctly which is the case for Parity traces
+    for ind, trace in enumerate(traces):
+        trace.trace_index = ind
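
Note: `calculate_trace_indexes` is purely positional, so it only yields meaningful indexes when the trace list is already in execution order (true for Parity traces, per the comment). A tiny illustration; the stand-in class below is only for demonstration.

# Illustration only; this class stands in for ethereumetl.domain.trace.EthTrace.
class EthTrace:
    trace_index = None

traces = [EthTrace(), EthTrace(), EthTrace()]
calculate_trace_indexes(traces)
assert [t.trace_index for t in traces] == [0, 1, 2]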
1 change: 1 addition & 0 deletions ethereumetl/mappers/trace_mapper.py
@@ -190,4 +190,5 @@ def trace_to_dict(self, trace):
         'error': trace.error,
         'status': trace.status,
         'trace_id': trace.trace_id,
+        'trace_index': trace.trace_index,
     }
3 changes: 2 additions & 1 deletion ethereumetl/streaming/enrich.py
@@ -163,7 +163,8 @@ def enrich_traces(blocks, traces):
             'status',
             'transaction_hash',
             'block_number',
-            'trace_id'
+            'trace_id',
+            'trace_index'
         ],
         [
             ('timestamp', 'block_timestamp'),
21 changes: 14 additions & 7 deletions ethereumetl/streaming/eth_streamer_adapter.py
@@ -87,13 +87,14 @@ def export_all(self, start_block, end_block):

         logging.info('Exporting with ' + type(self.item_exporter).__name__)

-        all_items = enriched_blocks + \
-            enriched_transactions + \
-            enriched_logs + \
-            enriched_token_transfers + \
-            enriched_traces + \
-            enriched_contracts + \
-            enriched_tokens
+        all_items = \
+            sort_by(enriched_blocks, 'number') + \
+            sort_by(enriched_transactions, ('block_number', 'transaction_index')) + \
+            sort_by(enriched_logs, ('block_number', 'log_index')) + \
+            sort_by(enriched_token_transfers, ('block_number', 'log_index')) + \
+            sort_by(enriched_traces, ('block_number', 'trace_index')) + \
+            sort_by(enriched_contracts, ('block_number',)) + \
+            sort_by(enriched_tokens, ('block_number',))

         self.calculate_item_ids(all_items)
         self.calculate_item_timestamps(all_items)
Expand Down Expand Up @@ -219,3 +220,9 @@ def calculate_item_timestamps(self, items):

def close(self):
self.item_exporter.close()


def sort_by(arr, fields):
if isinstance(fields, tuple):
fields = tuple(fields)
return sorted(arr, key=lambda item: tuple(item.get(f) for f in fields))
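
Note: the helper sorts item dicts by one or more fields, normalizing a single field name such as 'number' to a one-element tuple (the guard must be `not isinstance(...)`, otherwise a bare string would be iterated character by character). This sorting is presumably what makes the Pub/Sub ordering key meaningful: items are published in block order. A quick illustration, not part of the diff:

traces = [
    {'block_number': 2, 'trace_index': 0},
    {'block_number': 1, 'trace_index': 1},
    {'block_number': 1, 'trace_index': 0},
]

# Multi-field sort: by block, then by position within the block.
assert sort_by(traces, ('block_number', 'trace_index')) == [
    {'block_number': 1, 'trace_index': 0},
    {'block_number': 1, 'trace_index': 1},
    {'block_number': 2, 'trace_index': 0},
]

# Single-field form used for blocks; the string is normalized to a 1-tuple.
assert sort_by([{'number': 2}, {'number': 1}], 'number') == [{'number': 1}, {'number': 2}]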