Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
fc6cdf8
ignite-5357: neineighborhoods selection logic was added
daradurvs Feb 27, 2018
7fe3b60
ignite-5357: fix statement with 'canRemap' flag
daradurvs Feb 27, 2018
9c07ad7
ignite-5357: code style fix
daradurvs Feb 27, 2018
65546ae
ignite-5357: replaced parallelStream with stream
daradurvs Feb 27, 2018
f6197a0
ignite-5357: fix nullable logic
daradurvs Feb 27, 2018
b35ddb9
ignite-5357: remove unnecessary imports
daradurvs Feb 27, 2018
518a083
ignite-5357: skip primary node condition was added
daradurvs Feb 27, 2018
ded2441
ignite-5357: refactoring
daradurvs Feb 28, 2018
883dbbf
ignite-5357: javadoc fix
daradurvs Feb 28, 2018
e76013f
ignite-5357: refactoring
daradurvs Feb 28, 2018
eaaa031
ignite-5357: back logic
daradurvs Feb 28, 2018
66097c4
ignite-5357: changed logic
daradurvs Feb 28, 2018
f6f0db6
ignite-5357: changed logic 2
daradurvs Feb 28, 2018
91c75ef
ignite-5357: removed utils method
daradurvs Feb 28, 2018
26c639e
Merge remote-tracking branch 'origin/ignite-5357-2' into ignite-5357
daradurvs Feb 28, 2018
09ba7b2
ignite-5357: requests distribution tests were added
daradurvs Mar 1, 2018
8a685e1
ignite-5357: fix tests classes names
daradurvs Mar 2, 2018
56579c8
ignite-5357: fix node for getting primary keys
daradurvs Mar 2, 2018
a5fca8a
ignite-5357: code style fix: long line split
daradurvs Mar 2, 2018
d85fd75
ignite-5357: javadoc comment was extended
daradurvs Mar 2, 2018
2de55a8
ignite-5357: generator tests were added
daradurvs Mar 2, 2018
c0c95e0
ignite-5357: lamda expressions were unwraped
daradurvs Mar 2, 2018
d12e14b
ignite-5357: minor fix
daradurvs Mar 2, 2018
62093d2
ignite-5357: review notes fixes; refactoring
daradurvs Mar 11, 2018
43f485e
ignite-5357: moved random generator to method
daradurvs Mar 12, 2018
c0fc039
ignite-5357: review notes fixes
daradurvs Mar 12, 2018
5a98fb8
ignite-5357: review notes fixes: remove static map
daradurvs Mar 12, 2018
19435e5
ignite-5357: removed deprecated assertion
daradurvs Mar 12, 2018
ae1007a
ignite-5357: review fixes
daradurvs Mar 12, 2018
8bf3224
ignite-5357: renamed method and variable
daradurvs Mar 12, 2018
0b68af0
ignite-5357: partitioned cache tests were added
daradurvs Mar 12, 2018
04ddc6e
ignite-5357: minor fixes
daradurvs Mar 12, 2018
cb1c59f
ignite-5357: returned transactionIsolation in transactional tests
daradurvs Mar 12, 2018
2c3b7f8
ignite-5357: polish
daradurvs Mar 12, 2018
6dcefa4
ignite-5910: review fixes
daradurvs Mar 19, 2018
99475a7
ignite-5910: old behavior switcher was added
daradurvs Mar 19, 2018
6445ffb
ignite-5910: wip
daradurvs Mar 19, 2018
5ea6614
ignite-5910: wip2
daradurvs Mar 19, 2018
86cebff
cleanup
daradurvs Mar 19, 2018
4ff71d0
cleanup
daradurvs Mar 19, 2018
6df095f
ignite-5357: locMacs moved to classes field
daradurvs Mar 19, 2018
eee4afc
ignite-5357: locMacs nonNull assertion was added
daradurvs Mar 20, 2018
2257a45
ignite-5357: readFromBackup variable was added
daradurvs Mar 20, 2018
cfde9a5
Merge branch 'master' into ignite-5357
daradurvs Mar 20, 2018
a67e6b4
ignite-5357: cleanup
daradurvs Mar 20, 2018
5d93e97
ignite-5357: refactoring in tests
daradurvs Mar 21, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.net.ssl.HostnameVerifier;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
Expand Down Expand Up @@ -830,6 +831,16 @@ public final class IgniteSystemProperties {
*/
public static final String IGNITE_LOADED_PAGES_BACKWARD_SHIFT_MAP = "IGNITE_LOADED_PAGES_BACKWARD_SHIFT_MAP";

/**
* Whenever read load balancing is enabled, that means 'get' requests will be distributed between primary and backup
* nodes if it is possible and {@link CacheConfiguration#readFromBackup} is {@code true}.
*
* Default is {@code true}.
*
* @see CacheConfiguration#readFromBackup
*/
public static final String IGNITE_READ_LOAD_BALANCING = "IGNITE_READ_LOAD_BALANCING";

/**
* Enforces singleton.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.expiry.EternalExpiryPolicy;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryField;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheInterceptor;
Expand Down Expand Up @@ -106,12 +108,14 @@
import org.apache.ignite.plugin.security.SecurityPermission;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_READ_LOAD_BALANCING;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;

/**
Expand Down Expand Up @@ -258,6 +262,15 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Statistics enabled flag. */
private volatile boolean statisticsEnabled;

/** Whether to enable read load balancing. */
private final boolean readLoadBalancingEnabled = IgniteSystemProperties.getBoolean(IGNITE_READ_LOAD_BALANCING, true);

/** Flag indicating whether data can be read from backup. */
private boolean readFromBackup = CacheConfiguration.DFLT_READ_FROM_BACKUP;

/** Local node's MAC address. */
private String locMacs;

/**
* Empty constructor required for {@link Externalizable}.
*/
Expand Down Expand Up @@ -377,6 +390,12 @@ public GridCacheContext(
expiryPlc = null;

itHolder = new CacheWeakQueryIteratorsHolder(log);

readFromBackup = cacheCfg.isReadFromBackup();

locMacs = localNode().attribute(ATTR_MACS);

assert locMacs != null;
}

/**
Expand Down Expand Up @@ -2157,6 +2176,51 @@ else if (type == EVT_CACHE_REBALANCE_STOPPED) {
return true;
}

/**
* Determines an affinity node to send get request to.
*
* @param affNodes All affinity nodes.
* @param canRemap Flag indicating that 'get' should be done on a locked topology version.
* @return Affinity node to get key from or {@code null} if there is no suitable alive node.
*/
@Nullable public ClusterNode selectAffinityNodeBalanced(List<ClusterNode> affNodes, boolean canRemap) {
if (!readLoadBalancingEnabled) {
if (!canRemap) {
for (ClusterNode node : affNodes) {
if (ctx.discovery().alive(node))
return node;
}

return null;
}
else
return affNodes.get(0);
}

if (!readFromBackup)
return affNodes.get(0);

assert locMacs != null;

int r = ThreadLocalRandom.current().nextInt(affNodes.size());

ClusterNode n0 = null;

for (ClusterNode node : affNodes) {
if (canRemap || discovery().alive(node)) {
if (locMacs.equals(node.attribute(ATTR_MACS)))
return node;

if (r >= 0 || n0 == null)
n0 = node;
}

r--;
}

return n0;
}

/**
* Prepare affinity field for builder (if possible).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,25 +151,6 @@ protected CacheDistributedGetFutureAdapter(
futId = IgniteUuid.randomUuid();
}

/**
* Affinity node to send get request to.
*
* @param affNodes All affinity nodes.
* @return Affinity node to get key from.
*/
protected final ClusterNode affinityNode(List<ClusterNode> affNodes) {
if (!canRemap) {
for (ClusterNode node : affNodes) {
if (cctx.discovery().alive(node))
return node;
}

return null;
}
else
return affNodes.get(0);
}

/**
* @param part Partition.
* @return {@code True} if partition is in owned state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ private boolean map(
}
}

ClusterNode node = affinityNode(affNodes);
ClusterNode node = cctx.selectAffinityNodeBalanced(affNodes, canRemap);

if (node == null) {
onDone(serverNotFoundError(topVer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private void map(final AffinityTopologyVersion topVer) {
}
}

ClusterNode affNode = affinityNode(affNodes);
ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, canRemap);

if (affNode == null) {
onDone(serverNotFoundError(topVer));
Expand Down Expand Up @@ -711,25 +711,6 @@ private ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopol
"(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
}

/**
* Affinity node to send get request to.
*
* @param affNodes All affinity nodes.
* @return Affinity node to get key from.
*/
@Nullable private ClusterNode affinityNode(List<ClusterNode> affNodes) {
if (!canRemap) {
for (ClusterNode node : affNodes) {
if (cctx.discovery().alive(node))
return node;
}

return null;
}
else
return affNodes.get(0);
}

/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
return futId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ private Map<KeyCacheObject, GridNearCacheEntry> map(
}
}

