Skip to content
This repository was archived by the owner on Apr 22, 2025. It is now read-only.

Commit 9282be9

Browse files
committed
FAB-6200 Java serialize channels.
PS2 Check for shutdown channel. PS3 Minor cleanup no need for thread local. PS4 Remove unnecessary added code. PS5 Remove added commented code. PS6 Minor testcode change. PS7 Refactor cleaner to serialize on client itself. PS8 Refactor samplestore to use above. PS9 Typo fix. PS10 "" PS11 make sure stream closed. PS12 Make sure client is updated with channel. PS13 Move checks on both channels. PS14 sync on channels PS15 Do query peer channels on both again. Change-Id: I0e379d5dab626a4fd71cb66298612f5611215b78 Signed-off-by: rickr <cr22rc@gmail.com>
1 parent 24390b4 commit 9282be9

File tree

11 files changed

+326
-83
lines changed

11 files changed

+326
-83
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ The SDK acts on behave of a particular User which is defined by the embedding ap
99

1010
Note, the SDK does ***not*** provide a means of persistence
1111
for the application defined channels and user artifacts on the client. This is left for the embedding application to best manage.
12+
Channels may be serialized via Java serialization in the context of a client.
13+
Channels deserialized are not in an initialized state.
1214

1315
The SDK also provides a client for Hyperledger's certificate authority. The SDK is however not dependent on this
1416
particular implementation of a certificate authority. Other Certificate authority's maybe used by implementing the

src/main/java/org/hyperledger/fabric/sdk/Channel.java

Lines changed: 116 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@
1414

1515
package org.hyperledger.fabric.sdk;
1616

