Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IMap entries eviction support on the indexes level (#11748) #15136

12 changes: 12 additions & 0 deletions hazelcast/src/main/java/com/hazelcast/map/impl/LazyMapEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.query.Metadata;
import com.hazelcast.query.impl.CachedQueryEntry;
import com.hazelcast.query.impl.getters.Extractors;

Expand Down Expand Up @@ -52,6 +53,8 @@ public class LazyMapEntry<K, V> extends CachedQueryEntry<K, V> implements Serial

private transient boolean modified;

private transient Metadata metadata;

public LazyMapEntry() {
}

Expand Down Expand Up @@ -152,4 +155,13 @@ public int getFactoryId() {
/**
 * Returns the class ID used by {@code IdentifiedDataSerializable} to identify
 * this type within the map data serializer hook's factory.
 */
public int getClassId() {
    return MapDataSerializerHook.LAZY_MAP_ENTRY;
}

/**
 * Returns the metadata attached to this entry, or {@code null} if none has been set.
 * The field is transient, so metadata does not survive serialization of this entry.
 */
public Metadata getMetadata() {
    return metadata;
}

/**
 * Attaches metadata to this entry. Stored in a transient field, so it is not
 * carried across serialization.
 *
 * @param metadata the metadata to associate with this entry (may be {@code null})
 */
public void setMetadata(Metadata metadata) {
    this.metadata = metadata;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.record.Records;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.map.impl.recordstore.RecordStoreAdapter;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.query.impl.Index;
import com.hazelcast.query.impl.Indexes;
Expand Down Expand Up @@ -249,9 +250,10 @@ private void populateIndexes(PartitionMigrationEvent event, TargetIndexes target
final PartitionContainer container = mapServiceContext.getPartitionContainer(event.getPartitionId());
for (RecordStore recordStore : container.getMaps().values()) {
final MapContainer mapContainer = mapServiceContext.getMapContainer(recordStore.getName());
final StoreAdapter storeAdapter = new RecordStoreAdapter(recordStore);

final Indexes indexes = mapContainer.getIndexes(event.getPartitionId());
indexes.createIndexesFromRecordedDefinitions();
indexes.createIndexesFromRecordedDefinitions(storeAdapter);
if (!indexes.haveAtLeastOneIndex()) {
// no indexes to work with
continue;
Expand All @@ -273,6 +275,8 @@ private void populateIndexes(PartitionMigrationEvent event, TargetIndexes target
final Object value = Records.getValueOrCachedValue(record, serializationService);
if (value != null) {
QueryableEntry queryEntry = mapContainer.newQueryEntry(key, value);
queryEntry.setRecord(record);
queryEntry.setStoreAdapter(storeAdapter);
indexes.putEntry(queryEntry, null, Index.OperationSource.SYSTEM);
}
}
Expand Down
47 changes: 47 additions & 0 deletions hazelcast/src/main/java/com/hazelcast/map/impl/StoreAdapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.map.impl;

/**
 * Adapter of an arbitrary store. This adapter is used to pass a record store
 * to the index store, so the index can consult the owning store about
 * expiration/eviction without depending on a concrete record-store type.
 *
 * @param <R> the record type handled by the underlying store
 */
public interface StoreAdapter<R> {

    /**
     * Checks whether the record is expired and evicts it if so. Updates the record's last access time
     * if it is not evicted.
     *
     * @param record the record to check
     * @param now    the current time
     * @param backup {@code true} if a backup partition, otherwise {@code false}.
     * @return {@code true} if the record is expired and evicted, otherwise {@code false}.
     */
    boolean evictIfExpired(R record, long now, boolean backup);

    /**
     * Checks whether ttl or maxIdle are set on the record.
     *
     * @param record the record to be checked
     * @return {@code true} if ttl or maxIdle are defined on the {@code record}, otherwise {@code false}.
     */
    boolean isTtlOrMaxIdleDefined(R record);

    /**
     * @return {@code true} if the store has at least one candidate entry
     * for expiration, otherwise {@code false}.
     */
    boolean isExpirable();

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.record.Records;
import com.hazelcast.map.impl.recordstore.RecordStoreAdapter;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
Expand Down Expand Up @@ -81,7 +82,8 @@ public void runInternal() {
int partitionId = getPartitionId();

Indexes indexes = mapContainer.getIndexes(partitionId);
InternalIndex index = indexes.addOrGetIndex(attributeName, ordered);
RecordStoreAdapter recordStoreAdapter = new RecordStoreAdapter(recordStore);
InternalIndex index = indexes.addOrGetIndex(attributeName, ordered, indexes.isGlobal() ? null : recordStoreAdapter);
if (index.hasPartitionIndexed(partitionId)) {
return;
}
Expand All @@ -95,6 +97,8 @@ public void runInternal() {
Data key = record.getKey();
Object value = Records.getValueOrCachedValue(record, serializationService);
QueryableEntry queryEntry = mapContainer.newQueryEntry(key, value);
queryEntry.setRecord(record);
queryEntry.setStoreAdapter(recordStoreAdapter);
index.putEntry(queryEntry, null, Index.OperationSource.USER);
}
index.markPartitionAsIndexed(partitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package com.hazelcast.map.impl.operation;

import com.hazelcast.config.MapConfig;
import com.hazelcast.map.impl.StoreAdapter;
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapDataSerializerHook;
import com.hazelcast.map.impl.PartitionContainer;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.record.RecordReplicationInfo;
import com.hazelcast.map.impl.recordstore.RecordStoreAdapter;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
Expand Down Expand Up @@ -147,13 +149,15 @@ void applyState() {
RecordStore recordStore = operation.getRecordStore(mapName);
recordStore.reset();
recordStore.setPreMigrationLoadedStatus(loaded.get(mapName));
StoreAdapter storeAdapter = new RecordStoreAdapter(recordStore);

MapContainer mapContainer = recordStore.getMapContainer();
PartitionContainer partitionContainer = recordStore.getMapContainer().getMapServiceContext()
.getPartitionContainer(operation.getPartitionId());
for (Map.Entry<String, Boolean> indexDefinition : mapContainer.getIndexDefinitions().entrySet()) {
Indexes indexes = mapContainer.getIndexes(partitionContainer.getPartitionId());
indexes.addOrGetIndex(indexDefinition.getKey(), indexDefinition.getValue());
indexes.addOrGetIndex(indexDefinition.getKey(), indexDefinition.getValue(),
indexes.isGlobal() ? null : storeAdapter);
}

final Indexes indexes = mapContainer.getIndexes(partitionContainer.getPartitionId());
Expand All @@ -178,6 +182,8 @@ void applyState() {
final Object valueToIndex = getValueOrCachedValue(newRecord, serializationService);
if (valueToIndex != null) {
final QueryableEntry queryableEntry = mapContainer.newQueryEntry(newRecord.getKey(), valueToIndex);
queryableEntry.setRecord(newRecord);
queryableEntry.setStoreAdapter(storeAdapter);
indexes.putEntry(queryableEntry, null, Index.OperationSource.SYSTEM);
}
}
Expand Down Expand Up @@ -216,15 +222,18 @@ private void addIndexes(String mapName, Collection<IndexInfo> indexInfos) {
// creating global indexes on partition thread in case they do not exist
for (IndexInfo indexInfo : indexInfos) {
Indexes indexes = mapContainer.getIndexes();
StoreAdapter recordStoreAdapter = indexes.isGlobal() ? null : new RecordStoreAdapter(recordStore);

// optimisation not to synchronize each partition thread on the addOrGetIndex method
if (indexes.getIndex(indexInfo.getName()) == null) {
indexes.addOrGetIndex(indexInfo.getName(), indexInfo.isOrdered());
indexes.addOrGetIndex(indexInfo.getName(), indexInfo.isOrdered(), recordStoreAdapter);
}
}
} else {
Indexes indexes = mapContainer.getIndexes(operation.getPartitionId());
StoreAdapter recordStoreAdapter = indexes.isGlobal() ? null : new RecordStoreAdapter(recordStore);
for (IndexInfo indexInfo : indexInfos) {
indexes.addOrGetIndex(indexInfo.getName(), indexInfo.isOrdered());
indexes.addOrGetIndex(indexInfo.getName(), indexInfo.isOrdered(), recordStoreAdapter);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.PartitionContainer;
import com.hazelcast.map.impl.StoreAdapter;
import com.hazelcast.map.impl.iterator.MapEntriesWithCursor;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.record.Records;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.map.impl.recordstore.RecordStoreAdapter;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.query.Metadata;
import com.hazelcast.query.PagingPredicate;
Expand Down Expand Up @@ -86,6 +88,7 @@ public void run(String mapName, Predicate predicate, int partitionId, Result res
boolean useCachedValues = isUseCachedDeserializedValuesEnabled(mapContainer, partitionId);
Extractors extractors = mapServiceContext.getExtractors(mapName);
LazyMapEntry queryEntry = new LazyMapEntry();
StoreAdapter storeAdapter = new RecordStoreAdapter(recordStore);
while (iterator.hasNext()) {
Record record = iterator.next();
Data key = (Data) toData(record.getKey());
Expand All @@ -98,6 +101,8 @@ public void run(String mapName, Predicate predicate, int partitionId, Result res

queryEntry.init(serializationService, key, value, extractors);
queryEntry.setMetadata(metadata);
queryEntry.setRecord(record);
queryEntry.setStoreAdapter(storeAdapter);
boolean valid = predicate.apply(queryEntry);
if (valid && compareAnchor(pagingPredicate, queryEntry, nearestAnchorEntry)) {
result.add(queryEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.hazelcast.config.MapIndexConfig;
import com.hazelcast.config.QueryCacheConfig;
import com.hazelcast.core.IMap;
import com.hazelcast.partition.PartitioningStrategy;
import com.hazelcast.internal.eviction.EvictionListener;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.map.impl.LazyMapEntry;
Expand All @@ -29,6 +28,7 @@
import com.hazelcast.map.impl.querycache.QueryCacheEventService;
import com.hazelcast.map.impl.querycache.subscriber.record.QueryCacheRecord;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.partition.PartitioningStrategy;
import com.hazelcast.query.Predicate;
import com.hazelcast.query.impl.CachedQueryEntry;
import com.hazelcast.query.impl.Indexes;
Expand Down Expand Up @@ -85,8 +85,9 @@ abstract class AbstractInternalQueryCache<K, V> implements InternalQueryCache<K,
this.recordStore = new DefaultQueryCacheRecordStore(serializationService, indexes,
queryCacheConfig, getEvictionListener(), extractors);

assert indexes.isGlobal();
for (MapIndexConfig indexConfig : queryCacheConfig.getIndexConfigs()) {
indexes.addOrGetIndex(indexConfig.getAttribute(), indexConfig.isOrdered());
indexes.addOrGetIndex(indexConfig.getAttribute(), indexConfig.isOrdered(), null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
* @param <K> the key type for this {@link InternalQueryCache}
* @param <V> the value type for this {@link InternalQueryCache}
*/
@SuppressWarnings("checkstyle:methodcount")
@SuppressWarnings({"checkstyle:methodcount", "checkstyle:classfanoutcomplexity"})
class DefaultQueryCache<K, V> extends AbstractInternalQueryCache<K, V> {

DefaultQueryCache(String cacheId, String cacheName, QueryCacheConfig queryCacheConfig,
Expand Down Expand Up @@ -456,7 +456,8 @@ public boolean removeEntryListener(String id) {
public void addIndex(String attribute, boolean ordered) {
checkNotNull(attribute, "attribute cannot be null");

getIndexes().addOrGetIndex(attribute, ordered);
assert indexes.isGlobal();
getIndexes().addOrGetIndex(attribute, ordered, null);

InternalSerializationService serializationService = context.getSerializationService();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ public Object getCachedValueUnsafe() {
/**
 * Records an access to this record: increments the hit counter and then
 * delegates to {@link #onAccessSafe(long)} to update the last access time.
 */
@Override
public void onAccess(long now) {
    hits++;
    onAccessSafe(now);
}

/**
 * Updates only the last access time (stored relative to the record's base time
 * via {@code stripBaseTime}); unlike {@code onAccess}, the hit counter is not touched.
 * NOTE(review): despite the "Safe" name, this is a plain field assignment —
 * confirm it satisfies the interface's thread-safety expectation when records
 * are accessed from multiple threads.
 */
@Override
public void onAccessSafe(long now) {
    lastAccessTime = stripBaseTime(now);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public interface Record<V> {

void onAccess(long now);

/**
 * Updates the record's access-related state.
 * An implementation must be thread safe if the record might be accessed from multiple threads.
 *
 * @param now the current time
 */
void onAccessSafe(long now);

void onUpdate(long now);

void onStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,16 @@ protected boolean isMaxIdleDefined(long maxIdle) {
return maxIdle > 0L && maxIdle < Long.MAX_VALUE;
}

/**
* Check if record is reachable according to TTL or idle times.
* If not reachable return null.
*
* @param record {@link com.hazelcast.map.impl.record.Record}
* @return null if evictable.
*/
protected Record getOrNullIfExpired(Record record, long now, boolean backup) {
/**
 * Tells whether the record carries any expiration setting.
 *
 * @param record the record to inspect
 * @return {@code true} if either a TTL or a max-idle value is defined on the record
 */
@Override
public boolean isTtlOrMaxIdleDefined(Record record) {
    return isTtlDefined(record.getTtl()) || isMaxIdleDefined(record.getMaxIdle());
}


@Override
public Record getOrNullIfExpired(Record record, long now, boolean backup) {
if (!isRecordStoreExpirable()) {
return record;
}
Expand Down Expand Up @@ -332,7 +334,8 @@ private void accumulateOrSendExpiredKey(Record record) {
clearExpiredRecordsTask.tryToSendBackupExpiryOp(this, true);
}

protected void accessRecord(Record record, long now) {
@Override
public void accessRecord(Record record, long now) {
record.onAccess(now);
updateStatsOnGet(now);
setExpirationTime(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.StoreAdapter;
import com.hazelcast.map.impl.mapstore.MapDataStore;
import com.hazelcast.map.impl.mapstore.MapStoreContext;
import com.hazelcast.map.impl.record.Record;
Expand Down Expand Up @@ -65,6 +66,7 @@ abstract class AbstractRecordStore implements RecordStore<Record> {
protected final RecordStoreMutationObserver<Record> mutationObserver;

protected Storage<Data, Record> storage;
private final StoreAdapter storeAdapter;

protected AbstractRecordStore(MapContainer mapContainer, int partitionId) {
this.name = mapContainer.getName();
Expand All @@ -82,6 +84,7 @@ protected AbstractRecordStore(MapContainer mapContainer, int partitionId) {
Collection<RecordStoreMutationObserver<Record>> mutationObservers = mapServiceContext
.createRecordStoreMutationObservers(getName(), partitionId);
this.mutationObserver = new CompositeRecordStoreMutationObserver<>(mutationObservers);
this.storeAdapter = new RecordStoreAdapter(this);
}

protected boolean persistenceEnabledFor(@Nonnull CallerProvenance provenance) {
Expand Down Expand Up @@ -161,7 +164,8 @@ protected void saveIndex(Record record, Object oldValue) {
if (indexes.haveAtLeastOneIndex()) {
Object value = Records.getValueOrCachedValue(record, serializationService);
QueryableEntry queryableEntry = mapContainer.newQueryEntry(dataKey, value);
queryableEntry.setMetadata(record.getMetadata());
queryableEntry.setRecord(record);
queryableEntry.setStoreAdapter(storeAdapter);
indexes.putEntry(queryableEntry, oldValue, Index.OperationSource.USER);
}
}
Expand Down
Loading