Skip to content

Commit

Permalink
Merge pull request #208 from obsh/develop
Browse files Browse the repository at this point in the history
Add block_timestamp attribute to exported PubSub message
  • Loading branch information
medvedev1088 committed Apr 16, 2020
2 parents e0636bb + dec070e commit eeabd57
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 18 deletions.
4 changes: 2 additions & 2 deletions blockchainetl/jobs/exporters/google_pubsub_item_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

class GooglePubSubItemExporter:

def __init__(self, item_type_to_topic_mapping, message_attributes=('item_id',)):
def __init__(self, item_type_to_topic_mapping, message_attributes=('item_id', 'item_timestamp')):
self.item_type_to_topic_mapping = item_type_to_topic_mapping
self.publisher = create_publisher()
self.message_attributes = message_attributes
Expand Down Expand Up @@ -76,7 +76,7 @@ def get_message_attributes(self, item):

for attr_name in self.message_attributes:
if item.get(attr_name) is not None:
attributes[attr_name] = item.get(attr_name)
attributes[attr_name] = str(item.get(attr_name))

return attributes

Expand Down
46 changes: 46 additions & 0 deletions ethereumetl/streaming/eth_item_timestamp_calculator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import json
import logging
from datetime import datetime


class EthItemTimestampCalculator:
def calculate(self, item):
if item is None or not isinstance(item, dict):
return None

item_type = item.get('type')

if item_type == 'block' and item.get('timestamp') is not None:
return epoch_seconds_to_rfc3339(item.get('timestamp'))
elif item.get('block_timestamp') is not None:
return epoch_seconds_to_rfc3339(item.get('block_timestamp'))

logging.warning('item_timestamp for item {} is None'.format(json.dumps(item)))

return None


def epoch_seconds_to_rfc3339(timestamp):
return datetime.utcfromtimestamp(int(timestamp)).isoformat() + 'Z'
7 changes: 7 additions & 0 deletions ethereumetl/streaming/eth_streamer_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ethereumetl.streaming.enrich import enrich_transactions, enrich_logs, enrich_token_transfers, enrich_traces, \
enrich_contracts, enrich_tokens
from ethereumetl.streaming.eth_item_id_calculator import EthItemIdCalculator
from ethereumetl.streaming.eth_item_timestamp_calculator import EthItemTimestampCalculator
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from web3 import Web3

