Skip to content

Commit d9c463e

Browse files
author
Jingxiao Gu
committed
[FAB-7091] Extract StreamObserver out of ChaincodeBase
1. extract anonymous inner class StreamObserver into org.hyperledger.fabric.shim.impl.ChatStream 2. remove ChaincodeBase's dependency on Handler 3. change Handler's dependency on ChaincodeBase into Chaincode 4. move Handler.serialSend() into ChatStream 5. move ChaincodeBase.toJsonString into ChatStream 6. add ChatStream.receive() for ChaincodeBase.chatWithPeer() Change-Id: I6089b6ce241bc9fcc35b604d6b6685bbf3820b3e Signed-off-by: Jingxiao Gu <gjxgu@cn.ibm.com>
1 parent e3644f4 commit d9c463e

File tree

3 files changed

+129
-92
lines changed

3 files changed

+129
-92
lines changed

shim/src/main/java/org/hyperledger/fabric/shim/ChaincodeBase.java

Lines changed: 6 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,10 @@
66

77
package org.hyperledger.fabric.shim;
88

9-
import com.google.protobuf.InvalidProtocolBufferException;
10-
import com.google.protobuf.util.JsonFormat;
119
import io.grpc.ManagedChannel;
1210
import io.grpc.netty.GrpcSslContexts;
1311
import io.grpc.netty.NegotiationType;
1412
import io.grpc.netty.NettyChannelBuilder;
15-
import io.grpc.stub.StreamObserver;
1613
import io.netty.handler.ssl.SslContext;
1714
import org.apache.commons.cli.CommandLine;
1815
import org.apache.commons.cli.DefaultParser;
@@ -22,10 +19,7 @@
2219
import org.hyperledger.fabric.protos.peer.Chaincode.ChaincodeID;
2320
import org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage;
2421
import org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type;
25-
import org.hyperledger.fabric.protos.peer.ChaincodeSupportGrpc;
26-
import org.hyperledger.fabric.protos.peer.ChaincodeSupportGrpc.ChaincodeSupportStub;
27-
import org.hyperledger.fabric.shim.impl.Handler;
28-
import org.hyperledger.fabric.shim.impl.NextStateInfo;
22+
import org.hyperledger.fabric.shim.impl.ChatStream;
2923

