Skip to content

Commit

Permalink
Add new cc lifecycle methods
Browse files Browse the repository at this point in the history
Include cc package, install, approveformyorg, commit, queryinstalled, queryapproved, querycommitted

Close #156

Signed-off-by: kalichyn <ext-nazarii.kalichynskyi@here.com>
  • Loading branch information
Nazarii Kalichynsky committed Aug 21, 2021
1 parent 4c6056c commit 8ee33a8
Show file tree
Hide file tree
Showing 161 changed files with 3,942 additions and 434 deletions.
121 changes: 121 additions & 0 deletions hfc/fabric/base_chaincode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import asyncio
import uuid


class BaseChaincode:
def __init__(self, cc_name):
self.evts = {}
self._name = cc_name

def create_onCcEvent(self, _uuid, tx_id):
class CCEvent(object):
def __init__(self, _uuid, evts, evt_tx_id):
self.uuid = _uuid
self.evts = evts # keep reference, no copy
self.evt_tx_id = evt_tx_id

def cc_event(self, cc_event, block_number, tx_id, tx_status):
if tx_id in self.evts:
if 'txEvents' not in self.evts[tx_id]:
self.evts[tx_id]['txEvents'] = []
self.evts[tx_id]['txEvents'] += [{
'cc_event': cc_event,
'tx_status': tx_status,
'block_number': block_number,
}]

# unregister chaincode event if same tx_id
# and disconnect as chaincode evt are unregister False
if tx_id == self.evt_tx_id:
for x in self.evts[tx_id]['peer']:
if x['uuid'] == self.uuid:
x['channel_event_hub']. \
unregisterChaincodeEvent(x['cr'])
x['channel_event_hub'].disconnect()

o = CCEvent(_uuid, self.evts, tx_id)
return o.cc_event

def txEvent(self, tx_id, tx_status, block_number):
if tx_id in self.evts:
if 'txEvents' not in self.evts[tx_id]:
self.evts[tx_id]['txEvents'] = []
self.evts[tx_id]['txEvents'] += [{
'tx_status': tx_status,
'block_number': block_number,
}]

async def wait_for_event(self, tx_context, target_peers, channel, requestor, cc_pattern, wait_for_event_timeout):
event_stream = []

for target_peer in target_peers:
channel_event_hub = channel.newChannelEventHub(target_peer,
requestor)
stream = channel_event_hub.connect()
event_stream.append(stream)
# use chaincode event
if cc_pattern is not None:

# needed in callback for ref in callback
_uuid = uuid.uuid4().hex

cr = channel_event_hub.registerChaincodeEvent(
self._name,
cc_pattern,
onEvent=self.create_onCcEvent(_uuid, tx_context.tx_id))

if tx_context.tx_id not in self.evts:
self.evts[tx_context.tx_id] = {'peer': []}

self.evts[tx_context.tx_id]['peer'] += [
{
'uuid': _uuid,
'channel_event_hub': channel_event_hub,
'cr': cr
}
]
# use transaction event
else:
txid = channel_event_hub.registerTxEvent(
tx_context.tx_id,
unregister=True,
disconnect=True,
onEvent=self.txEvent)

if txid not in self.evts:
self.evts[txid] = {'channel_event_hubs': []}

self.evts[txid]['channel_event_hubs'] += [channel_event_hub]

try:
await asyncio.wait_for(asyncio.gather(*event_stream,
return_exceptions=True),
timeout=wait_for_event_timeout)
except asyncio.TimeoutError:
for k, v in self.evts.items():
if cc_pattern is not None:
for x in v['peer']:
x['channel_event_hub']. \
unregisterChaincodeEvent(x['cr'])
else:
for x in v['channel_event_hubs']:
x.unregisterTxEvent(k)
raise TimeoutError('waitForEvent timed out.')
except Exception as e:
raise e
else:
# check if all tx are valids
txEvents = self.evts[tx_context.tx_id]['txEvents']
statuses = [x['tx_status'] for x in txEvents]
if not all([x == 'VALID' for x in statuses]):
raise Exception(statuses)
finally:
# disconnect channel_event_hubs
if cc_pattern is not None:
for x in self.evts[tx_context.tx_id]['peer']:
x['channel_event_hub'].disconnect()
else:
cehs = self.evts[tx_context.tx_id]['channel_event_hubs']
for x in cehs:
x.disconnect()
del self.evts[tx_context.tx_id]
127 changes: 8 additions & 119 deletions hfc/fabric/chaincode.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import asyncio
import itertools
import logging
import uuid
from time import sleep

from grpc._channel import _MultiThreadedRendezvous

from hfc.protos.peer.chaincode_pb2 import ChaincodeData, CDSData
from hfc.fabric.block_decoder import decode_proposal_response_payload, decode_signature_policy_envelope
from hfc.fabric.transaction.tx_context import create_tx_context
Expand All @@ -15,6 +15,7 @@
from hfc.util.consts import CC_INSTALL, CC_INSTANTIATE, CC_UPGRADE, CC_INVOKE, CC_QUERY, CC_TYPE_GOLANG, \
DEFAULT_WAIT_FOR_EVENT_TIMEOUT, GRPC_BROKER_UNAVAILABLE_RETRY_DELAY, \
SUCCESS_STATUS
from hfc.fabric.base_chaincode import BaseChaincode

_logger = logging.getLogger(__name__)

