Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed keyStateMarker problems #9382

Merged
merged 1 commit into from Dec 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -309,18 +309,30 @@ protected List<MapGetAllCodec.ResponseParameters> getAllInternal(Map<Integer, Li
Data key = entry.getKey();
Data value = entry.getValue();

Boolean marked = markers.get(key);
Boolean marked = markers.remove(key);
if ((null != marked && marked)) {
tryToPutNearCache(key, value);
} else if (!invalidateOnChange) {
nearCache.put(key, value);
}

}
}

unmarkRemainingMarkedKeys(markers);

return responses;
}

private void unmarkRemainingMarkedKeys(Map<Data, Boolean> markers) {
for (Map.Entry<Data, Boolean> entry : markers.entrySet()) {
Boolean marked = entry.getValue();
if (marked) {
keyStateMarker.forceUnmark(entry.getKey());
}
}
}


@Override
public LocalMapStats getLocalMapStats() {
LocalMapStats localMapStats = super.getLocalMapStats();
Expand Down
@@ -0,0 +1,249 @@
/*
* Copyright (c) 2008-2016, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.map.impl.nearcache;

import com.hazelcast.cache.impl.nearcache.NearCache;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.proxy.NearCachedClientMapProxy;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.config.NearCacheConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.map.impl.nearcache.KeyStateMarker;
import com.hazelcast.map.impl.nearcache.KeyStateMarkerImpl;
import com.hazelcast.map.impl.nearcache.StaleReadPreventerNearCacheWrapper;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.NightlyTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongArray;

import static com.hazelcast.util.RandomPicker.getInt;
import static java.lang.String.format;
import static org.junit.Assert.assertEquals;

@RunWith(HazelcastSerialClassRunner.class)
@Category(NightlyTest.class)
public class ClientMapKeyStateMarkerStressTest extends HazelcastTestSupport {

private final int KEY_SPACE = 10000;
private final int TEST_RUN_SECONDS = 60;
private final int GET_ALL_THREAD_COUNT = 3;
private final int GET_THREAD_COUNT = 2;
private final int PUT_THREAD_COUNT = 1;
private final int CLEAR_THREAD_COUNT = 1;
private final int REMOVE_THREAD_COUNT = 1;
private final String mapName = "test";
private final AtomicBoolean stop = new AtomicBoolean();

private TestHazelcastFactory factory;

@Before
public void setUp() throws Exception {
factory = new TestHazelcastFactory();
stop.set(false);
}

@After
public void tearDown() throws Exception {
factory.shutdownAll();
}

@Test
public void final_state_of_all_slots_are_unmarked() throws Exception {
HazelcastInstance member = factory.newHazelcastInstance();
factory.newHazelcastInstance();
factory.newHazelcastInstance();

ClientConfig clientConfig = new ClientConfig();
clientConfig.addNearCacheConfig(newNearCacheConfig());
HazelcastInstance client = factory.newHazelcastClient(clientConfig);

IMap memberMap = member.getMap(mapName);
// initial population of imap from member
for (int i = 0; i < KEY_SPACE; i++) {
memberMap.put(i, i);
}

List<Thread> threads = new ArrayList<Thread>();

// member
for (int i = 0; i < PUT_THREAD_COUNT; i++) {
Put put = new Put(memberMap);
threads.add(put);
}

// client
IMap clientMap = client.getMap(mapName);

for (int i = 0; i < GET_ALL_THREAD_COUNT; i++) {
GetAll getAll = new GetAll(clientMap);
threads.add(getAll);
}

for (int i = 0; i < GET_THREAD_COUNT; i++) {
Get get = new Get(clientMap);
threads.add(get);
}

for (int i = 0; i < REMOVE_THREAD_COUNT; i++) {
Remove remove = new Remove(clientMap);
threads.add(remove);
}

for (int i = 0; i < CLEAR_THREAD_COUNT; i++) {
Clear clear = new Clear(clientMap);
threads.add(clear);
}

// start threads
for (Thread thread : threads) {
thread.start();
}

// stress for a while
sleepSeconds(TEST_RUN_SECONDS);

// stop threads
stop.set(true);
for (Thread thread : threads) {
thread.join();
}

assertAllKeysInUnmarkedState(clientMap);
}

private void assertAllKeysInUnmarkedState(IMap clientMap) {
NearCachedClientMapProxy proxy = (NearCachedClientMapProxy) clientMap;
NearCache nearCache = proxy.getNearCache();
KeyStateMarker keyStateMarker = ((StaleReadPreventerNearCacheWrapper) nearCache).getKeyStateMarker();
AtomicLongArray marks = ((KeyStateMarkerImpl) keyStateMarker).getMarks();

String msg = format("nearCacheSize=%d, markerStates=(%s)", nearCache.size(), keyStateMarker);

for (int i = 0; i < marks.length(); i++) {
assertEquals(msg, 0, marks.get(i));
}
}

private class Put extends Thread {
private final IMap map;

private Put(IMap map) {
this.map = map;
}

@Override
public void run() {
do {
for (int i = 0; i < KEY_SPACE; i++) {
map.put(i, getInt(KEY_SPACE));
}
sleepAtLeastMillis(100);
} while (!stop.get());
}
}

private class Remove extends Thread {
private final IMap map;

private Remove(IMap map) {
this.map = map;
}

@Override
public void run() {
do {
for (int i = 0; i < KEY_SPACE; i++) {
map.remove(i);
}
sleepAtLeastMillis(100);
} while (!stop.get());
}
}

private class Clear extends Thread {
private final IMap map;

private Clear(IMap map) {
this.map = map;
}

@Override
public void run() {
do {
map.clear();
sleepAtLeastMillis(3000);
} while (!stop.get());
}
}

private class GetAll extends Thread {
private final IMap map;

private GetAll(IMap map) {
this.map = map;
}

@Override
public void run() {
HashSet keys = new HashSet();
for (int i = 0; i < KEY_SPACE; i++) {
keys.add(i);
}

do {
map.getAll(keys);
sleepAtLeastMillis(2);
} while (!stop.get());
}
}

private class Get extends Thread {
private final IMap map;

private Get(IMap map) {
this.map = map;
}

@Override
public void run() {
do {
for (int i = 0; i < KEY_SPACE; i++) {
map.get(i);
}
} while (!stop.get());
}
}


protected NearCacheConfig newNearCacheConfig() {
NearCacheConfig nearCacheConfig = new NearCacheConfig();
nearCacheConfig.setName(mapName);
nearCacheConfig.setInvalidateOnChange(true);
return nearCacheConfig;
}
}
Expand Up @@ -28,7 +28,7 @@
public class KeyStateMarkerImpl implements KeyStateMarker {

private final int markCount;
private volatile AtomicLongArray marks;
private final AtomicLongArray marks;

public KeyStateMarkerImpl(int markCount) {
this.markCount = markCount;
Expand Down Expand Up @@ -58,7 +58,10 @@ public void forceUnmark(Object key) {

@Override
public void init() {
marks = new AtomicLongArray(markCount);
int slot = 0;
do {
marks.set(slot, UNMARKED.getState());
} while (++slot < marks.length());
}

private boolean casState(Object key, STATE expect, STATE update) {
Expand All @@ -70,4 +73,17 @@ private int getSlot(Object key) {
int hash = key instanceof Data ? ((Data) key).getPartitionHash() : key.hashCode();
return hashToIndex(hash, markCount);
}

// only used for testing purposes.
public AtomicLongArray getMarks() {
return marks;
}

@Override
public String toString() {
return "KeyStateMarkerImpl{"
+ "markCount=" + markCount
+ ", marks=" + marks
+ '}';
}
}
Expand Up @@ -100,4 +100,5 @@ public int size() {
public KeyStateMarker getKeyStateMarker() {
return keyStateMarker;
}

}
Expand Up @@ -300,11 +300,22 @@ protected void getAllObjectInternal(List<Data> keys, List<Object> resultingKeyVa
Data key = toData(resultingKeyValuePairs.get(i++));
Data value = toData(resultingKeyValuePairs.get(i++));

boolean marked = keyStates.get(key);
boolean marked = keyStates.remove(key);
if (marked) {
tryToPutNearCache(key, value);
}
}

unmarkRemainingMarkedKeys(keyStates);
}

private void unmarkRemainingMarkedKeys(Map<Data, Boolean> markers) {
for (Map.Entry<Data, Boolean> entry : markers.entrySet()) {
Boolean marked = entry.getValue();
if (marked) {
keyStateMarker.forceUnmark(entry.getKey());
}
}
}

@Override
Expand Down