Skip to content

Commit

Permalink
makes write-coalescing configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmetmircik committed Jan 19, 2015
1 parent 7232111 commit 14597aa
Show file tree
Hide file tree
Showing 18 changed files with 467 additions and 108 deletions.
2 changes: 1 addition & 1 deletion hazelcast-documentation/src/Map-Persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ If `MapStore` throws an exception, then the exception will be propagated back to

`MapStore` can be configured as write-behind by setting the `write-delay-seconds` property to a value bigger than **0**. This means the modified entries will be put to the data store asynchronously after a configured delay.

![image](images/NoteSmall.jpg) ***NOTE:*** *In write-behind mode, Hazelcast coalesces updates on a specific key, i.e. applies only the last update on it.*
![image](images/NoteSmall.jpg) ***NOTE:*** *In write-behind mode, by default Hazelcast coalesces updates on a specific key, i.e. applies only the last update on it. But you can also set `MapStoreConfig#setWriteCoalescing` to `FALSE` and you can store all updates on a key to the store.*

In this mode, when the `map.put(key,value)` call returns, you can be sure that

Expand Down
13 changes: 9 additions & 4 deletions hazelcast-spring/src/main/resources/hazelcast-spring-3.3.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
type="xs:string"/>
<xs:attribute name="write-delay-seconds" use="required" type="xs:string"/>
<xs:attribute name="write-batch-size" use="optional" type="xs:string"/>
<xs:attribute name="write-coalescing" use="optional" type="xs:boolean"/>
<xs:attribute name="initial-mode">
<xs:simpleType>
<xs:restriction base="non-space-string">
Expand Down Expand Up @@ -253,11 +254,15 @@
</xs:element>
</xs:sequence>
<xs:attribute name="name" type="xs:string" use="optional" default="default"/>
<xs:attribute name="in-memory-format" type="in-memory-format" use="optional" default="OBJECT"/>
<xs:attribute name="concurrency-level" type="concurrency-level" use="optional" default="32"/>
<xs:attribute name="replication-delay-millis" type="xs:unsignedInt" use="optional" default="100"/>
<xs:attribute name="in-memory-format" type="in-memory-format" use="optional"
default="OBJECT"/>
<xs:attribute name="concurrency-level" type="concurrency-level" use="optional"
default="32"/>
<xs:attribute name="replication-delay-millis" type="xs:unsignedInt" use="optional"
default="100"/>
<xs:attribute name="async-fillup" type="xs:boolean" use="optional" default="true"/>
<xs:attribute name="statistics-enabled" type="xs:boolean" use="optional" default="true"/>
<xs:attribute name="statistics-enabled" type="xs:boolean" use="optional"
default="true"/>
</xs:complexType>
</xs:element>
<xs:element name="listeners" type="listeners" minOccurs="0" maxOccurs="1"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import com.hazelcast.spring.serialization.DummyPortableFactory;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.wan.WanReplicationEndpoint;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -84,7 +83,6 @@
import org.springframework.test.context.ContextConfiguration;

import javax.annotation.Resource;

