Skip to content
This repository was archived by the owner on Apr 22, 2025. It is now read-only.

Commit e71d54f

Browse files
committed
FABJ-356 Peer eventing services too many reconnects
Change-Id: I45590320d6e8aa60e64c09b7c115283b749d0f81 Signed-off-by: rickr <cr22rc@gmail.com>
1 parent 6738eba commit e71d54f

File tree

3 files changed

+124
-24
lines changed

3 files changed

+124
-24
lines changed

src/main/java/org/hyperledger/fabric/sdk/Peer.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Objects;
2222
import java.util.Properties;
2323
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.atomic.AtomicLong;
2425

2526
import com.google.common.util.concurrent.ListenableFuture;
2627
import io.netty.util.internal.StringUtil;
@@ -31,6 +32,7 @@
3132
import org.hyperledger.fabric.protos.peer.FabricProposalResponse;
3233
import org.hyperledger.fabric.sdk.Channel.PeerOptions;
3334
import org.hyperledger.fabric.sdk.exception.InvalidArgumentException;
35+
import org.hyperledger.fabric.sdk.exception.PeerEventingServiceException;
3436
import org.hyperledger.fabric.sdk.exception.PeerException;
3537
import org.hyperledger.fabric.sdk.exception.TransactionException;
3638
import org.hyperledger.fabric.sdk.helper.Config;
@@ -60,7 +62,7 @@ public class Peer implements Serializable {
6062
private String channelName;
6163
private transient TransactionContext transactionContext;
6264
private transient long lastConnectTime;
63-
private transient long reconnectCount;
65+
private transient AtomicLong reconnectCount;
6466
private transient BlockEvent lastBlockEvent;
6567
private transient long lastBlockNumber;
6668
private transient byte[] clientTLSCertificateDigest;
@@ -75,6 +77,7 @@ boolean hasConnected() {
7577
}
7678

7779
Peer(String name, String grpcURL, Properties properties) throws InvalidArgumentException {
80+
reconnectCount = new AtomicLong(0L);
7881

7982
Exception e = checkGrpcUrl(grpcURL);
8083
if (e != null) {
@@ -89,7 +92,6 @@ boolean hasConnected() {
8992
this.url = grpcURL;
9093
this.name = name;
9194
this.properties = properties == null ? null : (Properties) properties.clone(); //keep our own copy.
92-
reconnectCount = 0L;
9395

9496
}
9597

@@ -361,7 +363,7 @@ public long getLastConnectTime() {
361363

362364
@Override
363365
public long getReconnectCount() {
364-
return reconnectCount;
366+
return reconnectCount.longValue();
365367
}
366368

367369
@Override
@@ -372,7 +374,7 @@ public Throwable getExceptionThrown() {
372374
@Override
373375
public void reconnect(Long startBLockNumber) throws TransactionException {
374376
logger.trace(format("Channel %s %s reconnecting. Starting block number: %s", channelName, toString(), startBLockNumber == null ? "newest" : startBLockNumber));
375-
++reconnectCount;
377+
reconnectCount.getAndIncrement();
376378

377379
if (startBLockNumber == null) {
378380
peerOptions.startEventsNewest();
@@ -398,11 +400,11 @@ void setLastConnectTime(long lastConnectTime) {
398400

399401
void resetReconnectCount() {
400402
connected = true;
401-
reconnectCount = 0L;
403+
reconnectCount = new AtomicLong(0L);
402404
}
403405

404406
long getReconnectCount() {
405-
return reconnectCount;
407+
return reconnectCount.longValue();
406408
}
407409

408410
synchronized void setTLSCertificateKeyPair(TLSCertificateKeyPair tlsCertificateKeyPair) {
@@ -482,6 +484,16 @@ private static PeerEventingServiceDisconnected getDefaultDisconnectHandler() {
482484
public synchronized void disconnected(final PeerEventingServiceDisconnectEvent event) {
483485

484486
BlockEvent lastBlockEvent = event.getLatestBLockReceived();
487+
Throwable thrown = event.getExceptionThrown();
488+
489+
long sleepTime = PEER_EVENT_RETRY_WAIT_TIME;
490+
491+
if (thrown instanceof PeerEventingServiceException) {
492+
// means we connected and got an error, or connected but timed out waiting on the response
493+
// not going away.. sleep longer.
494+
sleepTime = Math.min(5000L, PEER_EVENT_RETRY_WAIT_TIME + event.getReconnectCount() * 100L); //wait longer if we connected.
495+
//don't flood server.
496+
}
485497

486498
Long startBlockNumber = null;
487499

@@ -490,12 +502,10 @@ public synchronized void disconnected(final PeerEventingServiceDisconnectEvent e
490502
startBlockNumber = lastBlockEvent.getBlockNumber();
491503
}
492504

493-
if (0 != event.getReconnectCount()) {
494-
try {
495-
Thread.sleep(PEER_EVENT_RETRY_WAIT_TIME);
496-
} catch (InterruptedException e) {
497-
498-
}
505+
try {
506+
Thread.sleep(sleepTime);
507+
} catch (InterruptedException e) {
508+
e.printStackTrace();
499509
}
500510

501511
try {
@@ -606,6 +616,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
606616
in.defaultReadObject();
607617
disconnectedHandler = getDefaultDisconnectHandler();
608618
connected = false;
619+
reconnectCount = new AtomicLong(0L);
609620

610621
}
611622
} // end Peer

src/main/java/org/hyperledger/fabric/sdk/PeerEventServiceClient.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Properties;
2222
import java.util.concurrent.CountDownLatch;
2323
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2425

2526
import io.grpc.ManagedChannel;
2627
import io.grpc.ManagedChannelBuilder;
@@ -35,6 +36,7 @@
3536
import org.hyperledger.fabric.protos.peer.PeerEvents.DeliverResponse;
3637
import org.hyperledger.fabric.sdk.Channel.PeerOptions;
3738
import org.hyperledger.fabric.sdk.exception.CryptoException;
39+
import org.hyperledger.fabric.sdk.exception.PeerEventingServiceException;
3840
import org.hyperledger.fabric.sdk.exception.TransactionException;
3941
import org.hyperledger.fabric.sdk.helper.Config;
4042
import org.hyperledger.fabric.sdk.transaction.TransactionContext;
@@ -146,7 +148,7 @@ synchronized void shutdown(boolean force) {
146148
}
147149
}
148150
}
149-
peer = null;
151+
150152
channelEventQue = null;
151153

152154
}
@@ -169,6 +171,8 @@ void connectEnvelope(Envelope envelope) throws TransactionException {
169171
return;
170172
}
171173

174+
final AtomicBoolean retry = new AtomicBoolean(true); // make sure we only retry connection once for each connection attempt.
175+
172176
ManagedChannel lmanagedChannel = managedChannel;
173177

174178
if (lmanagedChannel == null || lmanagedChannel.isTerminated() || lmanagedChannel.isShutdown()) {
@@ -208,8 +212,16 @@ public void onNext(DeliverResponse resp) {
208212
peer.resetReconnectCount();
209213
} else {
210214

211-
throwableList.add(new TransactionException(format("Channel %s peer %s Status returned failure code %d (%s) during peer service event registration",
212-
channelName, peer.getName(), resp.getStatusValue(), resp.getStatus().name())));
215+
final long rec = peer.getReconnectCount();
216+
217+
PeerEventingServiceException peerEventingServiceException = new PeerEventingServiceException(format("Channel %s peer %s attempts %s Status returned failure code %d (%s) during peer service event registration",
218+
channelName, peer.getName(), rec, resp.getStatusValue(), resp.getStatus().name()));
219+
peerEventingServiceException.setResponse(resp);
220+
if (rec % 10 == 0) {
221+
logger.warn(peerEventingServiceException.getMessage());
222+
}
223+
224+
throwableList.add(peerEventingServiceException);
213225
}
214226

215227
} else if (typeCase == FILTERED_BLOCK || typeCase == BLOCK) {
@@ -224,10 +236,8 @@ public void onNext(DeliverResponse resp) {
224236
peer.setLastConnectTime(System.currentTimeMillis());
225237
long reconnectCount = peer.getReconnectCount();
226238
if (reconnectCount > 1) {
227-
228239
logger.info(format("Peer eventing service reconnected after %d attempts on channel %s, peer %s, url %s",
229240
reconnectCount, channelName, name, url));
230-
231241
}
232242
peer.resetReconnectCount();
233243

@@ -239,8 +249,11 @@ public void onNext(DeliverResponse resp) {
239249
logger.error(format("Channel %s peer %s got event block with unknown type: %s, %d",
240250
channelName, peer.getName(), typeCase.name(), typeCase.getNumber()));
241251

242-
throwableList.add(new TransactionException(format("Channel %s peer %s Status got unknown type %s, %d",
243-
channelName, peer.getName(), typeCase.name(), typeCase.getNumber())));
252+
PeerEventingServiceException peerEventingServiceException = new PeerEventingServiceException(format("Channel %s peer %s got event block with unknown type: %s, %d",
253+
channelName, peer.getName(), typeCase.name(), typeCase.getNumber()));
254+
peerEventingServiceException.setResponse(resp);
255+
256+
throwableList.add(peerEventingServiceException);
244257

245258
}
246259
finishLatch.countDown();
@@ -251,7 +264,12 @@ public void onNext(DeliverResponse resp) {
251264
public void onError(Throwable t) {
252265
ManagedChannel llmanagedChannel = managedChannel;
253266
if (llmanagedChannel != null) {
254-
llmanagedChannel.shutdownNow();
267+
try {
268+
llmanagedChannel.shutdownNow();
269+
} catch (Exception e) {
270+
logger.warn(format("Received error on peer eventing service on channel %s, peer %s, url %s, attempts %d. %s shut down of grpc channel.",
271+
channelName, name, url, peer == null ? -1 : peer.getReconnectCount(), e.getMessage()), e);
272+
}
255273
managedChannel = null;
256274
}
257275
if (!shutdown) {
@@ -266,7 +284,9 @@ public void onError(Throwable t) {
266284

267285
}
268286

269-
peer.reconnectPeerEventServiceClient(PeerEventServiceClient.this, t);
287+
if (retry.getAndSet(false)) {
288+
peer.reconnectPeerEventServiceClient(PeerEventServiceClient.this, t);
289+
}
270290

271291
}
272292
finishLatch.countDown();
@@ -288,8 +308,9 @@ public void onCompleted() {
288308

289309
// try {
290310
if (!finishLatch.await(peerEventRegistrationWaitTimeMilliSecs, TimeUnit.MILLISECONDS)) {
291-
TransactionException ex = new TransactionException(format(
311+
PeerEventingServiceException ex = new PeerEventingServiceException(format(
292312
"Channel %s connect time exceeded for peer eventing service %s, timed out at %d ms.", channelName, name, peerEventRegistrationWaitTimeMilliSecs));
313+
ex.setTimedOut(peerEventRegistrationWaitTimeMilliSecs);
293314
throwableList.add(0, ex);
294315

295316
}
@@ -302,7 +323,9 @@ public void onCompleted() {
302323
managedChannel = null;
303324
}
304325
Throwable throwable = throwableList.get(0);
305-
peer.reconnectPeerEventServiceClient(this, throwable);
326+
if (retry.getAndSet(false)) {
327+
peer.reconnectPeerEventServiceClient(this, throwable);
328+
}
306329

307330
}
308331

@@ -314,7 +337,9 @@ public void onCompleted() {
314337
}
315338
logger.error(e); // not likely
316339

317-
peer.reconnectPeerEventServiceClient(this, e);
340+
if (retry.getAndSet(false)) {
341+
peer.reconnectPeerEventServiceClient(this, e);
342+
}
318343

319344
} finally {
320345
if (null != nso) {
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
*
3+
* Copyright 2016,2017 DTCC, Fujitsu Australia Software Technology, IBM - All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*
15+
*/
16+
17+
package org.hyperledger.fabric.sdk.exception;
18+
19+
import org.hyperledger.fabric.protos.peer.PeerEvents;
20+
21+
public class PeerEventingServiceException extends TransactionException {
22+
23+
private static final long serialVersionUID = 1L;
24+
25+
/**
26+
* @return the time waited, in milliseconds, if the operation timed out; otherwise -1
27+
*/
28+
public long getTimedOut() {
29+
return timedOut;
30+
}
31+
32+
private long timedOut = -1L;
33+
34+
/**
35+
* Returns the error response received from the peer, if one was received.
36+
*
37+
* @return the error response from the peer if one was received; otherwise null.
38+
*/
39+
public PeerEvents.DeliverResponse getResp() {
40+
return resp;
41+
}
42+
43+
private PeerEvents.DeliverResponse resp;
44+
45+
public PeerEventingServiceException(String message, Throwable parent) {
46+
super(message, parent);
47+
}
48+
49+
public PeerEventingServiceException(String message) {
50+
super(message);
51+
}
52+
53+
public PeerEventingServiceException(Throwable t) {
54+
super(t);
55+
}
56+
57+
public void setResponse(PeerEvents.DeliverResponse resp) {
58+
this.resp = resp;
59+
}
60+
61+
public void setTimedOut(long peerEventRegistrationWaitTimeMilliSecs) {
62+
this.timedOut = peerEventRegistrationWaitTimeMilliSecs;
63+
}
64+
}

0 commit comments

Comments
 (0)