ClusterNode affNode = affinityNode(affNodes);
ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, canRemap);

if (affNode == null) {
onDone(serverNotFoundError(topVer));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;

import static org.apache.ignite.cache.CacheMode.PARTITIONED;

/**
* Tests of partitioned atomic cache's 'get' requests distribution.
*/
public class PartitionedAtomicCacheGetsDistributionTest extends ReplicatedAtomicCacheGetsDistributionTest {
/** {@inheritDoc} */
@Override protected CacheMode cacheMode() {
return PARTITIONED;
}

/** {@inheritDoc} */
@Override protected <K, V> CacheConfiguration<K, V> cacheConfiguration() {
CacheConfiguration<K, V> cacheCfg = super.cacheConfiguration();

cacheCfg.setBackups(backupsCount());

return cacheCfg;
}

/**
* @return Backups count.
*/
protected int backupsCount() {
return gridCount() - 1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;

import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;

/**
* Tests of optimistic transactional partitioned cache's 'get' requests distribution.
*/
public class PartitionedTransactionalOptimisticCacheGetsDistributionTest extends PartitionedAtomicCacheGetsDistributionTest {
/** {@inheritDoc} */
@Override protected CacheAtomicityMode atomicityMode() {
return TRANSACTIONAL;
}

/** {@inheritDoc} */
@Override protected TransactionIsolation transactionIsolation() {
return READ_COMMITTED;
}

/** {@inheritDoc} */
@Override protected TransactionConcurrency transactionConcurrency() {
return OPTIMISTIC;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.transactions.TransactionConcurrency;

import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;

/**
* Tests of pessimistic transactional partitioned cache's 'get' requests distribution.
*/
public class PartitionedTransactionalPessimisticCacheGetsDistributionTest
extends PartitionedTransactionalOptimisticCacheGetsDistributionTest {
/** {@inheritDoc} */
@Override protected TransactionConcurrency transactionConcurrency() {
return PESSIMISTIC;
}
}
Loading