Skip to content

Commit

Permalink
Add event listener module for blockchain operations
Browse files Browse the repository at this point in the history
  • Loading branch information
emre committed Sep 13, 2018
1 parent 814973e commit 0ea085d
Show file tree
Hide file tree
Showing 4 changed files with 1,271 additions and 39 deletions.
70 changes: 69 additions & 1 deletion docs/helpers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,72 @@ A simple class to convert "1234.1234 STEEM" kind of values to Decimal.
amount = Amount("42.5466 STEEM")
print(amount.amount)
print(amount.symbol)
print(amount.symbol)
EventListener Helper
=================================

EventListener is a helper class to listen specific operations (events) on the
blockchain.

**Stream blockchain for the incoming transfers related to a specific account**

.. code-block:: python
from lightsteem.helpers.event_listener import EventListener
from lightsteem.client import Client
client = Client()
events = EventListener(client)
for transfer in events.on('transfer', filter_by={"to": "emrebeyler"}):
print(transfer)
**Stream for incoming vote actions**

.. code-block:: python
events = EventListener(client)
for witness_vote in events.on('account_witness_vote', filter_by={"witness": "emrebeyler"}):
print(witness_vote)
**Conditions via callables**

Stream for the comments and posts tagged with utopian-io.

.. code-block:: python
from lightsteem.client import Client
from lightsteem.helpers.event_listener import EventListener
import json
c = Client()
events = EventListener(c)
def filter_tags(comment_body):
if not comment_body.get("json_metadata"):
return False
try:
tags = json.loads(comment_body["json_metadata"])["tags"]
except KeyError:
return False
return "utopian-io" in tags
for op in events.on("comment", condition=filter_tags):
print(op)
EventListener class also has

- start_block
- end_block

params that you can limit the streaming process into specific blocks.


95 changes: 95 additions & 0 deletions lightsteem/helpers/event_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import time


class TransactionListener:

def __init__(self, client, blockchain_mode=None,
start_block=None, end_block=None,
only_ops=True):
self.client = client
self.blockchain_mode = blockchain_mode or "irreversible"
self.start_block = start_block
self.end_block = end_block
self.only_ops = only_ops

def get_last_block_height(self):
props = self.client.get_dynamic_global_properties()
if self.blockchain_mode == "irreversible":
return props['last_irreversible_block_num']
elif self.blockchain_mode == "head":
return props['head_block_number']
else:
raise ValueError(
"Invalid blockchain mode. It can be irreversible or head.")

def get_ops(self, block_num):
self.client.logger.info("Getting ops on %s", block_num)
return block_num, self.client.get_ops_in_block(block_num, False)

def get_block(self, block_num):
self.client.logger.info("Getting block: %s", block_num)
block_data = self.client.get_block(block_num)
return block_data

def listen(self, ops=True):
current_block = self.start_block
if not current_block:
current_block = self.get_last_block_height()
while True:
while (self.get_last_block_height() - current_block) > 0:
if self.end_block and current_block > self.end_block:
return
if ops:
block_num, ops = self.get_ops(current_block)
for op in ops:
yield op
else:
yield self.get_block(current_block)

current_block += 1

time.sleep(3)

def listen_blocks(self):
return self.listen(ops=False)


class EventListener:

def __init__(self, client, blockchain_mode=None,
start_block=None, end_block=None):
self.client = client
self.transaction_listener = TransactionListener(
self.client,
blockchain_mode=blockchain_mode,
start_block=start_block,
end_block=end_block,
)

def on(self, op_type, filter_by=None, condition=None):

# magically turn the op_type to a list if it's a single string.
op_types = op_type if isinstance(op_type, list) else [op_type, ]
for op_data in self.transaction_listener.listen():
operation_type, operation_value = op_data["op"][0:2]
if operation_type not in op_types:
continue

# filter_by is a generic dict that can be changed on every op.
if filter_by and not filter_by.items() <= operation_value.items():
continue

# condition result should be True, otherwise continue
# and search for other operations.
if condition and not condition(operation_value):
continue

yield op_data

def stream_operations(self):
for op_data in self.transaction_listener.listen():
yield op_data

def stream_blocks(self):
for block in self.transaction_listener.listen_blocks():
yield block
108 changes: 70 additions & 38 deletions tests.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,15 @@
import unittest
import datetime
import json
import unittest