Expand All @@ -30,11 +31,10 @@ def __init__(self, fcn, operation_type, send_proposal):
self.send_proposal = send_proposal


class Chaincode:
class Chaincode(BaseChaincode):
def __init__(self, client, cc_name):
super().__init__(cc_name)
self._client = client
self._name = cc_name
self.evts = {}

self.operation_mapping = {
CC_INSTANTIATE: lambda fcn: ChaincodeOperation(fcn, CC_INSTANTIATE, self.send_instantiate_proposal),
Expand Down Expand Up @@ -90,6 +90,7 @@ async def install(self, requestor, peers, cc_path,
responses, proposal, header = utils.send_install_proposal(tx_context,
target_peers)
res = await asyncio.gather(*responses)
# install returns package ids
return res

async def instantiate(self, requestor, channel_name, peers,
Expand Down Expand Up @@ -330,7 +331,7 @@ async def invoke(self, requestor, channel_name, peers, args,
grpc_broker_unavailable_retry=0,
grpc_broker_unavailable_retry_delay=GRPC_BROKER_UNAVAILABLE_RETRY_DELAY, # ms
raise_broker_unavailable=True,
raise_on_error=False):
raise_on_error=False, is_init=False):
"""
Invoke chaincode for ledger update
Expand Down Expand Up @@ -366,7 +367,8 @@ async def invoke(self, requestor, channel_name, peers, args,
cc_type=cc_type,
fcn=fcn,
args=args,
transient_map=transient_map
transient_map=transient_map,
is_init=is_init
)

tx_context = create_tx_context(
Expand Down Expand Up @@ -466,116 +468,3 @@ async def invoke(self, requestor, channel_name, peers, args,

res = decode_proposal_response_payload(res[0].payload)
return res['extension']['response']['payload'].decode('utf-8')

def create_onCcEvent(self, _uuid, tx_id):
class CCEvent(object):
def __init__(self, _uuid, evts, evt_tx_id):
self.uuid = _uuid
self.evts = evts # keep reference, no copy
self.evt_tx_id = evt_tx_id

def cc_event(self, cc_event, block_number, tx_id, tx_status):
if tx_id in self.evts:
if 'txEvents' not in self.evts[tx_id]:
self.evts[tx_id]['txEvents'] = []
self.evts[tx_id]['txEvents'] += [{
'cc_event': cc_event,
'tx_status': tx_status,
'block_number': block_number,
}]

# unregister chaincode event if same tx_id
# and disconnect as chaincode evt are unregister False
if tx_id == self.evt_tx_id:
for x in self.evts[tx_id]['peer']:
if x['uuid'] == self.uuid:
x['channel_event_hub']. \
unregisterChaincodeEvent(x['cr'])
x['channel_event_hub'].disconnect()

o = CCEvent(_uuid, self.evts, tx_id)
return o.cc_event

def txEvent(self, tx_id, tx_status, block_number):
if tx_id in self.evts:
if 'txEvents' not in self.evts[tx_id]:
self.evts[tx_id]['txEvents'] = []
self.evts[tx_id]['txEvents'] += [{
'tx_status': tx_status,
'block_number': block_number,
}]

async def wait_for_event(self, tx_context, target_peers, channel, requestor, cc_pattern, wait_for_event_timeout):
event_stream = []

for target_peer in target_peers:
channel_event_hub = channel.newChannelEventHub(target_peer,
requestor)
stream = channel_event_hub.connect()
event_stream.append(stream)
# use chaincode event
if cc_pattern is not None:

# needed in callback for ref in callback
_uuid = uuid.uuid4().hex

cr = channel_event_hub.registerChaincodeEvent(
self._name,
cc_pattern,
onEvent=self.create_onCcEvent(_uuid, tx_context.tx_id))

if tx_context.tx_id not in self.evts:
self.evts[tx_context.tx_id] = {'peer': []}

self.evts[tx_context.tx_id]['peer'] += [
{
'uuid': _uuid,
'channel_event_hub': channel_event_hub,
'cr': cr
}
]
# use transaction event
else:
txid = channel_event_hub.registerTxEvent(
tx_context.tx_id,
unregister=True,
disconnect=True,
onEvent=self.txEvent)

if txid not in self.evts:
self.evts[txid] = {'channel_event_hubs': []}

self.evts[txid]['channel_event_hubs'] += [channel_event_hub]

try:
await asyncio.wait_for(asyncio.gather(*event_stream,
return_exceptions=True),
timeout=wait_for_event_timeout)
except asyncio.TimeoutError:
for k, v in self.evts.items():
if cc_pattern is not None:
for x in v['peer']:
x['channel_event_hub']. \
unregisterChaincodeEvent(x['cr'])
else:
for x in v['channel_event_hubs']:
x.unregisterTxEvent(k)
raise TimeoutError('waitForEvent timed out.')
except Exception as e:
raise e
else:
# check if all tx are valids
txEvents = self.evts[tx_context.tx_id]['txEvents']
statuses = [x['tx_status'] for x in txEvents]
if not all([x == 'VALID' for x in statuses]):
raise Exception(statuses)
finally:
# disconnect channel_event_hubs
if cc_pattern is not None:
for x in self.evts[tx_context.tx_id]['peer']:
x['channel_event_hub'].disconnect()
else:
cehs = self.evts[tx_context.tx_id]['channel_event_hubs']
for x in cehs:
x.disconnect()
del self.evts[tx_context.tx_id]

0 comments on commit 8ee33a8

Please sign in to comment.