Skip to content

Commit a62a013

Browse files
committed
GG-11655 - Fix merge
1 parent 92fff63 commit a62a013

File tree

1 file changed

+74
-62
lines changed

1 file changed

+74
-62
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java

Lines changed: 74 additions & 62 deletions
Original file line numberOriginal file lineDiff line numberDiff line change
@@ -851,7 +851,7 @@ public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> svcItf
851
}
851
}
852
}
852
}
853

853

854-
return new GridServiceProxy<>(prj, name, svcItf, sticky, ctx).proxy();
854+
return new GridServiceProxy<T>(prj, name, svcItf, sticky, ctx).proxy();
855
}
855
}
856

856

857
/**
857
/**
@@ -904,7 +904,7 @@ public <T> Collection<T> services(String name) {
904
* @param topVer Topology version.
904
* @param topVer Topology version.
905
* @throws IgniteCheckedException If failed.
905
* @throws IgniteCheckedException If failed.
906
*/
906
*/
907-
private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheckedException {
907+
private void reassign(GridServiceDeployment dep, AffinityTopologyVersion topVer) throws IgniteCheckedException {
908
ServiceConfiguration cfg = dep.configuration();
908
ServiceConfiguration cfg = dep.configuration();
909

909

910
Object nodeFilter = cfg.getNodeFilter();
910
Object nodeFilter = cfg.getNodeFilter();
@@ -918,7 +918,7 @@ private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheck
918
Object affKey = cfg.getAffinityKey();
918
Object affKey = cfg.getAffinityKey();
919

919

920
while (true) {
920
while (true) {
921-
GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer);
921+
GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer.topologyVersion());
922

922

923
Collection<ClusterNode> nodes;
923
Collection<ClusterNode> nodes;
924

924

@@ -948,7 +948,7 @@ private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheck
948
Map<UUID, Integer> cnts = new HashMap<>();
948
Map<UUID, Integer> cnts = new HashMap<>();
949

949

950
if (affKey != null) {
950
if (affKey != null) {
951-
ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, new AffinityTopologyVersion(topVer));
951+
ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, topVer);
952

952

953
if (n != null) {
953
if (n != null) {
954
int cnt = maxPerNodeCnt == 0 ? totalCnt == 0 ? 1 : totalCnt : maxPerNodeCnt;
954
int cnt = maxPerNodeCnt == 0 ? totalCnt == 0 ? 1 : totalCnt : maxPerNodeCnt;
@@ -1180,7 +1180,7 @@ private Service copyAndInject(ServiceConfiguration cfg) throws IgniteCheckedExce
1180
if (cfg instanceof LazyServiceConfiguration) {
1180
if (cfg instanceof LazyServiceConfiguration) {
1181
byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
1181
byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
1182

1182

1183-
Service srvc = m.unmarshal(bytes, U.resolveClassLoader(null, ctx.config()));
1183+
Service srvc = U.unmarshal(m, bytes, U.resolveClassLoader(null, ctx.config()));
1184

1184

1185
ctx.resource().inject(srvc);
1185
ctx.resource().inject(srvc);
1186

1186

@@ -1190,10 +1190,9 @@ private Service copyAndInject(ServiceConfiguration cfg) throws IgniteCheckedExce
1190
Service svc = cfg.getService();
1190
Service svc = cfg.getService();
1191

1191

1192
try {
1192
try {
1193-
byte[] bytes = m.marshal(svc);
1193+
byte[] bytes = U.marshal(m, svc);
1194

1194

1195-
Service cp = m.unmarshal(bytes,
1195+
Service cp = U.unmarshal(m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
1196-
U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
1197

1196

1198
ctx.resource().inject(cp);
1197
ctx.resource().inject(cp);
1199

1198

@@ -1268,8 +1267,8 @@ private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<O
1268
ClusterNode oldestSrvNode =
1267
ClusterNode oldestSrvNode =
1269
CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
1268
CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
1270

1269

1271-
if (oldestSrvNode == null)
1270+
if (oldestSrvNode == null)
1272-
return F.emptyIterator();
1271+
return new GridEmptyIterator<>();
1273

1272

1274
GridCacheQueryManager qryMgr = cache.context().queries();
1273
GridCacheQueryManager qryMgr = cache.context().queries();
1275

1274

@@ -1455,7 +1454,7 @@ private void processDeployment(CacheEntryEvent<GridServiceDeploymentKey, GridSer
1455
svcName.set(dep.configuration().getName());
1454
svcName.set(dep.configuration().getName());
1456

1455

1457
// Ignore other utility cache events.
1456
// Ignore other utility cache events.
1458-
long topVer = ctx.discovery().topologyVersion();
1457+
AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
1459

1458

1460
ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
1459
ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
1461

1460

@@ -1506,60 +1505,60 @@ private void processDeployment(CacheEntryEvent<GridServiceDeploymentKey, GridSer
1506
}
1505
}
1507
}
1506
}
1508

1507

1509-
/**
1508+
/**
1510-
* Deployment callback.
1509+
* Deployment callback.
1511-
*
1510+
*
1512-
* @param dep Service deployment.
1511+
* @param dep Service deployment.
1513-
* @param topVer Topology version.
1512+
* @param topVer Topology version.
1514-
*/
1513+
*/
1515-
private void onDeployment(final GridServiceDeployment dep, final long topVer) {
1514+
private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
1516-
// Retry forever.
1515+
// Retry forever.
1517-
try {
1516+
try {
1518-
long newTopVer = ctx.discovery().topologyVersion();
1517+
AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
1519-
1520-
// If topology version changed, reassignment will happen from topology event.
1521-
if (newTopVer == topVer)
1522-
reassign(dep, topVer);
1523-
}
1524-
catch (IgniteCheckedException e) {
1525-
if (!(e instanceof ClusterTopologyCheckedException))
1526-
log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
1527-
1528-
long newTopVer = ctx.discovery().topologyVersion();
1529-
1530-
if (newTopVer != topVer) {
1531-
assert newTopVer > topVer;
1532

1518

1533-
// Reassignment will happen from topology event.
1519+
// If topology version changed, reassignment will happen from topology event.
1534-
return;
1520+
if (newTopVer.equals(topVer))
1521+
reassign(dep, topVer);
1535
}
1522
}
1523+
catch (IgniteCheckedException e) {
1524+
if (!(e instanceof ClusterTopologyCheckedException))
1525+
log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
1536

1526

1537-
ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
1527+
AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
1538-
private IgniteUuid id = IgniteUuid.randomUuid();
1539

1528

1540-
private long start = System.currentTimeMillis();
1529+
if (!newTopVer.equals(topVer)) {
1530+
assert newTopVer.compareTo(topVer) > 0;
1541

1531

1542-
@Override public IgniteUuid timeoutId() {
1532+
// Reassignment will happen from topology event.
1543-
return id;
1533+
return;
1544
}
1534
}
1545

1535

1546-
@Override public long endTime() {
1536+
ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
1547-
return start + RETRY_TIMEOUT;
1537+
private IgniteUuid id = IgniteUuid.randomUuid();
1548-
}
1549

1538

1550-
@Override public void onTimeout() {
1539+
private long start = System.currentTimeMillis();
1551-
if (!busyLock.enterBusy())
1552-
return;
1553

1540

1554-
try {
1541+
@Override public IgniteUuid timeoutId() {
1555-
// Try again.
1542+
return id;
1556-
onDeployment(dep, topVer);
1557
}
1543
}
1558-
finally {
1544+
1559-
busyLock.leaveBusy();
1545+
@Override public long endTime() {
1546+
return start + RETRY_TIMEOUT;
1560
}
1547
}
1561-
}
1548+
1562-
});
1549+
@Override public void onTimeout() {
1550+
if (!busyLock.enterBusy())
1551+
return;
1552+
1553+
try {
1554+
// Try again.
1555+
onDeployment(dep, topVer);
1556+
}
1557+
finally {
1558+
busyLock.leaveBusy();
1559+
}
1560+
}
1561+
});
1563
}
1562
}
1564
}
1563
}
1565

1564

@@ -1568,16 +1567,28 @@ private void onDeployment(final GridServiceDeployment dep, final long topVer) {
1568
*/
1567
*/
1569
private class TopologyListener implements GridLocalEventListener {
1568
private class TopologyListener implements GridLocalEventListener {
1570
/** {@inheritDoc} */
1569
/** {@inheritDoc} */
1571-
@Override public void onEvent(final Event evt) {
1570+
@Override public void onEvent(Event evt) {
1572
if (!busyLock.enterBusy())
1571
if (!busyLock.enterBusy())
1573
return;
1572
return;
1574

1573

1575
try {
1574
try {
1575+
final AffinityTopologyVersion topVer;
1576+
1577+
if (evt instanceof DiscoveryCustomEvent) {
1578+
DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
1579+
1580+
topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
1581+
1582+
if (msg instanceof CacheAffinityChangeMessage) {
1583+
if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
1584+
return;
1585+
}
1586+
}
1587+
else
1588+
topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
1589+
1576
depExe.submit(new BusyRunnable() {
1590
depExe.submit(new BusyRunnable() {
1577
@Override public void run0() {
1591
@Override public void run0() {
1578-
AffinityTopologyVersion topVer =
1579-
new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion());
1580-
1581
ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
1592
ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
1582

1593

1583
if (oldest != null && oldest.isLocal()) {
1594
if (oldest != null && oldest.isLocal()) {
@@ -1612,7 +1623,7 @@ private class TopologyListener implements GridLocalEventListener {
1612
ctx.cache().internalCache(UTILITY_CACHE_NAME).context().affinity().
1623
ctx.cache().internalCache(UTILITY_CACHE_NAME).context().affinity().
1613
affinityReadyFuture(topVer).get();
1624
affinityReadyFuture(topVer).get();
1614

1625

1615-
reassign(dep, topVer.topologyVersion());
1626+
reassign(dep, topVer);
1616
}
1627
}
1617
catch (IgniteCheckedException ex) {
1628
catch (IgniteCheckedException ex) {
1618
if (!(e instanceof ClusterTopologyCheckedException))
1629
if (!(e instanceof ClusterTopologyCheckedException))
@@ -1629,7 +1640,7 @@ private class TopologyListener implements GridLocalEventListener {
1629
}
1640
}
1630

1641

1631
if (!retries.isEmpty())
1642
if (!retries.isEmpty())
1632-
onReassignmentFailed(topVer.topologyVersion(), retries);
1643+
onReassignmentFailed(topVer, retries);
1633
}
1644
}
1634

1645

1635
// Clean up zombie assignments.
1646
// Clean up zombie assignments.
@@ -1666,13 +1677,14 @@ private class TopologyListener implements GridLocalEventListener {
1666
* @param topVer Topology version.
1677
* @param topVer Topology version.
1667
* @param retries Retries.
1678
* @param retries Retries.
1668
*/
1679
*/
1669-
private void onReassignmentFailed(final long topVer, final Collection<GridServiceDeployment> retries) {
1680+
private void onReassignmentFailed(final AffinityTopologyVersion topVer,
1681+
final Collection<GridServiceDeployment> retries) {
1670
if (!busyLock.enterBusy())
1682
if (!busyLock.enterBusy())
1671
return;
1683
return;
1672

1684

1673
try {
1685
try {
1674
// If topology changed again, let next event handle it.
1686
// If topology changed again, let next event handle it.
1675-
if (ctx.discovery().topologyVersion() != topVer)
1687+
if (ctx.discovery().topologyVersionEx().equals(topVer))
1676
return;
1688
return;
1677

1689

1678
for (Iterator<GridServiceDeployment> it = retries.iterator(); it.hasNext(); ) {
1690
for (Iterator<GridServiceDeployment> it = retries.iterator(); it.hasNext(); ) {

0 commit comments

Comments
 (0)