3024
import javax.net.ssl.SSLException;
3125
import java.io.File;
@@ -55,7 +49,6 @@ public abstract class ChaincodeBase implements Chaincode {
5549
private boolean tlsEnabled = false;
5650
private String rootCertFile = "/etc/hyperledger/fabric/peer.crt";
5751

58-
private Handler handler;
5952
private String id;
6053

6154
private final static String CORE_CHAINCODE_ID_NAME = "CORE_CHAINCODE_ID_NAME";
@@ -67,8 +60,7 @@ public abstract class ChaincodeBase implements Chaincode {
6760
/**
6861
* Start chaincode
6962
*
70-
* @param args
71-
* command line arguments
63+
* @param args command line arguments
7264
*/
7365
public void start(String[] args) {
7466
processEnvironmentOptions();
@@ -158,47 +150,7 @@ public ManagedChannel newPeerClientConnection() {
158150
}
159151

160152
public void chatWithPeer(ManagedChannel connection) {
161-
// Establish stream with validating peer
162-
ChaincodeSupportStub stub = ChaincodeSupportGrpc.newStub(connection);
163-
164-
logger.info("Connecting to peer.");
165-
166-
StreamObserver<ChaincodeMessage> requestObserver = null;
167-
try {
168-
requestObserver = stub.register(new StreamObserver<ChaincodeMessage>() {
169-
170-
@Override
171-
public void onNext(ChaincodeMessage message) {
172-
logger.debug("Got message from peer: " + toJsonString(message));
173-
try {
174-
logger.debug(String.format("[%-8s]Received message %s from org.hyperledger.fabric.shim", message.getTxid(), message.getType()));
175-
handler.handleMessage(message);
176-
} catch (Exception e) {
177-
e.printStackTrace();
178-
System.exit(-1);
179-
}
180-
}
181-
182-
@Override
183-
public void onError(Throwable e) {
184-
logger.error("Unable to connect to peer server: " + e.getMessage());
185-
System.exit(-1);
186-
}
187-
188-
@Override
189-
public void onCompleted() {
190-
connection.shutdown();
191-
handler.nextState.close();
192-
}
193-
});
194-
} catch (Exception e) {
195-
logger.error("Unable to connect to peer server");
196-
System.exit(-1);
197-
}
198-
199-
// Create the org.hyperledger.fabric.shim handler responsible for all
200-
// control logic
201-
handler = new Handler(requestObserver, this);
153+
ChatStream chatStream = new ChatStream(connection, this);
202154

203155
// Send the ChaincodeID during register.
204156
ChaincodeID chaincodeID = ChaincodeID.newBuilder()
@@ -212,23 +164,13 @@ public void onCompleted() {
212164

213165
// Register on the stream
214166
logger.info(String.format("Registering as '%s' ... sending %s", id, Type.REGISTER));
215-
handler.serialSend(payload);
167+
chatStream.serialSend(payload);
216168

217169
while (true) {
218170
try {
219-
NextStateInfo nsInfo = handler.nextState.take();
220-
ChaincodeMessage message = nsInfo.message;
221-
handler.handleMessage(message);
222-
// keepalive messages are PONGs to the fabric's PINGs
223-
if (nsInfo.sendToCC || message.getType() == Type.KEEPALIVE) {
224-
if (message.getType() == Type.KEEPALIVE) {
225-
logger.info("Sending KEEPALIVE response");
226-
} else {
227-
logger.info(String.format("[%-8s]Send state message %s", message.getTxid(), message.getType()));
228-
}
229-
handler.serialSend(message);
230-
}
171+
chatStream.receive();
231172
} catch (Exception e) {
173+
logger.error("Receiving message error", e);
232174
break;
233175
}
234176
}
@@ -276,12 +218,4 @@ private static byte[] printStackTrace(Throwable throwable) {
276218
throwable.printStackTrace(new PrintWriter(buffer));
277219
return buffer.toString().getBytes(StandardCharsets.UTF_8);
278220
}
279-
280-
static String toJsonString(ChaincodeMessage message) {
281-
try {
282-
return JsonFormat.printer().print(message);
283-
} catch (InvalidProtocolBufferException e) {
284-
return String.format("{ Type: %s, TxId: %s }", message.getType(), message.getTxid());
285-
}
286-
}
287221
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright IBM Corp., DTCC All Rights Reserved.
3+
*
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package org.hyperledger.fabric.shim.impl;
8+
9+
import com.google.protobuf.InvalidProtocolBufferException;
10+
import com.google.protobuf.util.JsonFormat;
11+
import io.grpc.ManagedChannel;
12+
import io.grpc.stub.StreamObserver;
13+
import org.apache.commons.logging.Log;
14+
import org.apache.commons.logging.LogFactory;
15+
import org.hyperledger.fabric.protos.peer.ChaincodeShim;
16+
import org.hyperledger.fabric.protos.peer.ChaincodeSupportGrpc;
17+
import org.hyperledger.fabric.shim.Chaincode;
18+
19+
import static java.lang.String.format;
20+
21+
public class ChatStream implements StreamObserver<ChaincodeShim.ChaincodeMessage> {
22+
23+
private static final Log logger = LogFactory.getLog(ChatStream.class);
24+
25+
private final ManagedChannel connection;
26+
private final Handler handler;
27+
private StreamObserver<ChaincodeShim.ChaincodeMessage> streamObserver;
28+
29+
public ChatStream(ManagedChannel connection, Chaincode chaincode) {
30+
// Establish stream with validating peer
31+
ChaincodeSupportGrpc.ChaincodeSupportStub stub = ChaincodeSupportGrpc.newStub(connection);
32+
33+
logger.info("Connecting to peer.");
34+
35+
try {
36+
this.streamObserver = stub.register(this);
37+
} catch (Exception e) {
38+
logger.error("Unable to connect to peer server", e);
39+
System.exit(-1);
40+
}
41+
this.connection = connection;
42+
43+
// Create the org.hyperledger.fabric.shim handler responsible for all
44+
// control logic
45+
this.handler = new Handler(this, chaincode);
46+
}
47+
48+
public synchronized void serialSend(ChaincodeShim.ChaincodeMessage message) {
49+
if(logger.isDebugEnabled()) {
50+
logger.debug(format("[%-8s]Sending %s message to peer.", message.getTxid(), message.getType()));
51+
}
52+
if (logger.isTraceEnabled()) {
53+
logger.trace(format("[%-8s]ChaincodeMessage: %s", message.getTxid(), toJsonString(message)));
54+
}
55+
try {
56+
this.streamObserver.onNext(message);
57+
if (logger.isTraceEnabled()) {
58+
logger.trace(format("[%-8s]%s message sent.", message.getTxid(), message.getType()));
59+
}
60+
} catch (Exception e) {
61+
logger.error(String.format("[%-8s]Error sending %s: %s", message.getTxid(), message.getType(), e));
62+
throw new RuntimeException(format("Error sending %s: %s", message.getType(), e));
63+
}
64+
}
65+
66+
@Override
67+
public void onNext(ChaincodeShim.ChaincodeMessage message) {
68+
if(logger.isDebugEnabled()) {
69+
logger.debug("Got message from peer: " + toJsonString(message));
70+
}
71+
try {
72+
if(logger.isDebugEnabled()) {
73+
logger.debug(String.format("[%-8s]Received message %s from org.hyperledger.fabric.shim", message.getTxid(), message.getType()));
74+
}
75+
handler.handleMessage(message);
76+
} catch (Exception e) {
77+
logger.error(String.format("[%-8s]Error handling message %s: %s", message.getTxid(), message.getType(), e));
78+
System.exit(-1);
79+
}
80+
}
81+
82+
@Override
83+
public void onError(Throwable e) {
84+
logger.error("Unable to connect to peer server: " + e.getMessage());
85+
System.exit(-1);
86+
}
87+
88+
@Override
89+
public void onCompleted() {
90+
connection.shutdown();
91+
handler.nextState.close();
92+
}
93+
94+
static String toJsonString(ChaincodeShim.ChaincodeMessage message) {
95+
try {
96+
return JsonFormat.printer().print(message);
97+
} catch (InvalidProtocolBufferException e) {
98+
return String.format("{ Type: %s, TxId: %s }", message.getType(), message.getTxid());
99+
}
100+
}
101+
102+
public void receive() throws Exception {
103+
NextStateInfo nsInfo = handler.nextState.take();
104+
ChaincodeShim.ChaincodeMessage message = nsInfo.message;
105+
onNext(message);
106+
107+
// keepalive messages are PONGs to the fabric's PINGs
108+
if (nsInfo.sendToCC || message.getType() == ChaincodeShim.ChaincodeMessage.Type.KEEPALIVE) {
109+
if (message.getType() == ChaincodeShim.ChaincodeMessage.Type.KEEPALIVE) {
110+
logger.info("Sending KEEPALIVE response");
111+
} else {
112+
logger.info(String.format("[%-8s]Send state message %s", message.getTxid(), message.getType()));
113+
}
114+
serialSend(message);
115+
}
116+
}
117+
}

shim/src/main/java/org/hyperledger/fabric/shim/impl/Handler.java

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import com.google.protobuf.ByteString;
1010
import com.google.protobuf.InvalidProtocolBufferException;
1111
import com.google.protobuf.util.JsonFormat;
12-
import io.grpc.stub.StreamObserver;
1312
import org.apache.commons.logging.Log;
1413
import org.apache.commons.logging.LogFactory;
1514
import org.hyperledger.fabric.protos.peer.Chaincode.ChaincodeID;
@@ -21,7 +20,6 @@
2120
import org.hyperledger.fabric.protos.peer.ProposalResponsePackage.Response;
2221
import org.hyperledger.fabric.protos.peer.ProposalResponsePackage.Response.Builder;
2322
import org.hyperledger.fabric.shim.Chaincode;
24-
import org.hyperledger.fabric.shim.ChaincodeBase;
2523
import org.hyperledger.fabric.shim.ChaincodeStub;
2624
import org.hyperledger.fabric.shim.fsm.CBDesc;
2725
import org.hyperledger.fabric.shim.fsm.Event;
@@ -47,16 +45,16 @@ public class Handler {
4745

4846
private static Log logger = LogFactory.getLog(Handler.class);
4947

50-
private StreamObserver<ChaincodeMessage> chatStream;
51-
private ChaincodeBase chaincode;
48+
private ChatStream chatStream;
49+
private Chaincode chaincode;
5250

5351
private Map<String, Boolean> isTransaction;
5452
private Map<String, Channel<ChaincodeMessage>> responseChannel;
55-
public Channel<NextStateInfo> nextState;
53+
Channel<NextStateInfo> nextState;
5654

5755
private FSM fsm;
5856

59-
public Handler(StreamObserver<ChaincodeMessage> chatStream, ChaincodeBase chaincode) {
57+
public Handler(ChatStream chatStream, Chaincode chaincode) {
6058
this.chatStream = chatStream;
6159
this.chaincode = chaincode;
6260

@@ -99,18 +97,6 @@ private void triggerNextState(ChaincodeMessage message, boolean send) {
9997
nextState.add(new NextStateInfo(message, send));
10098
}
10199

102-
public synchronized void serialSend(ChaincodeMessage message) {
103-
logger.debug(format("[%-8s]Sending %s message to peer.", message.getTxid(), message.getType()));
104-
if (logger.isTraceEnabled()) logger.trace(format("[%-8s]ChaincodeMessage: %s", toJsonString(message)));
105-
try {
106-
chatStream.onNext(message);
107-
if (logger.isTraceEnabled()) logger.trace(format("[%-8s]%s message sent.", message.getTxid(), message.getType()));
108-
} catch (Exception e) {
109-
logger.error(String.format("[%-8s]Error sending %s: %s", message.getTxid(), message.getType(), e));
110-
throw new RuntimeException(format("Error sending %s: %s", message.getType(), e));
111-
}
112-
}
113-
114100
private synchronized Channel<ChaincodeMessage> aquireResponseChannelForTx(final String channelId, final String txId) {
115101
final Channel<ChaincodeMessage> channel = new Channel<>();
116102
String key = getTxKey(channelId, txId);
@@ -398,7 +384,7 @@ private ByteString invokeChaincodeSupport(final ChaincodeMessage message) {
398384
Channel<ChaincodeMessage> responseChannel = aquireResponseChannelForTx(channelId, txId);
399385

400386
// send the message
401-
serialSend(message);
387+
chatStream.serialSend(message);
402388

403389
// wait for response
404390
final ChaincodeMessage response = receiveChannel(responseChannel);
@@ -468,7 +454,7 @@ public synchronized void handleMessage(ChaincodeMessage message) throws Exceptio
468454

469455
if (fsm.eventCannotOccur(message.getType().toString())) {
470456
String errStr = String.format("[%s]Chaincode handler org.hyperledger.fabric.shim.fsm cannot handle message (%s) with payload size (%d) while in state: %s", message.getTxid(), message.getType(), message.getPayload().size(), fsm.current());
471-
serialSend(newErrorEventMessage(message.getChannelId(), message.getTxid(), errStr));
457+
chatStream.serialSend(newErrorEventMessage(message.getChannelId(), message.getTxid(), errStr));
472458
throw new RuntimeException(errStr);
473459
}
474460

0 commit comments

Comments
 (0)