17+
import java.io.ByteArrayOutputStream;
18+
import java.io.File;
19+
import java.io.IOException;
20+
import java.io.ObjectInputStream;
21+
import java.io.ObjectOutputStream;
22+
import java.io.Serializable;
23+
import java.nio.file.Files;
24+
import java.nio.file.Paths;
25+
import java.nio.file.StandardOpenOption;
1726
import java.util.ArrayList;
1827
import java.util.Arrays;
1928
import java.util.Collection;
@@ -30,7 +39,6 @@
3039
import java.util.concurrent.BlockingQueue;
3140
import java.util.concurrent.CompletableFuture;
3241
import java.util.concurrent.ExecutionException;
33-
import java.util.concurrent.ExecutorService;
3442
import java.util.concurrent.Future;
3543
import java.util.concurrent.LinkedBlockingQueue;
3644
import java.util.concurrent.TimeUnit;
@@ -112,7 +120,8 @@
112120
* The class representing a channel with which the client SDK interacts.
113121
* <p>
114122
*/
115-
public class Channel {
123+
public class Channel implements Serializable {
124+
private static final long serialVersionUID = -3266164166893832538L;
116125
private static final Log logger = LogFactory.getLog(Channel.class);
117126
private static final boolean IS_DEBUG_LEVEL = logger.isDebugEnabled();
118127
private static final boolean IS_TRACE_LEVEL = logger.isTraceEnabled();
@@ -129,7 +138,7 @@ public class Channel {
129138
private final String name;
130139

131140
// The peers on this channel to which the client can connect
132-
private final Collection<Peer> peers = new Vector<>();
141+
final Collection<Peer> peers = new Vector<>();
133142

134143
// Temporary variables to control how long to wait for deploy and invoke to complete before
135144
// emitting events. This will be removed when the SDK is able to receive events from the
@@ -141,10 +150,26 @@ public class Channel {
141150

142151
// The crypto primitives object
143152
// private CryptoSuite cryptoSuite;
144-
private final Collection<Orderer> orderers = new LinkedList<>();
145-
HFClient client;
146-
private boolean initialized = false;
147-
private boolean shutdown = false;
153+
final Collection<Orderer> orderers = new LinkedList<>();
154+
transient HFClient client;
155+
private transient boolean initialized = false;
156+
private transient boolean shutdown = false;
157+
158+
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
159+
160+
in.defaultReadObject();
161+
initialized = false;
162+
shutdown = false;
163+
msps = new HashMap<>();
164+
txListeners = new LinkedHashMap<>();
165+
channelEventQue = new ChannelEventQue();
166+
blockListeners = new LinkedHashMap<>();
167+
168+
for (EventHub eventHub : getEventHubs()) {
169+
eventHub.setEventQue(channelEventQue);
170+
}
171+
172+
}
148173

149174
/**
150175
* Get all Event Hubs on this channel.
@@ -155,9 +180,8 @@ public Collection<EventHub> getEventHubs() {
155180
return Collections.unmodifiableCollection(eventHubs);
156181
}
157182

158-
private final Collection<EventHub> eventHubs = new LinkedList<>();
159-
private ExecutorService executorService;
160-
private Block genesisBlock;
183+
final Collection<EventHub> eventHubs = new LinkedList<>();
184+
private transient Block genesisBlock;
161185
private final boolean systemChannel;
162186

163187
private Channel(String name, HFClient hfClient, Orderer orderer, ChannelConfiguration channelConfiguration, byte[][] signers) throws InvalidArgumentException, TransactionException {
@@ -461,8 +485,6 @@ private Channel(String name, HFClient client, final boolean systemChannel) throw
461485
}
462486
this.name = name;
463487
this.client = client;
464-
this.executorService = client.getExecutorService();
465-
466488
logger.debug(format("Creating channel: %s, client context %s", isSystemChannel() ? "SYSTEM_CHANNEL" : name, client.getUserContext().getName()));
467489

468490
}
@@ -731,10 +753,10 @@ public Channel initialize() throws InvalidArgumentException, TransactionExceptio
731753
* @throws InvalidArgumentException
732754
* @throws CryptoException
733755
*/
734-
private void loadCACertificates() throws InvalidArgumentException, CryptoException {
756+
protected void loadCACertificates() throws InvalidArgumentException, CryptoException {
735757
logger.debug(format("Channel %s loadCACertificates", name));
736758

737-
if (msps == null) {
759+
if (msps == null || msps.isEmpty()) {
738760
throw new InvalidArgumentException("Unable to load CA certificates. Channel " + name + " does not have any MSPs.");
739761
}
740762

@@ -822,7 +844,7 @@ private Block getGenesisBlock(Orderer orderer) throws TransactionException {
822844
return genesisBlock;
823845
}
824846

825-
private Map<String, MSP> msps = new HashMap<>();
847+
private transient Map<String, MSP> msps = new HashMap<>();
826848

827849
boolean isSystemChannel() {
828850
return systemChannel;
@@ -2585,7 +2607,7 @@ public boolean unRegisterBlockListener(String handle) throws InvalidArgumentExce
25852607
* A queue each eventing hub will write events to.
25862608
*/
25872609

2588-
private final ChannelEventQue channelEventQue = new ChannelEventQue();
2610+
private transient ChannelEventQue channelEventQue = new ChannelEventQue();
25892611

25902612
class ChannelEventQue {
25912613

@@ -2659,15 +2681,15 @@ BlockEvent getNextEvent() throws EventHubException {
26592681
* Runs processing events from event hubs.
26602682
*/
26612683

2662-
Thread eventQueueThread = null;
2684+
transient Thread eventQueueThread = null;
26632685

26642686
private void startEventQue() {
26652687

26662688
if (eventQueueThread != null) {
26672689
return;
26682690
}
26692691

2670-
executorService.execute(() -> {
2692+
client.getExecutorService().execute(() -> {
26712693
eventQueueThread = Thread.currentThread();
26722694

26732695
while (!shutdown) {
@@ -2700,7 +2722,7 @@ private void startEventQue() {
27002722

27012723
for (BL l : blcopy) {
27022724
try {
2703-
executorService.execute(() -> l.listener.received(blockEvent));
2725+
client.getExecutorService().execute(() -> l.listener.received(blockEvent));
27042726
} catch (Throwable e) { //Don't let one register stop rest.
27052727
logger.error("Error trying to call block listener on channel " + blockEvent.getChannelId(), e);
27062728
}
@@ -2717,7 +2739,7 @@ private void startEventQue() {
27172739

27182740
private static final String BLOCK_LISTENER_TAG = "BLOCK_LISTENER_HANDLE";
27192741

2720-
private final LinkedHashMap<String, BL> blockListeners = new LinkedHashMap<>();
2742+
private transient LinkedHashMap<String, BL> blockListeners = new LinkedHashMap<>();
27212743

27222744
class BL {
27232745

@@ -2791,7 +2813,7 @@ private String registerTransactionListenerProcessor() throws InvalidArgumentExce
27912813
});
27922814
}
27932815

2794-
private final LinkedHashMap<String, LinkedList<TL>> txListeners = new LinkedHashMap<>();
2816+
private transient LinkedHashMap<String, LinkedList<TL>> txListeners = new LinkedHashMap<>();
27952817

27962818
private class TL {
27972819
final String txID;
@@ -2841,9 +2863,9 @@ void fire(BlockEvent.TransactionEvent transactionEvent) {
28412863
}
28422864

28432865
if (transactionEvent.isValid()) {
2844-
executorService.execute(() -> future.complete(transactionEvent));
2866+
client.getExecutorService().execute(() -> future.complete(transactionEvent));
28452867
} else {
2846-
executorService.execute(() -> future.completeExceptionally(
2868+
client.getExecutorService().execute(() -> future.completeExceptionally(
28472869
new TransactionEventException(format("Received invalid transaction event. Transaction ID %s status %s",
28482870
transactionEvent.getTransactionID(),
28492871
transactionEvent.getValidationCode()),
@@ -2905,7 +2927,7 @@ boolean isMatch(ChaincodeEvent chaincodeEvent) {
29052927

29062928
void fire(BlockEvent blockEvent, ChaincodeEvent ce) {
29072929

2908-
executorService.execute(() -> chaincodeEventListener.received(handle, blockEvent, ce));
2930+
client.getExecutorService().execute(() -> chaincodeEventListener.received(handle, blockEvent, ce));
29092931

29102932
}
29112933
}
@@ -2949,7 +2971,7 @@ public String registerChaincodeEventListener(Pattern chaincodeId, Pattern eventN
29492971

29502972
}
29512973

2952-
private String blh = null;
2974+
private transient String blh = null;
29532975

29542976
/**
29552977
* Unregister an existing chaincode event listener.
@@ -3063,16 +3085,22 @@ public synchronized void shutdown(boolean force) {
30633085

30643086
initialized = false;
30653087
shutdown = true;
3088+
if (chainCodeListeners != null) {
3089+
chainCodeListeners.clear();
30663090

3067-
executorService = null;
3091+
}
30683092

3069-
chainCodeListeners.clear();
3093+
if (blockListeners != null) {
3094+
blockListeners.clear();
3095+
}
30703096

3071-
blockListeners.clear();
3097+
if (client != null) {
3098+
client.removeChannel(this);
3099+
}
30723100

3073-
client.removeChannel(this);
3101+
client = null;
30743102

3075-
for (EventHub eh : getEventHubs()) {
3103+
for (EventHub eh : eventHubs) {
30763104

30773105
try {
30783106
eh.shutdown();
@@ -3098,12 +3126,66 @@ public synchronized void shutdown(boolean force) {
30983126

30993127
orderers.clear();
31003128

3101-
if (eventQueueThread != null) {
3102-
eventQueueThread.interrupt();
3129+
if (null != eventQueueThread) {
3130+
3131+
if (eventQueueThread != null) {
3132+
eventQueueThread.interrupt();
3133+
}
3134+
eventQueueThread = null;
3135+
}
3136+
}
3137+
3138+
/**
3139+
* Serialize channel to a file using Java serialization.
3140+
* Deserialized channel will NOT be in an initialized state.
3141+
*
3142+
* @param file file
3143+
* @throws IOException
3144+
* @throws InvalidArgumentException
3145+
*/
3146+
3147+
public void serializeChannel(File file) throws IOException, InvalidArgumentException {
3148+
3149+
if (null == file) {
3150+
throw new InvalidArgumentException("File parameter may not be null");
3151+
}
3152+
3153+
Files.write(Paths.get(file.getAbsolutePath()), serializeChannel(),
3154+
StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
3155+
3156+
}
3157+
3158+
/**
3159+
* Serialize channel to a byte array using Java serialization.
3160+
* Deserialized channel will NOT be in an initialized state.
3161+
*
3162+
* @throws InvalidArgumentException
3163+
* @throws IOException
3164+
*/
3165+
public byte[] serializeChannel() throws IOException, InvalidArgumentException {
3166+
3167+
if (isShutdown()) {
3168+
throw new InvalidArgumentException(format("Channel %s has been shutdown.", getName()));
3169+
}
3170+
3171+
ObjectOutputStream out = null;
3172+
3173+
try {
3174+
ByteArrayOutputStream bai = new ByteArrayOutputStream();
3175+
out = new ObjectOutputStream(bai);
3176+
out.writeObject(this);
3177+
out.flush();
3178+
return bai.toByteArray();
3179+
} finally {
3180+
if (null != out) {
3181+
try {
3182+
out.close();
3183+
} catch (IOException e) {
3184+
logger.error(e); // best effort.
3185+
}
3186+
}
31033187
}
3104-
eventQueueThread = null;
31053188

3106-
client = null;
31073189
}
31083190

31093191
@Override

src/main/java/org/hyperledger/fabric/sdk/EventHub.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package org.hyperledger.fabric.sdk;
1616

17+
import java.io.Serializable;
1718
import java.util.ArrayList;
1819
import java.util.Properties;
1920
import java.util.concurrent.CountDownLatch;
@@ -46,27 +47,29 @@
4647
* Feeds Channel event queues with events
4748
*/
4849

49-
public class EventHub {
50+
public class EventHub implements Serializable {
51+
private static final long serialVersionUID = 2882609588201108148L;
5052
private static final Log logger = LogFactory.getLog(EventHub.class);
5153
private static final Config config = Config.getConfig();
5254
private static final long EVENTHUB_CONNECTION_WAIT_TIME = config.getEventHubConnectionWaitTime();
53-
private final ExecutorService executorService;
55+
56+
private final transient ExecutorService executorService;
5457

5558
private final String url;
5659
private final String name;
5760
private final Properties properties;
58-
private ManagedChannel managedChannel;
59-
private boolean connected = false;
60-
private EventsGrpc.EventsStub events;
61-
private StreamObserver<PeerEvents.SignedEvent> sender;
61+
private transient ManagedChannel managedChannel;
62+
private transient boolean connected = false;
63+
private transient EventsGrpc.EventsStub events;
64+
private transient StreamObserver<PeerEvents.SignedEvent> sender;
6265
/**
6366
* Event queue for all events from eventhubs in the channel
6467
*/
65-
private Channel.ChannelEventQue eventQue;
66-
private long connectedTime = 0L; // 0 := never connected
67-
private boolean shutdown = false;
68+
private transient Channel.ChannelEventQue eventQue;
69+
private transient long connectedTime = 0L; // 0 := never connected
70+
private transient boolean shutdown = false;
6871
private Channel channel;
69-
private TransactionContext transactionContext;
72+
private transient TransactionContext transactionContext;
7073

7174
/**
7275
* Get disconnected time.
@@ -170,7 +173,7 @@ boolean connect() throws EventHubException {
170173

171174
}
172175

173-
private StreamObserver<PeerEvents.Event> eventStream = null; // Saved here to avoid potential garbage collection
176+
private transient StreamObserver<PeerEvents.Event> eventStream = null; // Saved here to avoid potential garbage collection
174177

175178
synchronized boolean connect(final TransactionContext transactionContext) throws EventHubException {
176179
if (connected) {
@@ -290,8 +293,6 @@ public void onCompleted() {
290293
logger.error(e);
291294
}
292295

293-
294-
295296
if (!threw.isEmpty()) {
296297
eventStream = null;
297298
connected = false;
@@ -393,7 +394,7 @@ public interface EventHubDisconnected {
393394
* Default reconnect event hub implementation. Applications are free to replace
394395
*/
395396

396-
protected EventHubDisconnected disconnectedHandler = new EventHub.EventHubDisconnected() {
397+
protected transient EventHubDisconnected disconnectedHandler = new EventHub.EventHubDisconnected() {
397398
@Override
398399
public synchronized void disconnected(final EventHub eventHub) throws EventHubException {
399400
logger.info(format("Detected disconnect %s", eventHub.toString()));

0 commit comments

Comments
 (0)