import requests_mock

import lightsteem.exceptions
from lightsteem.client import Client
from lightsteem.helpers.account import Account

import lightsteem.exceptions

mock_history_max_index = [[3, {
'trx_id': '0000000000000000000000000000000000000000', 'block': 25641925,
'trx_in_block': 4294967295, 'op_in_trx': 0, 'virtual_op': 7,
'timestamp': '2018-09-03T17:24:48', 'op': ['fill_vesting_withdraw',
{'from_account': 'hellosteem',
'to_account': 'hellosteem',
'withdrawn': '187.37083 VESTS',
'deposited': '0.092 STEEM'}]}]]


mock_history = [[1,
{'trx_id': '985bb048e2068cdb311829ad3d76f4dc2947811a',
'block': 25153549, 'trx_in_block': 1, 'op_in_trx': 0,
'virtual_op': 0, 'timestamp': '2018-08-17T18:12:57',
'op': ['transfer',
{'from': 'hellosteem', 'to': 'sekhmet',
'amount': '0.001 STEEM', 'memo': ''}]}],
[2,
{'trx_id': 'bb1b6ddf13bcffe5bba8d55c3c37a5c672ff7309',
'block': 25153549, 'trx_in_block': 0, 'op_in_trx': 0,
'virtual_op': 0, 'timestamp': '2018-08-17T18:12:57',
'op': ['limit_order_create',
{'owner': 'hellosteem', 'orderid': 1534529209,
'amount_to_sell': '0.100 STEEM',
'min_to_receive': '100.000 SBD',
'fill_or_kill': False,
'expiration': '1969-12-31T23:59:59'}]}],
[3,
{'trx_id': '851c9a4ec9a32855b9981ea6b97c7911abaf8996',
'block': 25153418, 'trx_in_block': 0, 'op_in_trx': 0,
'virtual_op': 0, 'timestamp': '2018-08-17T18:06:24',
'op': ['transfer',
{'from': 'hellosteem', 'to': 'fabien',
'amount': '0.001 STEEM', 'memo': 'Test'}]}]]
from lightsteem.helpers.event_listener import EventListener
from tests_mockdata import mock_block_25926363, mock_dygp_result, \
mock_block_25926364, mock_history, mock_history_max_index


class TestClient(unittest.TestCase):
Expand Down Expand Up @@ -259,5 +226,70 @@ def match_non_max_index_request(request):
self.assertEqual(3, history[0][0])


class TestEventListener(unittest.TestCase):

def setUp(self):
self.client = Client(nodes=TestClient.NODES)

def test_filtering(self):

def match_dygp(request):
params = json.loads(request.text)
return 'get_dynamic_global_properties' in params["method"]

def match_block_25926363(request):
return '25926363' in request.text

def match_block_25926364(request):
return '25926364' in request.text

with requests_mock.mock() as m:
m.post(TestClient.NODES[0], json=mock_dygp_result,
additional_matcher=match_dygp)
m.post(TestClient.NODES[0], json=mock_block_25926363,
additional_matcher=match_block_25926363)
m.post(TestClient.NODES[0], json=mock_block_25926364,
additional_matcher=match_block_25926364)
events = EventListener(
self.client,
start_block=25926363,
end_block=25926364)

# test filter
ops = list(
events.on('producer_reward', {"producer": "emrebeyler"}))
self.assertEqual(1, len(ops))
self.assertEqual("emrebeyler", ops[0]["op"][1]["producer"])

# test condition
ops = list(events.on(
'comment', condition=lambda x: x["author"] == "jennybeans"))

self.assertEqual(1, len(ops))
self.assertEqual("jennybeans", ops[0]["op"][1]["author"])

# test multiple filtering
ops = list(events.on(
['withdraw_vesting_route', 'producer_reward']))
self.assertEqual(2, len(ops))

# test filter and condition together
ops = list(events.on(
['transfer_to_vesting'],
{'from': 'manimani'},
condition=lambda x: x["to"] not in ["kstop1", "nodaji"])
)

self.assertEqual(0, len(ops))

ops = list(events.on(
['transfer_to_vesting'],
{'from': 'manimani'},
condition=lambda x: x["to"] == "kstop1")
)

self.assertEqual(1, len(ops))


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit 0ea085d

Please sign in to comment.