forked from Netflix/eureka
-
Notifications
You must be signed in to change notification settings - Fork 178
/
PeerAwareInstanceRegistryImpl.java
697 lines (650 loc) · 28.7 KB
/
PeerAwareInstanceRegistryImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
/*
* Copyright 2012 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.eureka.registry;
import com.netflix.appinfo.*;
import com.netflix.appinfo.AmazonInfo.MetaDataKey;
import com.netflix.appinfo.DataCenterInfo.Name;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaClientConfig;
import com.netflix.discovery.shared.Application;
import com.netflix.discovery.shared.Applications;
import com.netflix.eureka.EurekaServerConfig;
import com.netflix.eureka.Version;
import com.netflix.eureka.cluster.PeerEurekaNode;
import com.netflix.eureka.cluster.PeerEurekaNodes;
import com.netflix.eureka.lease.Lease;
import com.netflix.eureka.registry.rule.*;
import com.netflix.eureka.resources.ASGResource.ASGStatus;
import com.netflix.eureka.resources.CurrentRequestVersion;
import com.netflix.eureka.resources.ServerCodecs;
import com.netflix.eureka.util.MeasuredRate;
import com.netflix.servo.DefaultMonitorRegistry;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.net.URI;
import java.util.*;
/**
* Handles replication of all operations to {@link AbstractInstanceRegistry} to peer
* <em>Eureka</em> nodes to keep them all in sync.
*
* <p>
* Primary operations that are replicated are the
* <em>Registers,Renewals,Cancels,Expirations and Status Changes</em>
* </p>
*
* <p>
* When the eureka server starts up it tries to fetch all the registry
* information from the peer eureka nodes.If for some reason this operation
* fails, the server does not allow the user to get the registry information for
* a period specified in
* {@link com.netflix.eureka.EurekaServerConfig#getWaitTimeInMsWhenSyncEmpty()}.
* </p>
*
* <p>
* One important thing to note about <em>renewals</em>.If the renewal drops more
* than the specified threshold as specified in
* {@link com.netflix.eureka.EurekaServerConfig#getRenewalPercentThreshold()} within a period of
* {@link com.netflix.eureka.EurekaServerConfig#getRenewalThresholdUpdateIntervalMs()}, eureka
* perceives this as a danger and stops expiring instances.
* </p>
*
* @author Karthik Ranganathan, Greg Kim
*
*/
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
private static final Logger logger = LoggerFactory.getLogger(PeerAwareInstanceRegistryImpl.class);
private static final String US_EAST_1 = "us-east-1";
private static final int PRIME_PEER_NODES_RETRY_MS = 30000;
private long startupTime = 0;
private boolean peerInstancesTransferEmptyOnStartup = true;
public enum Action {
Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());
public com.netflix.servo.monitor.Timer getTimer() {
return this.timer;
}
}
private static final Comparator<Application> APP_COMPARATOR = new Comparator<Application>() {
public int compare(Application l, Application r) {
return l.getName().compareTo(r.getName());
}
};
private final MeasuredRate numberOfReplicationsLastMin;
protected final EurekaClient eurekaClient;
protected volatile PeerEurekaNodes peerEurekaNodes;
/**
* 应用实例状态覆盖规则
*/
private final InstanceStatusOverrideRule instanceStatusOverrideRule;
private Timer timer = new Timer(
"ReplicaAwareInstanceRegistry - RenewalThresholdUpdater", true);
@Inject
public PeerAwareInstanceRegistryImpl(
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
EurekaClient eurekaClient
) {
super(serverConfig, clientConfig, serverCodecs);
this.eurekaClient = eurekaClient;
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
// We first check if the instance is STARTING or DOWN, then we check explicit overrides,
// then we check the status of a potentially existing lease.
this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}
@Override
protected InstanceStatusOverrideRule getInstanceInfoOverrideRule() {
return this.instanceStatusOverrideRule;
}
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
initializedResponseCache();
scheduleRenewalThresholdUpdateTask();
initRemoteRegionRegistry();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
/**
* Perform all cleanup and shutdown operations.
*/
@Override
public void shutdown() {
try {
DefaultMonitorRegistry.getInstance().unregister(Monitors.newObjectMonitor(this));
} catch (Throwable t) {
logger.error("Cannot shutdown monitor registry", t);
}
try {
peerEurekaNodes.shutdown();
} catch (Throwable t) {
logger.error("Cannot shutdown ReplicaAwareInstanceRegistry", t);
}
numberOfReplicationsLastMin.stop();
super.shutdown();
}
/**
* Schedule the task that updates <em>renewal threshold</em> periodically.
* The renewal threshold would be used to determine if the renewals drop
* dramatically because of network partition and to protect expiring too
* many instances at a time.
*
*/
private void scheduleRenewalThresholdUpdateTask() {
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
/**
* Populates the registry information from a peer eureka node. This
* operation fails over to other nodes until the list is exhausted if the
* communication fails.
*/
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
logger.info("Got " + count + " instances from neighboring DS node");
logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
/**
* Prime connections for Aws replicas.
* <p>
* Sometimes when the eureka servers comes up, AWS firewall may not allow
* the network connections immediately. This will cause the outbound
* connections to fail, but the inbound connections continue to work. What
* this means is the clients would have switched to this node (after EIP
* binding) and so the other eureka nodes will expire all instances that
* have been switched because of the lack of outgoing heartbeats from this
* instance.
* </p>
* <p>
* The best protection in this scenario is to block and wait until we are
* able to ping all eureka nodes successfully atleast once. Until then we
* won't open up the traffic.
* </p>
*/
private void primeAwsReplicas(ApplicationInfoManager applicationInfoManager) {
boolean areAllPeerNodesPrimed = false;
while (!areAllPeerNodesPrimed) {
String peerHostName = null;
try {
Application eurekaApps = this.getApplication(applicationInfoManager.getInfo().getAppName(), false);
if (eurekaApps == null) {
areAllPeerNodesPrimed = true;
logger.info("No peers needed to prime.");
return;
}
for (PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
for (InstanceInfo peerInstanceInfo : eurekaApps.getInstances()) {
LeaseInfo leaseInfo = peerInstanceInfo.getLeaseInfo();
// If the lease is expired - do not worry about priming
if (System.currentTimeMillis() > (leaseInfo
.getRenewalTimestamp() + (leaseInfo
.getDurationInSecs() * 1000))
+ (2 * 60 * 1000)) {
continue;
}
peerHostName = peerInstanceInfo.getHostName();
logger.info("Trying to send heartbeat for the eureka server at {} to make sure the " +
"network channels are open", peerHostName);
// Only try to contact the eureka nodes that are in this instance's registry - because
// the other instances may be legitimately down
if (peerHostName.equalsIgnoreCase(new URI(node.getServiceUrl()).getHost())) {
node.heartbeat(
peerInstanceInfo.getAppName(),
peerInstanceInfo.getId(),
peerInstanceInfo,
null,
true);
}
}
}
areAllPeerNodesPrimed = true;
} catch (Throwable e) {
logger.error("Could not contact " + peerHostName, e);
try {
Thread.sleep(PRIME_PEER_NODES_RETRY_MS);
} catch (InterruptedException e1) {
logger.warn("Interrupted while priming : ", e1);
areAllPeerNodesPrimed = true;
}
}
}
}
/**
* Checks to see if the registry access is allowed or the server is in a
* situation where it does not all getting registry information. The server
* does not return registry information for a period specified in
* {@link EurekaServerConfig#getWaitTimeInMsWhenSyncEmpty()}, if it cannot
* get the registry information from the peer eureka nodes at start up.
*
* @return false - if the instances count from a replica transfer returned
* zero and if the wait time has not elapsed, otherwise returns true
*/
@Override
public boolean shouldAllowAccess(boolean remoteRegionRequired) {
if (this.peerInstancesTransferEmptyOnStartup) {
if (!(System.currentTimeMillis() > this.startupTime + serverConfig.getWaitTimeInMsWhenSyncEmpty())) {
return false;
}
}
if (remoteRegionRequired) {
for (RemoteRegionRegistry remoteRegionRegistry : this.regionNameVSRemoteRegistry.values()) {
if (!remoteRegionRegistry.isReadyForServingData()) {
return false;
}
}
}
return true;
}
public boolean shouldAllowAccess() {
return shouldAllowAccess(true);
}
/**
* @deprecated use {@link com.netflix.eureka.cluster.PeerEurekaNodes#getPeerEurekaNodes()} directly.
*
* Gets the list of peer eureka nodes which is the list to replicate
* information to.
*
* @return the list of replica nodes.
*/
@Deprecated
public List<PeerEurekaNode> getReplicaNodes() {
return Collections.unmodifiableList(peerEurekaNodes.getPeerEurekaNodes());
}
/*
* (non-Javadoc)
*
* @see com.netflix.eureka.registry.InstanceRegistry#cancel(java.lang.String,
* java.lang.String, long, boolean)
*/
@Override
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
if (super.cancel(appName, id, isReplication)) { // 下线
// Eureka-Server 复制
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
// 【自我保护机制】增加 `numberOfRenewsPerMinThreshold` 、`expectedNumberOfRenewsPerMin`
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
return true;
}
return false;
}
/**
* Registers the information about the {@link InstanceInfo} and replicates
* this information to all peer eureka nodes. If this is replication event
* from other replica nodes then it is not replicated.
*
* @param info
* the {@link InstanceInfo} to be registered and replicated.
* @param isReplication
* true if this is a replication event from other replica nodes,
* false otherwise.
*/
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// 租约过期时间
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 注册应用实例信息
super.register(info, leaseDuration, isReplication);
// Eureka-Server 复制
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
/*
* (non-Javadoc)
*
* @see com.netflix.eureka.registry.InstanceRegistry#renew(java.lang.String,
* java.lang.String, long, boolean)
*/
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) { // 续租
// Eureka-Server 复制
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
/*
* (non-Javadoc)
*
* @see com.netflix.eureka.registry.InstanceRegistry#statusUpdate(java.lang.String,
* java.lang.String, com.netflix.appinfo.InstanceInfo.InstanceStatus,
* java.lang.String, boolean)
*/
@Override
public boolean statusUpdate(final String appName, final String id,
final InstanceStatus newStatus, String lastDirtyTimestamp,
final boolean isReplication) {
if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
// Eureka-Server 集群同步
replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication);
return true;
}
return false;
}
@Override
public boolean deleteStatusOverride(String appName, String id,
InstanceStatus newStatus,
String lastDirtyTimestamp,
boolean isReplication) {
if (super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
// Eureka-Server 集群同步
replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication);
return true;
}
return false;
}
/**
* Replicate the <em>ASG status</em> updates to peer eureka nodes. If this
* event is a replication from other nodes, then it is not replicated to
* other nodes.
*
* @param asgName the asg name for which the status needs to be replicated.
* @param newStatus the {@link ASGStatus} information that needs to be replicated.
* @param isReplication true if this is a replication event from other nodes, false otherwise.
*/
@Override
public void statusUpdate(final String asgName, final ASGStatus newStatus, final boolean isReplication) {
// If this is replicated from an other node, do not try to replicate again.
if (isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
replicateASGInfoToReplicaNodes(asgName, newStatus, node);
}
}
@Override
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
/**
* Checks to see if the self-preservation mode is enabled.
*
* <p>
* The self-preservation mode is enabled if the expected number of renewals
* per minute {@link #getNumOfRenewsInLastMin()} is lesser than the expected
* threshold which is determined by {@link #getNumOfRenewsPerMinThreshold()}
* . Eureka perceives this as a danger and stops expiring instances as this
* is most likely because of a network event. The mode is disabled only when
* the renewals get back to above the threshold or if the flag
* {@link EurekaServerConfig#shouldEnableSelfPreservation()} is set to
* false.
* </p>
*
* @return true if the self-preservation mode is enabled, false otherwise.
*/
@Override
public boolean isSelfPreservationModeEnabled() {
return serverConfig.shouldEnableSelfPreservation();
}
@Override
public InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure) {
// TODO Auto-generated method stub
return null;
}
/**
* Updates the <em>renewal threshold</em> based on the current number of
* renewals. The threshold is a percentage as specified in
* {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
* received per minute {@link #getNumOfRenewsInLastMin()}.
*/
private void updateRenewalThreshold() {
try {
// 计算 应用实例数
Applications apps = eurekaClient.getApplications();
int count = 0;
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
// 计算 expectedNumberOfRenewsPerMin 、 numberOfRenewsPerMinThreshold 参数
synchronized (lock) {
// Update threshold only if the threshold is greater than the
// current expected threshold of if the self preservation is disabled.
if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
/**
* Gets the list of all {@link Applications} from the registry in sorted
* lexical order of {@link Application#getName()}.
*
* @return the list of {@link Applications} in lexical order.
*/
@Override
public List<Application> getSortedApplications() {
List<Application> apps = new ArrayList<Application>(getApplications().getRegisteredApplications());
Collections.sort(apps, APP_COMPARATOR);
return apps;
}
/**
* Gets the number of <em>renewals</em> in the last minute.
*
* @return a long value representing the number of <em>renewals</em> in the last minute.
*/
@com.netflix.servo.annotations.Monitor(name = "numOfReplicationsInLastMin",
description = "Number of total replications received in the last minute",
type = com.netflix.servo.annotations.DataSourceType.GAUGE)
public long getNumOfReplicationsInLastMin() {
return numberOfReplicationsLastMin.getCount();
}
/**
* Checks if the number of renewals is lesser than threshold.
*
* @return 0 if the renewals are greater than threshold, 1 otherwise.
*/
@com.netflix.servo.annotations.Monitor(name = "isBelowRenewThreshold", description = "0 = false, 1 = true",
type = com.netflix.servo.annotations.DataSourceType.GAUGE)
@Override
public int isBelowRenewThresold() {
if ((getNumOfRenewsInLastMin() <= numberOfRenewsPerMinThreshold)
&&
((this.startupTime > 0) && (System.currentTimeMillis() > this.startupTime + (serverConfig.getWaitTimeInMsWhenSyncEmpty())))) {
return 1;
} else {
return 0;
}
}
/**
* Checks if an instance is registerable in this region. Instances from other regions are rejected.
*
* @param instanceInfo th instance info information of the instance
* @return true, if it can be registered in this server, false otherwise.
*/
public boolean isRegisterable(InstanceInfo instanceInfo) {
DataCenterInfo datacenterInfo = instanceInfo.getDataCenterInfo();
String serverRegion = clientConfig.getRegion();
if (AmazonInfo.class.isInstance(datacenterInfo)) {
AmazonInfo info = AmazonInfo.class.cast(instanceInfo.getDataCenterInfo());
String availabilityZone = info.get(MetaDataKey.availabilityZone);
// Can be null for dev environments in non-AWS data center
if (availabilityZone == null && US_EAST_1.equalsIgnoreCase(serverRegion)) {
return true;
} else if ((availabilityZone != null) && (availabilityZone.contains(serverRegion))) {
// If in the same region as server, then consider it registerable
return true;
}
}
return true; // Everything non-amazon is registrable.
}
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
/**
* Replicates all ASG status changes to peer eureka nodes except for
* replication traffic to this node.
*/
private void replicateASGInfoToReplicaNodes(final String asgName,
final ASGStatus newStatus, final PeerEurekaNode node) {
CurrentRequestVersion.set(Version.V2);
try {
node.statusUpdate(asgName, newStatus);
} catch (Throwable e) {
logger.error("Cannot replicate ASG status information to {}", node.getServiceUrl(), e);
}
}
@Override
@com.netflix.servo.annotations.Monitor(name = "localRegistrySize",
description = "Current registry size", type = DataSourceType.GAUGE)
public long getLocalRegistrySize() {
return super.getLocalRegistrySize();
}
}