diff --git a/blockchainetl/jobs/exporters/rabbitmq_item_exporter.py b/blockchainetl/jobs/exporters/rabbitmq_item_exporter.py new file mode 100644 index 000000000..ed379fcb0 --- /dev/null +++ b/blockchainetl/jobs/exporters/rabbitmq_item_exporter.py @@ -0,0 +1,62 @@ +import collections +import json +import logging + +import pika + +from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter + + +class RabbitMQItemExporter: + + def __init__(self, output, item_type_to_queue_mapping, converters=()): + self.item_type_to_queue_mapping = item_type_to_queue_mapping + self.converter = CompositeItemConverter(converters) + self.connection_url = self.get_connection_url(output) + + connection = pika.BlockingConnection(pika.URLParameters("amqp://" + self.connection_url)) + print(self.connection_url) + self.channel = connection.channel() + + for item_type, queue in item_type_to_queue_mapping.items(): + self.channel.queue_declare(queue=queue, durable=True) + + + def get_connection_url(self, output): + try: + return output.split('/')[1] + except KeyError: + raise Exception('Invalid rabbitmq output param, It should be in format of "amqp/guest:guest@localhost:5672"') + + def open(self): + pass + + def export_items(self, items): + for item in items: + self.export_item(item) + + def export_item(self, item): + item_type = item.get('type') + if item_type is not None and item_type in self.item_type_to_queue_mapping: + data = json.dumps(item).encode('utf-8') + logging.debug(data) + return self.channel.basic_publish(exchange='', routing_key=self.item_type_to_queue_mapping[item_type], body=data, properties=pika.BasicProperties( + delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE + )) + else: + logging.warning('Topic for item type "{}" is not configured.'.format(item_type)) + + def convert_items(self, items): + for item in items: + yield self.converter.convert_item(item) + + def close(self): + pass + + +def group_by_item_type(items): + result = collections.defaultdict(list) + for item in items: + result[item.get('type')].append(item) + + return result diff --git a/ethereumetl/streaming/item_exporter_creator.py b/ethereumetl/streaming/item_exporter_creator.py index 18bdb22b4..7f5977a98 100644 --- a/ethereumetl/streaming/item_exporter_creator.py +++ b/ethereumetl/streaming/item_exporter_creator.py @@ -93,6 +93,17 @@ def create_item_exporter(output): 'contract': 'contracts', 'token': 'tokens', }) + elif item_exporter_type == ItemExporterType.RABBITMQ: + from blockchainetl.jobs.exporters.rabbitmq_item_exporter import RabbitMQItemExporter + item_exporter = RabbitMQItemExporter(output, item_type_to_queue_mapping={ + 'block': 'blocks', + 'transaction': 'transactions', + 'log': 'logs', + 'token_transfer': 'token_transfers', + 'trace': 'traces', + 'contract': 'contracts', + 'token': 'tokens', + }) else: raise ValueError('Unable to determine item exporter type for output ' + output) @@ -118,14 +129,16 @@ def determine_item_exporter_type(output): return ItemExporterType.KINESIS if output is not None and output.startswith('kafka'): return ItemExporterType.KAFKA - elif output is not None and output.startswith('postgresql'): + if output is not None and output.startswith('amqp'): + return ItemExporterType.RABBITMQ + if output is not None and output.startswith('postgresql'): return ItemExporterType.POSTGRES - elif output is not None and output.startswith('gs://'): + if output is not None and output.startswith('gs://'): return ItemExporterType.GCS - elif output is None or output == 'console': + if output is None or output == 'console': return ItemExporterType.CONSOLE - else: - return ItemExporterType.UNKNOWN + + return ItemExporterType.UNKNOWN class ItemExporterType: @@ -135,4 +148,5 @@ class ItemExporterType: GCS = 'gcs' CONSOLE = 'console' KAFKA = 'kafka' + RABBITMQ = 'rabbitmq' UNKNOWN = 'unknown' diff --git a/setup.py b/setup.py index b6bb1531f..1692c1c94 100644 --- a/setup.py +++ b/setup.py @@ -47,6 +47,7 @@ def read(fname): 'google-cloud-pubsub==2.13.0', 'google-cloud-storage==1.33.0', 'kafka-python==2.0.2', + 'pika==1.3.1', 'sqlalchemy==1.4', 'pg8000==1.16.6', # This library is a dependency for google-cloud-pubsub, starting from 0.3.22 it requires Rust,