Skip to content

Commit

Permalink
Merge pull request #1885 from gurbuzali/multimap-collection-sharing/3.x
Browse files Browse the repository at this point in the history
Fixes #1882 sharing of non-thread-safe collection prevented for local ca...
  • Loading branch information
noctarius committed Feb 28, 2014
2 parents 9162311 + a03d51b commit 9879276
Show file tree
Hide file tree
Showing 18 changed files with 96 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,13 @@ public MultiMapWrapper getMultiMapWrapper(Data dataKey) {
return multiMapWrappers.get(dataKey);
}

public Collection<MultiMapRecord> remove(Data dataKey) {
public void delete(Data dataKey) {
multiMapWrappers.remove(dataKey);
}

public Collection<MultiMapRecord> remove(Data dataKey, boolean copyOf) {
MultiMapWrapper wrapper = multiMapWrappers.remove(dataKey);
return wrapper != null ? wrapper.getCollection() : null;
return wrapper != null ? wrapper.getCollection(copyOf) : null;
}

public Set<Data> keySet() {
Expand All @@ -136,7 +140,7 @@ public Set<Data> keySet() {
public Collection<MultiMapRecord> values() {
Collection<MultiMapRecord> valueCollection = new LinkedList<MultiMapRecord>();
for (MultiMapWrapper wrapper : multiMapWrappers.values()) {
valueCollection.addAll(wrapper.getCollection());
valueCollection.addAll(wrapper.getCollection(false));
}
return valueCollection;
}
Expand All @@ -151,7 +155,7 @@ public boolean containsEntry(boolean binary, Data key, Data value) {
return false;
}
MultiMapRecord record = new MultiMapRecord(binary ? value : nodeEngine.toObject(value));
return wrapper.getCollection().contains(record);
return wrapper.getCollection(false).contains(record);
}

public boolean containsValue(boolean binary, Data value) {
Expand All @@ -167,22 +171,16 @@ public Map<Data, Collection<MultiMapRecord>> copyCollections() {
Map<Data, Collection<MultiMapRecord>> map = new HashMap<Data, Collection<MultiMapRecord>>(multiMapWrappers.size());
for (Map.Entry<Data, MultiMapWrapper> entry : multiMapWrappers.entrySet()) {
Data key = entry.getKey();
Collection<MultiMapRecord> col = copyCollection(entry.getValue().getCollection());
Collection<MultiMapRecord> col = entry.getValue().getCollection(true);
map.put(key, col);
}
return map;
}

private Collection<MultiMapRecord> copyCollection(Collection<MultiMapRecord> coll) {
Collection<MultiMapRecord> copy = new ArrayList<MultiMapRecord>(coll.size());
copy.addAll(coll);
return copy;
}

public int size() {
int size = 0;
for (MultiMapWrapper wrapper : multiMapWrappers.values()) {
size += wrapper.getCollection().size();
size += wrapper.getCollection(false).size();
}
return size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected void writeInternal(ObjectDataOutput out) throws IOException {
Data key = collectionEntry.getKey();
key.writeData(out);
MultiMapWrapper wrapper = collectionEntry.getValue();
Collection<MultiMapRecord> coll = wrapper.getCollection();
Collection<MultiMapRecord> coll = wrapper.getCollection(false);
out.writeInt(coll.size());
for (MultiMapRecord record : coll) {
record.writeData(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ public void insertMigratedData(int partitionId, Map<String, Map> map) {
for (Map.Entry<Data, MultiMapWrapper> wrapperEntry : collections.entrySet()) {
final Data key = wrapperEntry.getKey();
final MultiMapWrapper wrapper = wrapperEntry.getValue();
Collection<MultiMapRecord> coll = wrapper.getCollection();
Collection<MultiMapRecord> coll = wrapper.getCollection(false);
if (container.config.getValueCollectionType().equals(MultiMapConfig.ValueCollectionType.SET)){
coll = new HashSet<MultiMapRecord>(wrapper.getCollection());
coll = new HashSet<MultiMapRecord>(coll);
}
container.multiMapWrappers.put(key, new MultiMapWrapper(coll));
}
Expand Down Expand Up @@ -271,7 +271,7 @@ public LocalMapStats createStats(String name) {
lockedEntryCount += multiMapContainer.getLockedCount();
for (MultiMapWrapper wrapper : multiMapContainer.multiMapWrappers.values()) {
hits += wrapper.getHits();
ownedEntryCount += wrapper.getCollection().size();
ownedEntryCount += wrapper.getCollection(false).size();
}
} else {
int backupCount = multiMapContainer.config.getTotalBackupCount();
Expand All @@ -292,7 +292,7 @@ public LocalMapStats createStats(String name) {

if (replicaAddress != null && replicaAddress.equals(thisAddress)) {
for (MultiMapWrapper wrapper : multiMapContainer.multiMapWrappers.values()) {
backupEntryCount += wrapper.getCollection().size();
backupEntryCount += wrapper.getCollection(false).size();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.hazelcast.multimap;

import java.util.Collection;
import java.util.*;

/**
* @author ali 3/1/13
Expand All @@ -33,10 +33,22 @@ public MultiMapWrapper(Collection<MultiMapRecord> collection) {
this.collection = collection;
}

public Collection<MultiMapRecord> getCollection() {
public Collection<MultiMapRecord> getCollection(boolean copyOf) {
if (copyOf) {
return getCopyOfCollection();
}
return collection;
}

private Collection<MultiMapRecord> getCopyOfCollection(){
if (collection instanceof Set) {
return new HashSet<MultiMapRecord>(collection);
} else if (collection instanceof List) {
return new LinkedList<MultiMapRecord>(collection);
}
throw new IllegalArgumentException("No Matching CollectionProxyType!");
}

public void incrementHit(){
hits++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void run() throws Exception {
MultiMapContainer container = getOrCreateContainer();
((MultiMapService) getService()).getLocalMultiMapStatsImpl(name).incrementOtherOperations();
MultiMapWrapper wrapper = container.getMultiMapWrapper(dataKey);
response = wrapper == null ? 0 : wrapper.getCollection().size();
response = wrapper == null ? 0 : wrapper.getCollection(false).size();
}

public int getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.hazelcast.multimap.MultiMapDataSerializerHook;
import com.hazelcast.multimap.MultiMapWrapper;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.util.Clock;
import com.hazelcast.spi.ResponseHandler;

import java.util.Collection;

Expand All @@ -28,8 +28,6 @@
*/
public class GetAllOperation extends MultiMapKeyBasedOperation {

transient long begin = -1;

public GetAllOperation() {
}

Expand All @@ -42,9 +40,9 @@ public void run() throws Exception {
Collection coll = null;
if (wrapper != null) {
wrapper.incrementHit();
coll = wrapper.getCollection();
final ResponseHandler responseHandler = getResponseHandler();
coll = wrapper.getCollection(responseHandler.isLocal());
}
begin = Clock.currentTimeMillis();
response = new MultiMapResponse(coll);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ public final MultiMapWrapper getCollectionWrapper() {
return getOrCreateContainer().getMultiMapWrapper(dataKey);
}

public final Collection<MultiMapRecord> remove() {
return getOrCreateContainer().remove(dataKey);
public final Collection<MultiMapRecord> remove(boolean copyOf) {
return getOrCreateContainer().remove(dataKey, copyOf);
}

public final void delete() {
getOrCreateContainer().delete(dataKey);
}

protected void writeInternal(ObjectDataOutput out) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public PutBackupOperation(String name, Data dataKey, Data value, long recordId,
public void run() throws Exception {

MultiMapRecord record = new MultiMapRecord(recordId, isBinary() ? value : toObject(value));
Collection<MultiMapRecord> coll = getOrCreateCollectionWrapper().getCollection();
Collection<MultiMapRecord> coll = getOrCreateCollectionWrapper().getCollection(false);
if (index == -1) {
response = coll.add(record);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void run() throws Exception {
MultiMapContainer container = getOrCreateContainer();
recordId = container.nextId();
MultiMapRecord record = new MultiMapRecord(recordId, isBinary() ? value : toObject(value));
Collection<MultiMapRecord> coll = container.getOrCreateMultiMapWrapper(dataKey).getCollection();
Collection<MultiMapRecord> coll = container.getOrCreateMultiMapWrapper(dataKey).getCollection(false);
if (index == -1) {
response = coll.add(record);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public RemoveAllBackupOperation(String name, Data dataKey) {
}

public void run() throws Exception {
remove();
delete();
response = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package com.hazelcast.multimap.operations;

import com.hazelcast.core.EntryEventType;
import com.hazelcast.multimap.MultiMapDataSerializerHook;
import com.hazelcast.multimap.MultiMapRecord;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.Operation;
import com.hazelcast.util.Clock;

import java.util.Collection;

Expand All @@ -32,8 +31,6 @@ public class RemoveAllOperation extends MultiMapBackupAwareOperation {

transient Collection<MultiMapRecord> coll;

transient long begin = -1;

public RemoveAllOperation() {
}

Expand All @@ -42,13 +39,11 @@ public RemoveAllOperation(String name, Data dataKey, int threadId) {
}

public void run() throws Exception {
begin = Clock.currentTimeMillis();
coll = remove();
coll = remove(getResponseHandler().isLocal());
response = new MultiMapResponse(coll);
}

public void afterRun() throws Exception {
long elapsed = Math.max(0, Clock.currentTimeMillis() - begin);
if (coll != null) {
getOrCreateContainer().update();
for (MultiMapRecord record : coll) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ public void run() throws Exception {
if (wrapper == null) {
return;
}
Collection<MultiMapRecord> coll = wrapper.getCollection();
Collection<MultiMapRecord> coll = wrapper.getCollection(false);
Iterator<MultiMapRecord> iter = coll.iterator();
while (iter.hasNext()){
if(iter.next().getRecordId() == recordId){
iter.remove();
response = true;
if (coll.isEmpty()) {
remove();
delete();
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

package com.hazelcast.multimap.operations;

import com.hazelcast.multimap.*;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.multimap.MultiMapDataSerializerHook;
import com.hazelcast.multimap.MultiMapRecord;
import com.hazelcast.multimap.MultiMapWrapper;
import com.hazelcast.nio.IOUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.Operation;
import com.hazelcast.util.Clock;

import java.io.IOException;
import java.util.Collection;
Expand All @@ -37,7 +38,6 @@ public class RemoveOperation extends MultiMapBackupAwareOperation {
Data value;

transient long recordId;
transient long begin = -1;

public RemoveOperation() {
}
Expand All @@ -48,13 +48,12 @@ public RemoveOperation(String name, Data dataKey, int threadId, Data value) {
}

public void run() throws Exception {
begin = Clock.currentTimeMillis();
response = false;
MultiMapWrapper wrapper = getCollectionWrapper();
if (wrapper == null) {
return;
}
Collection<MultiMapRecord> coll = wrapper.getCollection();
Collection<MultiMapRecord> coll = wrapper.getCollection(false);
MultiMapRecord record = new MultiMapRecord(isBinary() ? value : toObject(value));
Iterator<MultiMapRecord> iter = coll.iterator();
while (iter.hasNext()) {
Expand All @@ -64,7 +63,7 @@ public void run() throws Exception {
recordId = r.getRecordId();
response = true;
if (coll.isEmpty()) {
remove();
delete();
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ public void run() throws Exception {
}
MultiMapWrapper wrapper = getOrCreateCollectionWrapper();

response = new MultiMapResponse(wrapper.getCollection()).setNextRecordId(container.nextId()).setTxVersion(wrapper.incrementAndGetVersion());
final boolean isLocal = getResponseHandler().isLocal();
final MultiMapResponse multiMapResponse = new MultiMapResponse(wrapper.getCollection(isLocal));
multiMapResponse.setNextRecordId(container.nextId());
multiMapResponse.setTxVersion(wrapper.incrementAndGetVersion());
response = multiMapResponse;
}

public WaitNotifyKey getWaitKey() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void run() throws Exception {
response = false;
return;
}
Collection<MultiMapRecord> coll = wrapper.getCollection();
Collection<MultiMapRecord> coll = wrapper.getCollection(false);
MultiMapRecord record = new MultiMapRecord(recordId, isBinary() ? value : toObject(value));
coll.add(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void run() throws Exception {
return;
}
}
Collection<MultiMapRecord> coll = wrapper.getCollection();
Collection<MultiMapRecord> coll = wrapper.getCollection(false);
removed = new LinkedList<MultiMapRecord>();
for (Long recordId: recordIds){
Iterator<MultiMapRecord> iter = coll.iterator();
Expand All @@ -75,7 +75,7 @@ public void run() throws Exception {
}
}
if (coll.isEmpty()) {
remove();
delete();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void run() throws Exception {
response = false;
return;
}
Collection<MultiMapRecord> coll = wrapper.getCollection();
Collection<MultiMapRecord> coll = wrapper.getCollection(false);
Iterator<MultiMapRecord> iter = coll.iterator();
while (iter.hasNext()){
if (iter.next().getRecordId() == recordId){
Expand All @@ -64,7 +64,7 @@ public void run() throws Exception {
}
}
if (coll.isEmpty()) {
remove();
delete();
}
}

Expand Down
Loading

0 comments on commit 9879276

Please sign in to comment.