import java.net.InetSocketAddress;
import java.nio.ByteOrder;
import java.util.Collection;
Expand All @@ -95,7 +93,12 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(CustomSpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"fullcacheconfig-applicationContext-hazelcast.xml"})
Expand Down Expand Up @@ -218,7 +221,8 @@ public void testMapConfig() {
assertTrue(testMapStoreConfig.isEnabled());
assertEquals(0, testMapStoreConfig.getWriteDelaySeconds());
assertEquals(10, testMapStoreConfig.getWriteBatchSize());
assertEquals(MapStoreConfig.InitialLoadMode.EAGER,testMapStoreConfig.getInitialLoadMode());
assertTrue(testMapStoreConfig.isWriteCoalescing());
assertEquals(MapStoreConfig.InitialLoadMode.EAGER, testMapStoreConfig.getInitialLoadMode());

// Test that the testMapConfig has a nearCacheConfig and it is correct
NearCacheConfig testNearCacheConfig = testMapConfig.getNearCacheConfig();
Expand Down Expand Up @@ -526,7 +530,7 @@ public void testManagementCenterConfig() {
assertEquals("myserver:80", managementCenterConfig.getUrl());
assertEquals(4, managementCenterConfig.getUpdateInterval());
}

@Test
public void testMemberAttributesConfig() {
MemberAttributeConfig memberAttributeConfig = config.getMemberAttributeConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
merge-policy="PUT_IF_ABSENT"
in-memory-format="BINARY">
<hz:map-store enabled="true" class-name="com.hazelcast.spring.DummyStore" write-delay-seconds="0"
initial-mode="EAGER" write-batch-size="10"/>
initial-mode="EAGER" write-batch-size="10" write-coalescing="true"/>
<hz:near-cache time-to-live-seconds="0" max-idle-seconds="60" eviction-policy="LRU" max-size="5000"
invalidate-on-change="true"/>

Expand Down
34 changes: 34 additions & 0 deletions hazelcast/src/main/java/com/hazelcast/config/MapStoreConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ public class MapStoreConfig {
* Default batch size for writing
*/
public static final int DEFAULT_WRITE_BATCH_SIZE = 1;
/**
* Default write coalescing behavior
*/
public static final boolean DEFAULT_WRITE_COALESCING = true;

private boolean enabled = true;
private boolean writeCoalescing = DEFAULT_WRITE_COALESCING;
private String className;
private String factoryClassName;
private int writeDelaySeconds = DEFAULT_WRITE_DELAY_SECONDS;
Expand Down Expand Up @@ -71,6 +76,7 @@ public MapStoreConfig(MapStoreConfig config) {
writeDelaySeconds = config.getWriteDelaySeconds();
writeBatchSize = config.getWriteBatchSize();
initialLoadMode = config.getInitialLoadMode();
writeCoalescing = config.isWriteCoalescing();
properties.putAll(config.getProperties());
}

Expand Down Expand Up @@ -264,6 +270,33 @@ public MapStoreConfig setInitialLoadMode(InitialLoadMode initialLoadMode) {
return this;
}


/**
* Returns {@code true} if write-coalescing is enabled.
*
* @return {@code true} if coalescing enabled, {@code false} otherwise.
* @see #setWriteCoalescing(boolean)
*/
public boolean isWriteCoalescing() {
return writeCoalescing;
}

/**
* Setting {@link #writeCoalescing} is meaningful if you are using write-behind {@link com.hazelcast.core.MapStore}.
* <p/>
* When {@link #writeCoalescing} is {@code true}, only the latest store operation on a key in the {@link #writeDelaySeconds}
* time-window will be reflected to {@link com.hazelcast.core.MapStore}.
* <p/>
* Default value is {@value #DEFAULT_WRITE_COALESCING}.
*
* @param writeCoalescing {@code true} to enable write-coalescing, {@code false} otherwise.
* @see com.hazelcast.instance.GroupProperties.GroupProperty#MAP_WRITE_BEHIND_QUEUE_CAPACITY
*/
public void setWriteCoalescing(boolean writeCoalescing) {
this.writeCoalescing = writeCoalescing;
}


@Override
public String toString() {
return "MapStoreConfig{"
Expand All @@ -277,6 +310,7 @@ public String toString() {
+ ", properties=" + properties
+ ", readOnly=" + readOnly
+ ", initialLoadMode=" + initialLoadMode
+ ", writeCoalescing=" + writeCoalescing
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Set;

import static com.hazelcast.config.MapStoreConfig.InitialLoadMode;
import static com.hazelcast.util.StringUtil.isNullOrEmpty;
import static com.hazelcast.util.StringUtil.upperCaseInternal;

/**
Expand Down Expand Up @@ -960,6 +961,14 @@ private MapStoreConfig createMapStoreConfig(final org.w3c.dom.Node node) {
} else if ("write-batch-size".equals(nodeName)) {
mapStoreConfig.setWriteBatchSize(getIntegerValue("write-batch-size", getTextContent(n).trim(),
MapStoreConfig.DEFAULT_WRITE_BATCH_SIZE));
} else if ("write-coalescing".equals(nodeName)) {
final String writeCoalescing = getTextContent(n).trim();
if (isNullOrEmpty(writeCoalescing)) {
mapStoreConfig.setWriteCoalescing(MapStoreConfig.DEFAULT_WRITE_COALESCING);
} else {
mapStoreConfig.setWriteCoalescing(checkTrue(writeCoalescing));
}

} else if ("properties".equals(nodeName)) {
fillProperties(n, mapStoreConfig.getProperties());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ public class GroupProperties {
/**
* This property can be used to verify that Hazelcast nodes only join when their 'application' level configuration is the
* same.
*
* <p/>
* So imagine that you have multiple machines, but you want to make sure that each machine that is going to join the cluster
* has exactly the same 'application level' settings, so settings that are not part of the Hazelcast configuration, but
* maybe some filepath. To prevent these machines, with potential different application level configuration, to form
* a cluster, this property can be set.
*
* <p/>
* You could use actual values, e.g. string paths, but you can also use e.g. an md5 hash. We'll give the give the guarantee
* that only nodes are going to form a cluster where the token is an exact match. If this token is different, the member
* can't be started and therefor you will get the guarantee that all members in the cluster, will have exactly the same
* application validation token.
*
* <p/>
* This validation-token will be checked before member join the cluster.
*/
public static final String PROP_APPLICATION_VALIDATION_TOKEN = "hazelcast.application.validation.token";
Expand Down Expand Up @@ -263,6 +263,9 @@ public class GroupProperties {

public final GroupProperty ENTERPRISE_LICENSE_KEY;

/**
* Per node max write-behind queue capacity. Total of all configured write-behind queue capacities.
*/
public final GroupProperty MAP_WRITE_BEHIND_QUEUE_CAPACITY;

public final GroupProperty ENTERPRISE_WAN_REP_QUEUESIZE;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.hazelcast.map.mapstore;

import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.map.MapContainer;
import com.hazelcast.map.MapServiceContext;
import com.hazelcast.map.MapStoreWrapper;
Expand Down Expand Up @@ -39,10 +40,10 @@ public static <K, V> MapDataStore<K, V> createWriteBehindStore(MapContainer mapC
final MapServiceContext mapServiceContext = mapContainer.getMapServiceContext();
final MapStoreWrapper store = mapContainer.getStore();
final SerializationService serializationService = mapServiceContext.getNodeEngine().getSerializationService();
final int writeDelaySeconds = mapContainer.getMapConfig().getMapStoreConfig().getWriteDelaySeconds();
final MapStoreConfig mapStoreConfig = mapContainer.getMapConfig().getMapStoreConfig();
final int writeDelaySeconds = mapStoreConfig.getWriteDelaySeconds();
final long writeDelayMillis = TimeUnit.SECONDS.toMillis(writeDelaySeconds);
// TODO writeCoalescing should be configurable.
boolean writeCoalescing = true;
final boolean writeCoalescing = mapStoreConfig.isWriteCoalescing();
final WriteBehindStore mapDataStore
= new WriteBehindStore(store, serializationService, writeDelayMillis, partitionId, writeCoalescing);
final WriteBehindQueue writeBehindQueue = pickWriteBehindQueue(mapServiceContext, writeCoalescing);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.map.mapstore.writebehind;

import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.core.MapStore;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.MapContainer;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.SerializationService;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Contains common fuctionallity which is required by a {@link WriteBehindProcessor}
*
* @param <T> the type of object to be placed in the {@link WriteBehindQueue}
*/
abstract class AbstractWriteBehindProcessor<T> implements WriteBehindProcessor<T> {

protected final int writeBatchSize;

protected final boolean writeCoalescing;

protected final ILogger logger;

protected final MapStore mapStore;

private final SerializationService serializationService;

public AbstractWriteBehindProcessor(MapContainer mapContainer) {
this.serializationService = mapContainer.getMapServiceContext().getNodeEngine().getSerializationService();
this.mapStore = mapContainer.getStore();
this.logger = mapContainer.getMapServiceContext().getNodeEngine().getLogger(DefaultWriteBehindProcessor.class);
MapStoreConfig mapStoreConfig = mapContainer.getMapConfig().getMapStoreConfig();
this.writeBatchSize = mapStoreConfig.getWriteBatchSize();
this.writeCoalescing = mapStoreConfig.isWriteCoalescing();
}

protected Object toObject(Object obj) {
return serializationService.toObject(obj);
}

protected Data toData(Object obj) {
return serializationService.toData(obj);
}


protected void sleepSeconds(long secs) {
try {
TimeUnit.SECONDS.sleep(secs);
} catch (InterruptedException e) {
logger.warning(e);
}
}

/**
* Used to group store operations.
*/
protected enum StoreOperationType {

DELETE {
@Override
boolean processSingle(Object key, Object value, MapStore mapStore) {
mapStore.delete(key);
return true;
}

@Override
boolean processBatch(Map map, MapStore mapStore) {
mapStore.deleteAll(map.keySet());
return true;
}
},

WRITE {
@Override
boolean processSingle(Object key, Object value, MapStore mapStore) {
mapStore.store(key, value);
return true;
}

@Override
boolean processBatch(Map map, MapStore mapStore) {
mapStore.storeAll(map);
return true;
}
};

abstract boolean processSingle(Object key, Object value, MapStore mapStore);

abstract boolean processBatch(Map map, MapStore mapStore);
}
}
Loading

0 comments on commit 14597aa

Please sign in to comment.