diff --git a/blockchainetl/jobs/exporters/gcs_item_exporter.py b/blockchainetl/jobs/exporters/gcs_item_exporter.py
new file mode 100644
index 000000000..288a568cb
--- /dev/null
+++ b/blockchainetl/jobs/exporters/gcs_item_exporter.py
@@ -0,0 +1,111 @@
+# MIT License
+#
+# Copyright (c) 2020 Evgeny Medvedev, evge.medvedev@gmail.com
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import json
+import logging
+from collections import defaultdict
+
+from google.cloud import storage
+
+
+def build_block_bundles(items):
+    blocks = defaultdict(list)
+    transactions = defaultdict(list)
+    logs = defaultdict(list)
+    token_transfers = defaultdict(list)
+    traces = defaultdict(list)
+    for item in items:
+        item_type = item.get('type')
+        if item_type == 'block':
+            blocks[item.get('number')].append(item)
+        elif item_type == 'transaction':
+            transactions[item.get('block_number')].append(item)
+        elif item_type == 'log':
+            logs[item.get('block_number')].append(item)
+        elif item_type == 'token_transfer':
+            token_transfers[item.get('block_number')].append(item)
+        elif item_type == 'trace':
+            traces[item.get('block_number')].append(item)
+        else:
+            logging.info(f'Skipping item with type {item_type}')
+
+    block_bundles = []
+    for block_number in sorted(blocks.keys()):
+        if len(blocks[block_number]) != 1:
+            raise ValueError(f'There must be a single block for a given block number, was {len(blocks[block_number])} for block number {block_number}')
+        block_bundles.append({
+            'block': blocks[block_number][0],
+            'transactions': transactions[block_number],
+            'logs': logs[block_number],
+            'token_transfers': token_transfers[block_number],
+            'traces': traces[block_number],
+        })
+
+    return block_bundles
+
+
+class GcsItemExporter:
+
+    def __init__(
+            self,
+            bucket,
+            path='blocks',
+            build_block_bundles_func=build_block_bundles):
+        self.bucket = bucket
+        self.path = normalize_path(path)
+        self.build_block_bundles_func = build_block_bundles_func
+        self.storage_client = storage.Client()
+
+    def open(self):
+        pass
+
+    def export_items(self, items):
+        block_bundles = self.build_block_bundles_func(items)
+
+        for block_bundle in block_bundles:
+            block = block_bundle.get('block')
+            if block is None:
+                raise ValueError('block_bundle must include the block field')
+            block_number = block.get('number')
+            if block_number is None:
+                raise ValueError('block_bundle must include the block.number field')
+
+            destination_blob_name = f'{self.path}/{block_number}.json'
+
+            bucket = self.storage_client.bucket(self.bucket)
+            blob = bucket.blob(destination_blob_name)
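A minimal sketch (not part of the patch) of what the new exporter does with a stream batch: items are grouped into one bundle per block, and each bundle is uploaded as `<path>/<block_number>.json`. The sample items are made up for illustration, and importing the module assumes `google-cloud-storage` is installed.

```python
# Hypothetical illustration of how GcsItemExporter groups items per block.
# Sample items are made up; importing the module requires google-cloud-storage.
from blockchainetl.jobs.exporters.gcs_item_exporter import build_block_bundles

items = [
    {'type': 'block', 'number': 1000, 'hash': '0xabc'},
    {'type': 'transaction', 'block_number': 1000, 'hash': '0xdef'},
    {'type': 'log', 'block_number': 1000, 'log_index': 0},
]

bundles = build_block_bundles(items)
# Each bundle would be uploaded as <path>/<block_number>.json, e.g. blocks/1000.json
print(bundles[0]['block']['number'])    # 1000
print(len(bundles[0]['transactions']))  # 1
```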
+            blob.upload_from_string(json.dumps(block_bundle))
+            logging.info(f'Uploaded file gs://{self.bucket}/{destination_blob_name}')
+
+    def close(self):
+        pass
+
+
+def normalize_path(p):
+    if p is None:
+        p = ''
+    if p.startswith('/'):
+        p = p[1:]
+    if p.endswith('/'):
+        p = p[:len(p) - 1]
+
+    return p
diff --git a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py
index 4d7c313fb..67323371d 100644
--- a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py
+++ b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py
@@ -29,9 +29,19 @@ class GooglePubSubItemExporter:
 
-    def __init__(self, item_type_to_topic_mapping, message_attributes=('item_id', 'item_timestamp')):
+    def __init__(self, item_type_to_topic_mapping, message_attributes=(),
+                 batch_max_bytes=1024 * 5, batch_max_latency=1, batch_max_messages=1000,
+                 enable_message_ordering=False):
         self.item_type_to_topic_mapping = item_type_to_topic_mapping
-        self.publisher = create_publisher()
+
+        self.batch_max_bytes = batch_max_bytes
+        self.batch_max_latency = batch_max_latency
+        self.batch_max_messages = batch_max_messages
+
+        self.enable_message_ordering = enable_message_ordering
+
+        self.publisher = self.create_publisher()
+
         self.message_attributes = message_attributes
 
     def open(self):
@@ -46,7 +56,7 @@ def export_items(self, items):
             # details = "channel is in state TRANSIENT_FAILURE"
             # https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606
             logging.info('Recreating Pub/Sub publisher.')
-            self.publisher = create_publisher()
+            self.publisher = self.create_publisher()
             raise e
 
     @timeout_decorator.timeout(300)
@@ -66,7 +76,8 @@ def export_item(self, item):
             topic_path = self.item_type_to_topic_mapping.get(item_type)
             data = json.dumps(item).encode('utf-8')
 
-            message_future = self.publisher.publish(topic_path, data=data, **self.get_message_attributes(item))
+            ordering_key = 'all' if self.enable_message_ordering else ''
+            message_future = self.publisher.publish(topic_path, data=data, ordering_key=ordering_key, **self.get_message_attributes(item))
             return message_future
         else:
             logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
@@ -80,15 +91,15 @@ def get_message_attributes(self, item):
 
         return attributes
 
-    def close(self):
-        pass
-
+    def create_publisher(self):
+        batch_settings = pubsub_v1.types.BatchSettings(
+            max_bytes=self.batch_max_bytes,
+            max_latency=self.batch_max_latency,
+            max_messages=self.batch_max_messages,
+        )
 
-def create_publisher():
-    batch_settings = pubsub_v1.types.BatchSettings(
-        max_bytes=1024 * 5,  # 5 kilobytes
-        max_latency=1,  # 1 second
-        max_messages=1000,
-    )
+        publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=self.enable_message_ordering)
+        return pubsub_v1.PublisherClient(batch_settings=batch_settings, publisher_options=publisher_options)
 
-    return pubsub_v1.PublisherClient(batch_settings)
+    def close(self):
+        pass
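For context, a sketch of how the reworked exporter might be constructed with the new batching and ordering knobs. The project and topic names are placeholders, and application default credentials plus `google-cloud-pubsub` 2.x are assumed to be available.

```python
# Hypothetical wiring of GooglePubSubItemExporter with the new parameters;
# project/topic names are placeholders and GCP credentials are assumed.
from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter

exporter = GooglePubSubItemExporter(
    item_type_to_topic_mapping={
        'block': 'projects/my-project/topics/crypto_ethereum.blocks',
        'transaction': 'projects/my-project/topics/crypto_ethereum.transactions',
    },
    message_attributes=('item_id', 'item_timestamp'),
    batch_max_bytes=1024 * 1024 * 5,  # flush a batch once it reaches ~5 MB
    batch_max_latency=2,              # or after 2 seconds
    batch_max_messages=1000,          # or after 1000 buffered messages
    enable_message_ordering=True,     # publish everything with ordering_key='all'
)
```

Note that with ordering enabled every message shares the ordering key `'all'`, so Pub/Sub delivers items in publish order at the cost of per-key throughput.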
diff --git a/blockchainetl/jobs/exporters/multi_item_exporter.py b/blockchainetl/jobs/exporters/multi_item_exporter.py
new file mode 100644
index 000000000..746280ad6
--- /dev/null
+++ b/blockchainetl/jobs/exporters/multi_item_exporter.py
@@ -0,0 +1,42 @@
+# MIT License
+#
+# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+
+class MultiItemExporter:
+    def __init__(self, item_exporters):
+        self.item_exporters = item_exporters
+
+    def open(self):
+        for exporter in self.item_exporters:
+            exporter.open()
+
+    def export_items(self, items):
+        for exporter in self.item_exporters:
+            exporter.export_items(items)
+
+    def export_item(self, item):
+        for exporter in self.item_exporters:
+            exporter.export_item(item)
+
+    def close(self):
+        for exporter in self.item_exporters:
+            exporter.close()
diff --git a/docs/commands.md b/docs/commands.md
index fa7e5c9f5..f5bd98a10 100644
--- a/docs/commands.md
+++ b/docs/commands.md
@@ -207,11 +207,13 @@ You can tune `--batch-size`, `--max-workers` for performance.
 
 - This command outputs blocks, transactions, logs, token_transfers to the console by default.
 - Entity types can be specified with the `-e` option, e.g. `-e block,transaction,log,token_transfer,trace,contract,token`.
-- Use `--output` option to specify the Google Pub/Sub topic or Postgres database where to publish blockchain data,
+- Use the `--output` option to specify the Google Pub/Sub topic, Postgres database, or GCS bucket to which blockchain data should be published.
    - For Google PubSub: `--output=projects/<your_project>/topics/crypto_ethereum`. Data will be pushed to `projects/<your_project>/topics/crypto_ethereum.blocks`, `projects/<your_project>/topics/crypto_ethereum.transactions` etc. topics.
    - For Postgres: `--output=postgresql+pg8000://<user>:<password>@<host>:<port>/<database>`,
-    e.g. `--output=postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum`.
+      e.g. `--output=postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum`.
+    - For GCS: `--output=gs://<bucket_name>`. Make sure the `gcloud` CLI is installed and initialized.
+    - These output types can be combined with a comma, e.g. `--output=gs://<bucket_name>,projects/<your_project>/topics/crypto_ethereum`.
 
 The [schema](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/schema) and [indexes](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/indexes) can be found in this repo [ethereum-etl-postgres](https://github.com/blockchain-etl/ethereum-etl-postgres).
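The comma-separated `--output` values end up wrapped in the new `MultiItemExporter`, which simply forwards every call to each underlying exporter. A minimal sketch of that fan-out, using console exporters only so it runs without any cloud credentials:

```python
# Minimal fan-out sketch: every call is forwarded to each wrapped exporter.
# ConsoleItemExporter is used here only to keep the example credential-free.
from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter

exporter = MultiItemExporter([ConsoleItemExporter(), ConsoleItemExporter()])
exporter.open()
exporter.export_items([{'type': 'block', 'number': 1000}])  # printed twice, once per exporter
exporter.close()
```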
diff --git a/ethereumetl/cli/stream.py b/ethereumetl/cli/stream.py
index 7e02fe888..8b4a85fcf 100644
--- a/ethereumetl/cli/stream.py
+++ b/ethereumetl/cli/stream.py
@@ -27,6 +27,7 @@
 from ethereumetl.enumeration.entity_type import EntityType
 
 from ethereumetl.providers.auto import get_provider_from_uri
+from ethereumetl.streaming.item_exporter_creator import create_item_exporters
 from ethereumetl.thread_local_proxy import ThreadLocalProxy
@@ -67,7 +68,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
     streamer_adapter = EthStreamerAdapter(
         batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
-        item_exporter=create_item_exporter(output),
+        item_exporter=create_item_exporters(output),
         batch_size=batch_size,
         max_workers=max_workers,
         entity_types=entity_types
@@ -98,9 +99,7 @@ def parse_entity_types(entity_types):
 
 
 def validate_entity_types(entity_types, output):
-    from ethereumetl.streaming.item_exporter_creator import determine_item_exporter_type, ItemExporterType
-    item_exporter_type = determine_item_exporter_type(output)
-    if item_exporter_type == ItemExporterType.POSTGRES \
+    if output is not None and 'postgres' in output \
             and (EntityType.CONTRACT in entity_types or EntityType.TOKEN in entity_types):
         raise ValueError('contract and token are not yet supported entity types for postgres item exporter.')
diff --git a/ethereumetl/domain/trace.py b/ethereumetl/domain/trace.py
index 85b305d14..c49264816 100644
--- a/ethereumetl/domain/trace.py
+++ b/ethereumetl/domain/trace.py
@@ -41,3 +41,4 @@ def __init__(self):
         self.error = None
         self.status = None
         self.trace_id = None
+        self.trace_index = None
diff --git a/ethereumetl/jobs/export_traces_job.py b/ethereumetl/jobs/export_traces_job.py
index be9d41e17..4ec57f534 100644
--- a/ethereumetl/jobs/export_traces_job.py
+++ b/ethereumetl/jobs/export_traces_job.py
@@ -96,6 +96,7 @@ def _export_batch(self, block_number_batch):
 
         calculate_trace_statuses(all_traces)
         calculate_trace_ids(all_traces)
+        calculate_trace_indexes(all_traces)
 
         for trace in all_traces:
             self.item_exporter.export_item(self.trace_mapper.trace_to_dict(trace))
@@ -103,3 +104,9 @@ def _end(self):
         self.batch_work_executor.shutdown()
         self.item_exporter.close()
+
+
+def calculate_trace_indexes(traces):
+    # Only works if traces were originally ordered correctly which is the case for Parity traces
+    for ind, trace in enumerate(traces):
+        trace.trace_index = ind
diff --git a/ethereumetl/mappers/trace_mapper.py b/ethereumetl/mappers/trace_mapper.py
index 6781146c9..4f87ad69c 100644
--- a/ethereumetl/mappers/trace_mapper.py
+++ b/ethereumetl/mappers/trace_mapper.py
@@ -190,4 +190,5 @@ def trace_to_dict(self, trace):
             'error': trace.error,
             'status': trace.status,
             'trace_id': trace.trace_id,
+            'trace_index': trace.trace_index,
         }
diff --git a/ethereumetl/streaming/enrich.py b/ethereumetl/streaming/enrich.py
index b120323a9..6f8238805 100644
--- a/ethereumetl/streaming/enrich.py
+++ b/ethereumetl/streaming/enrich.py
@@ -163,7 +163,8 @@ def enrich_traces(blocks, traces):
             'status',
             'transaction_hash',
             'block_number',
-            'trace_id'
+            'trace_id',
+            'trace_index'
         ],
         [
             ('timestamp', 'block_timestamp'),
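A small sketch (not part of the patch) of what `calculate_trace_indexes` implies: indexes are assigned across the whole exported batch in the order Parity returned the traces, not reset per block, so the later sort by `('block_number', 'trace_index')` preserves intra-block execution order. `FakeTrace` is a hypothetical stand-in for the `EthTrace` domain object.

```python
# Hypothetical stand-in for EthTrace to show batch-wide sequential indexing.
class FakeTrace:
    def __init__(self, block_number):
        self.block_number = block_number
        self.trace_index = None

traces = [FakeTrace(1000), FakeTrace(1000), FakeTrace(1001)]
for ind, trace in enumerate(traces):
    trace.trace_index = ind  # same logic as calculate_trace_indexes

print([(t.block_number, t.trace_index) for t in traces])
# [(1000, 0), (1000, 1), (1001, 2)]
```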
diff --git a/ethereumetl/streaming/eth_streamer_adapter.py b/ethereumetl/streaming/eth_streamer_adapter.py
index 92810adcf..8ccca7d18 100644
--- a/ethereumetl/streaming/eth_streamer_adapter.py
+++ b/ethereumetl/streaming/eth_streamer_adapter.py
@@ -87,13 +87,14 @@ def export_all(self, start_block, end_block):
 
         logging.info('Exporting with ' + type(self.item_exporter).__name__)
 
-        all_items = enriched_blocks + \
-            enriched_transactions + \
-            enriched_logs + \
-            enriched_token_transfers + \
-            enriched_traces + \
-            enriched_contracts + \
-            enriched_tokens
+        all_items = \
+            sort_by(enriched_blocks, 'number') + \
+            sort_by(enriched_transactions, ('block_number', 'transaction_index')) + \
+            sort_by(enriched_logs, ('block_number', 'log_index')) + \
+            sort_by(enriched_token_transfers, ('block_number', 'log_index')) + \
+            sort_by(enriched_traces, ('block_number', 'trace_index')) + \
+            sort_by(enriched_contracts, ('block_number',)) + \
+            sort_by(enriched_tokens, ('block_number',))
 
         self.calculate_item_ids(all_items)
         self.calculate_item_timestamps(all_items)
@@ -219,3 +220,9 @@ def calculate_item_timestamps(self, items):
 
     def close(self):
         self.item_exporter.close()
+
+
+def sort_by(arr, fields):
+    if not isinstance(fields, tuple):
+        fields = (fields,)
+    return sorted(arr, key=lambda item: tuple(item.get(f) for f in fields))
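A self-contained sketch of the `sort_by` helper as used above (a single field name is normalized into a one-element tuple so that `sort_by(blocks, 'number')` sorts by the field rather than iterating over its characters). The sample dictionaries are made up.

```python
# Standalone copy of the helper for illustration; items are ordered by one
# field or by a tuple of fields, mirroring how items are sorted before export.
def sort_by(arr, fields):
    if not isinstance(fields, tuple):
        fields = (fields,)
    return sorted(arr, key=lambda item: tuple(item.get(f) for f in fields))

blocks = [{'number': 1001}, {'number': 1000}]
transactions = [
    {'block_number': 1000, 'transaction_index': 1},
    {'block_number': 1000, 'transaction_index': 0},
]

print(sort_by(blocks, 'number'))
# [{'number': 1000}, {'number': 1001}]
print(sort_by(transactions, ('block_number', 'transaction_index')))
# [{'block_number': 1000, 'transaction_index': 0}, {'block_number': 1000, 'transaction_index': 1}]
```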
diff --git a/ethereumetl/streaming/item_exporter_creator.py b/ethereumetl/streaming/item_exporter_creator.py
index 9827055cd..24fc4e431 100644
--- a/ethereumetl/streaming/item_exporter_creator.py
+++ b/ethereumetl/streaming/item_exporter_creator.py
@@ -21,21 +21,36 @@
 # SOFTWARE.
 
 from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
+from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter
+
+
+def create_item_exporters(outputs):
+    split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console']
+
+    item_exporters = [create_item_exporter(output) for output in split_outputs]
+    return MultiItemExporter(item_exporters)
 
 
 def create_item_exporter(output):
     item_exporter_type = determine_item_exporter_type(output)
     if item_exporter_type == ItemExporterType.PUBSUB:
         from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
-        item_exporter = GooglePubSubItemExporter(item_type_to_topic_mapping={
-            'block': output + '.blocks',
-            'transaction': output + '.transactions',
-            'log': output + '.logs',
-            'token_transfer': output + '.token_transfers',
-            'trace': output + '.traces',
-            'contract': output + '.contracts',
-            'token': output + '.tokens',
-        })
+        enable_message_ordering = 'sorted' in output or 'ordered' in output
+        item_exporter = GooglePubSubItemExporter(
+            item_type_to_topic_mapping={
+                'block': output + '.blocks',
+                'transaction': output + '.transactions',
+                'log': output + '.logs',
+                'token_transfer': output + '.token_transfers',
+                'trace': output + '.traces',
+                'contract': output + '.contracts',
+                'token': output + '.tokens',
+            },
+            message_attributes=('item_id', 'item_timestamp'),
+            batch_max_bytes=1024 * 1024 * 5,
+            batch_max_latency=2,
+            batch_max_messages=1000,
+            enable_message_ordering=enable_message_ordering)
     elif item_exporter_type == ItemExporterType.POSTGRES:
         from blockchainetl.jobs.exporters.postgres_item_exporter import PostgresItemExporter
         from blockchainetl.streaming.postgres_utils import create_insert_statement_for_table
@@ -54,6 +69,10 @@ def create_item_exporter(output):
         },
             converters=[UnixTimestampItemConverter(), IntToDecimalItemConverter(), ListFieldItemConverter('topics', 'topic', fill=4)])
+    elif item_exporter_type == ItemExporterType.GCS:
+        from blockchainetl.jobs.exporters.gcs_item_exporter import GcsItemExporter
+        bucket, path = get_bucket_and_path_from_gcs_output(output)
+        item_exporter = GcsItemExporter(bucket=bucket, path=path)
     elif item_exporter_type == ItemExporterType.CONSOLE:
         item_exporter = ConsoleItemExporter()
     else:
@@ -62,11 +81,24 @@ def create_item_exporter(output):
     return item_exporter
 
 
+def get_bucket_and_path_from_gcs_output(output):
+    output = output.replace('gs://', '')
+    bucket_and_path = output.split('/', 1)
+    bucket = bucket_and_path[0]
+    if len(bucket_and_path) > 1:
+        path = bucket_and_path[1]
+    else:
+        path = ''
+    return bucket, path
+
+
 def determine_item_exporter_type(output):
     if output is not None and output.startswith('projects'):
         return ItemExporterType.PUBSUB
     elif output is not None and output.startswith('postgresql'):
         return ItemExporterType.POSTGRES
+    elif output is not None and output.startswith('gs://'):
+        return ItemExporterType.GCS
     elif output is None or output == 'console':
         return ItemExporterType.CONSOLE
     else:
@@ -76,5 +108,6 @@ class ItemExporterType:
     PUBSUB = 'pubsub'
     POSTGRES = 'postgres'
+    GCS = 'gcs'
     CONSOLE = 'console'
     UNKNOWN = 'unknown'
diff --git a/setup.py b/setup.py
index d95ba3823..426ec04b5 100644
--- a/setup.py
+++ b/setup.py
@@ -46,7 +46,8 @@ def read(fname):
     extras_require={
         'streaming': [
             'timeout-decorator==0.4.1',
-            'google-cloud-pubsub==0.39.1',
+            'google-cloud-pubsub==2.1.0',
+            'google-cloud-storage==1.33.0',
             'sqlalchemy==1.3.13',
             'pg8000==1.13.2',
         ],
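A quick sketch (not part of the patch) of how output URIs map to exporter types after this change; the bucket and project names are placeholders.

```python
# Hypothetical check of the URI-to-exporter mapping added in this change;
# bucket and project names are placeholders.
from ethereumetl.streaming.item_exporter_creator import (
    ItemExporterType, determine_item_exporter_type, get_bucket_and_path_from_gcs_output)

assert determine_item_exporter_type('gs://my-etl-bucket/blocks') == ItemExporterType.GCS
assert determine_item_exporter_type('projects/my-project/topics/crypto_ethereum') == ItemExporterType.PUBSUB
assert determine_item_exporter_type(None) == ItemExporterType.CONSOLE

print(get_bucket_and_path_from_gcs_output('gs://my-etl-bucket/blocks'))  # ('my-etl-bucket', 'blocks')
```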