Expand All @@ -30,6 +31,7 @@ def __init__(
self.max_workers = max_workers
self.entity_types = entity_types
self.item_id_calculator = EthItemIdCalculator()
self.item_timestamp_calculator = EthItemTimestampCalculator()

def open(self):
self.item_exporter.open()
Expand Down Expand Up @@ -94,6 +96,7 @@ def export_all(self, start_block, end_block):
enriched_tokens

self.calculate_item_ids(all_items)
self.calculate_item_timestamps(all_items)

self.item_exporter.export_items(all_items)

Expand Down Expand Up @@ -210,5 +213,9 @@ def calculate_item_ids(self, items):
for item in items:
item['item_id'] = self.item_id_calculator.calculate(item)

def calculate_item_timestamps(self, items):
for item in items:
item['item_timestamp'] = self.item_timestamp_calculator.calculate(item)

def close(self):
self.item_exporter.close()
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"type": "block", "number": 1755634, "hash": "0xa06fc36a7144c4bbb1f7ab13b541144414fa7808c119e8a4635e392ea544c178", "parent_hash": "0x112aa801c14d16d9b929fd8e1e639a29d79e467334054a111c2f860e462e3ff4", "nonce": "0x803fc62205a3b6bb", "sha3_uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", "logs_bloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "transactions_root": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", "state_root": "0x596d5e71f1cbdd30a2111d3b04fabf3390baedc15d3a582b8b0469c508ce30b3", "receipts_root": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", "miner": "0x61c808d82a3ac53231750dadc13c777b59310bd9", "difficulty": 52927024787647, "total_difficulty": 30078010444197187424, "size": 532, "extra_data": "0xe4b883e5bda9e7a59ee4bb99e9b1bc", "gas_limit": 4712388, "gas_used": 0, "timestamp": 1466669557, "transaction_count": 0, "item_id": "block_0xa06fc36a7144c4bbb1f7ab13b541144414fa7808c119e8a4635e392ea544c178"}
{"type": "block", "number": 1755635, "hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "parent_hash": "0xa06fc36a7144c4bbb1f7ab13b541144414fa7808c119e8a4635e392ea544c178", "nonce": "0xfdaeb738c5a4ef9c", "sha3_uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", "logs_bloom": "0x00000000000000020000000200020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000008000000000000000000000000000000000000000000000000000000000000400001000000040000000000000020000010000000000000000000000000000000000000000000000000000000100000020000001010000000000000000000000000000000000000002000000000000000100000000000000002000000000000000000000200000000000000008000000000000000000000000000000000000000000000001000000002000000000000000000000000", "transactions_root": "0x36854e7b5bedd028c7b2a89829f6269a76e345edc92c6bffaddfdace512a2818", "state_root": "0x7584c6224f7373d95dd2fcb9a29135271b64edf1f785def4f51c446cd4675dc3", "receipts_root": "0xda8b99b6658dbcb71a03a6978d61d5ad6ad3e74961b58fe71e6e75549c701e1a", "miner": "0xa027231f42c80ca4125b5cb962a21cd4f812e88f", "difficulty": 52952868094237, "total_difficulty": 30078063397065281661, "size": 783, "extra_data": "0x6574682e70702e7561", "gas_limit": 4712388, "gas_used": 57418, "timestamp": 1466669562, "transaction_count": 2, "item_id": "block_0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67"}
{"type": "block", "number": 1755634, "hash": "0xa06fc36a7144c4bbb1f7ab13b541144414fa7808c119e8a4635e392ea544c178", "parent_hash": "0x112aa801c14d16d9b929fd8e1e639a29d79e467334054a111c2f860e462e3ff4", "nonce": "0x803fc62205a3b6bb", "sha3_uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", "logs_bloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "transactions_root": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", "state_root": "0x596d5e71f1cbdd30a2111d3b04fabf3390baedc15d3a582b8b0469c508ce30b3", "receipts_root": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", "miner": "0x61c808d82a3ac53231750dadc13c777b59310bd9", "difficulty": 52927024787647, "total_difficulty": 30078010444197187424, "size": 532, "extra_data": "0xe4b883e5bda9e7a59ee4bb99e9b1bc", "gas_limit": 4712388, "gas_used": 0, "timestamp": 1466669557, "transaction_count": 0, "item_id": "block_0xa06fc36a7144c4bbb1f7ab13b541144414fa7808c119e8a4635e392ea544c178", "item_timestamp": "2016-06-23T08:12:37Z"}
{"type": "block", "number": 1755635, "hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "parent_hash": "0xa06fc36a7144c4bbb1f7ab13b541144414fa7808c119e8a4635e392ea544c178", "nonce": "0xfdaeb738c5a4ef9c", "sha3_uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", "logs_bloom": "0x00000000000000020000000200020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000008000000000000000000000000000000000000000000000000000000000000400001000000040000000000000020000010000000000000000000000000000000000000000000000000000000100000020000001010000000000000000000000000000000000000002000000000000000100000000000000002000000000000000000000200000000000000008000000000000000000000000000000000000000000000001000000002000000000000000000000000", "transactions_root": "0x36854e7b5bedd028c7b2a89829f6269a76e345edc92c6bffaddfdace512a2818", "state_root": "0x7584c6224f7373d95dd2fcb9a29135271b64edf1f785def4f51c446cd4675dc3", "receipts_root": "0xda8b99b6658dbcb71a03a6978d61d5ad6ad3e74961b58fe71e6e75549c701e1a", "miner": "0xa027231f42c80ca4125b5cb962a21cd4f812e88f", "difficulty": 52952868094237, "total_difficulty": 30078063397065281661, "size": 783, "extra_data": "0x6574682e70702e7561", "gas_limit": 4712388, "gas_used": 57418, "timestamp": 1466669562, "transaction_count": 2, "item_id": "block_0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "item_timestamp": "2016-06-23T08:12:42Z"}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"type": "log", "log_index": 0, "transaction_hash": "0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d", "transaction_index": 0, "address": "0xbb9bc244d798123fde783fcc1c72d3bb8c189413", "data": "0x0000000000000000000000000000000000000000000000004563918244f40000", "topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", "0x0000000000000000000000006498077292a0921c8804924fdf47b5e91e2a215f", "0x0000000000000000000000008b3b3b624c3c0397d3da8fd861512393d51dcbac"], "block_number": 1755635, "block_timestamp": 1466669562, "block_hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "item_id": "log_0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d_0"}
{"type": "log", "log_index": 1, "transaction_hash": "0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d", "transaction_index": 0, "address": "0x8b3b3b624c3c0397d3da8fd861512393d51dcbac", "data": "0x", "topics": ["0xe3e6ac9b8af8d4194beda053cf95abee2ac870c4fb5f26505181ef1d438512bf", "0x0000000000000000000000006498077292a0921c8804924fdf47b5e91e2a215f", "0x0000000000000000000000000000000000000000000000004563918244f40000"], "block_number": 1755635, "block_timestamp": 1466669562, "block_hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "item_id": "log_0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d_1"}
{"type": "log", "log_index": 0, "transaction_hash": "0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d", "transaction_index": 0, "address": "0xbb9bc244d798123fde783fcc1c72d3bb8c189413", "data": "0x0000000000000000000000000000000000000000000000004563918244f40000", "topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", "0x0000000000000000000000006498077292a0921c8804924fdf47b5e91e2a215f", "0x0000000000000000000000008b3b3b624c3c0397d3da8fd861512393d51dcbac"], "block_number": 1755635, "block_timestamp": 1466669562, "block_hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "item_id": "log_0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d_0", "item_timestamp": "2016-06-23T08:12:42Z"}
{"type": "log", "log_index": 1, "transaction_hash": "0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d", "transaction_index": 0, "address": "0x8b3b3b624c3c0397d3da8fd861512393d51dcbac", "data": "0x", "topics": ["0xe3e6ac9b8af8d4194beda053cf95abee2ac870c4fb5f26505181ef1d438512bf", "0x0000000000000000000000006498077292a0921c8804924fdf47b5e91e2a215f", "0x0000000000000000000000000000000000000000000000004563918244f40000"], "block_number": 1755635, "block_timestamp": 1466669562, "block_hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "item_id": "log_0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d_1", "item_timestamp": "2016-06-23T08:12:42Z"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"type": "token_transfer", "token_address": "0xbb9bc244d798123fde783fcc1c72d3bb8c189413", "from_address": "0x6498077292a0921c8804924fdf47b5e91e2a215f", "to_address": "0x8b3b3b624c3c0397d3da8fd861512393d51dcbac", "value": 5000000000000000000, "transaction_hash": "0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d", "log_index": 0, "block_number": 1755635, "block_timestamp": 1466669562, "block_hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "item_id": "token_transfer_0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d_0"}
{"type": "token_transfer", "token_address": "0xbb9bc244d798123fde783fcc1c72d3bb8c189413", "from_address": "0x6498077292a0921c8804924fdf47b5e91e2a215f", "to_address": "0x8b3b3b624c3c0397d3da8fd861512393d51dcbac", "value": 5000000000000000000, "transaction_hash": "0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d", "log_index": 0, "block_number": 1755635, "block_timestamp": 1466669562, "block_hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "item_id": "token_transfer_0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d_0", "item_timestamp": "2016-06-23T08:12:42Z"}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"type": "transaction", "hash": "0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d", "nonce": 34160, "transaction_index": 0, "from_address": "0xed059bc543141c8c93031d545079b3da0233b27f", "to_address": "0x8b3b3b624c3c0397d3da8fd861512393d51dcbac", "value": 0, "gas": 250000, "gas_price": 20000000000, "input": "0x7edae70f0000000000000000000000006498077292a0921c8804924fdf47b5e91e2a215f", "block_timestamp": 1466669562, "block_number": 1755635, "block_hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "receipt_cumulative_gas_used": 36418, "receipt_gas_used": 36418, "receipt_contract_address": null, "receipt_root": "0x4db3f06ff4e7283ab1187045ca78f4bd713b0737640180377b4d4a2e9a80c235", "receipt_status": null, "item_id": "transaction_0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d"}
{"type": "transaction", "hash": "0x9a5437ec71b74ecf5930b406908ac6999966d38a86d1534b7190ece7599095eb", "nonce": 3745, "transaction_index": 1, "from_address": "0x3763e6e1228bfeab94191c856412d1bb0a8e6996", "to_address": "0xec1ebac9da3430213281c80fa6d46378341a96ae", "value": 405738107000000000, "gas": 90000, "gas_price": 20000000000, "input": "0x", "block_timestamp": 1466669562, "block_number": 1755635, "block_hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "receipt_cumulative_gas_used": 57418, "receipt_gas_used": 21000, "receipt_contract_address": null, "receipt_root": "0x379f143510d5703cf162e37e61d906341b4a6acf4f339d422c656000ccd5898f", "receipt_status": null, "item_id": "transaction_0x9a5437ec71b74ecf5930b406908ac6999966d38a86d1534b7190ece7599095eb"}
{"type": "transaction", "hash": "0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d", "nonce": 34160, "transaction_index": 0, "from_address": "0xed059bc543141c8c93031d545079b3da0233b27f", "to_address": "0x8b3b3b624c3c0397d3da8fd861512393d51dcbac", "value": 0, "gas": 250000, "gas_price": 20000000000, "input": "0x7edae70f0000000000000000000000006498077292a0921c8804924fdf47b5e91e2a215f", "block_timestamp": 1466669562, "block_number": 1755635, "block_hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "receipt_cumulative_gas_used": 36418, "receipt_gas_used": 36418, "receipt_contract_address": null, "receipt_root": "0x4db3f06ff4e7283ab1187045ca78f4bd713b0737640180377b4d4a2e9a80c235", "receipt_status": null, "item_id": "transaction_0x2e3dcd051a91d3a694f6b8de2ac4b5fe7acdba55f58bcf8471ff00d4a430074d", "item_timestamp": "2016-06-23T08:12:42Z"}
{"type": "transaction", "hash": "0x9a5437ec71b74ecf5930b406908ac6999966d38a86d1534b7190ece7599095eb", "nonce": 3745, "transaction_index": 1, "from_address": "0x3763e6e1228bfeab94191c856412d1bb0a8e6996", "to_address": "0xec1ebac9da3430213281c80fa6d46378341a96ae", "value": 405738107000000000, "gas": 90000, "gas_price": 20000000000, "input": "0x", "block_timestamp": 1466669562, "block_number": 1755635, "block_hash": "0x1dec87ec1ba8e65b7773bb6f62249468948a28a427efd3d896a2ff7d7c591a67", "receipt_cumulative_gas_used": 57418, "receipt_gas_used": 21000, "receipt_contract_address": null, "receipt_root": "0x379f143510d5703cf162e37e61d906341b4a6acf4f339d422c656000ccd5898f", "receipt_status": null, "item_id": "transaction_0x9a5437ec71b74ecf5930b406908ac6999966d38a86d1534b7190ece7599095eb", "item_timestamp": "2016-06-23T08:12:42Z"}
Loading

0 comments on commit eeabd57

Please sign in to comment.