Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.0' into 3.0
Browse files Browse the repository at this point in the history
Conflicts:
	hazelcast/src/main/java/com/hazelcast/map/MapMigrationOperation.java
	hazelcast/src/main/java/com/hazelcast/map/MapService.java
	hazelcast/src/main/java/com/hazelcast/map/QueryOperation.java
	hazelcast/src/main/java/com/hazelcast/map/QueryPartitionOperation.java
	hazelcast/src/main/java/com/hazelcast/query/impl/UnsortedIndexStore.java
  • Loading branch information
talip committed Jan 29, 2013
2 parents a745887 + be82734 commit 899ff8d
Show file tree
Hide file tree
Showing 195 changed files with 1,446 additions and 1,889 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2010, Hazel Ltd. All Rights Reserved.
* Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -12,13 +12,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.hazelcast.client;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.cluster.ClusterService;
import com.hazelcast.core.*;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.nio.Address;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2010, Hazel Ltd. All Rights Reserved.
* Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.hazelcast.client.util;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2012, Hazelcast, Inc. All Rights Reserved.
* Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2010, Hazel Ltd. All Rights Reserved.
* Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.hazelcast.cluster.client;
Expand All @@ -21,7 +20,6 @@
import com.hazelcast.instance.Node;
import com.hazelcast.map.MapService;
import com.hazelcast.nio.Protocol;
import com.hazelcast.spi.impl.NodeEngineImpl;

public class DestroyHandler extends ClientCommandHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package com.hazelcast.collection;

import com.hazelcast.config.MultiMapConfig;
import com.hazelcast.map.LockInfo;
import com.hazelcast.lock.LockInfo;
import com.hazelcast.lock.LockStore;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.NodeEngine;
Expand All @@ -42,7 +43,7 @@ public class CollectionContainer {

final ConcurrentMap<Data, Collection<CollectionRecord>> collections = new ConcurrentHashMap<Data, Collection<CollectionRecord>>(1000);

final ConcurrentMap<Data, LockInfo> locks = new ConcurrentHashMap<Data, LockInfo>(100);
final LockStore lockStore = new LockStore();

final AtomicLong idGen = new AtomicLong();

Expand All @@ -54,12 +55,7 @@ public CollectionContainer(CollectionProxyId proxyId, CollectionService service)
}

public LockInfo getOrCreateLock(Data key) {
LockInfo lock = locks.get(key);
if (lock == null) {
lock = new LockInfo();
locks.put(key, lock);
}
return lock;
return lockStore.getOrCreateLock(key);
}

public boolean lock(Data dataKey, Address caller, int threadId, long ttl) {
Expand All @@ -68,31 +64,15 @@ public boolean lock(Data dataKey, Address caller, int threadId, long ttl) {
}

public boolean isLocked(Data dataKey) {
LockInfo lock = locks.get(dataKey);
if (lock == null)
return false;
return lock.isLocked();
return lockStore.isLocked(dataKey);
}

public boolean canAcquireLock(Data key, int threadId, Address caller) {
LockInfo lock = locks.get(key);
return lock == null || lock.testLock(threadId, caller);
return lockStore.canAcquireLock(key, caller, threadId);
}

public boolean unlock(Data dataKey, Address caller, int threadId) {
LockInfo lock = locks.get(dataKey);
boolean result = false;
if (lock == null)
return result;
if (lock.testLock(threadId, caller)) {
if (lock.unlock(caller, threadId)) {
result = true;
}
}
if (!lock.isLocked()) {
locks.remove(dataKey);
}
return result;
return lockStore.unlock(dataKey, caller, threadId);
}

public long nextId() {
Expand Down Expand Up @@ -178,6 +158,7 @@ public int size() {
}

public void clearCollections() {
final Map<Data, LockInfo> locks = lockStore.getLocks();
Map<Data, Collection<CollectionRecord>> temp = new HashMap<Data, Collection<CollectionRecord>>(locks.size());
for (Data key : locks.keySet()) {
temp.put(key, collections.get(key));
Expand All @@ -198,13 +179,13 @@ public ConcurrentMap<Data, Collection<CollectionRecord>> getCollections() {
return collections; //TODO for testing only
}

public ConcurrentMap<Data, LockInfo> getLocks() {
return locks; //TODO for testing only
public Map<Data, LockInfo> getLocks() {
return lockStore.getLocks(); //TODO for testing only
}

public void clear() {
collections.clear();
locks.clear();
lockStore.clear();
}

private CollectionRecord createRecord(boolean binary, Data data) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2012, Hazelcast, Inc. All Rights Reserved.
* Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package com.hazelcast.collection;

import com.hazelcast.map.LockInfo;
import com.hazelcast.lock.LockInfo;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.hazelcast.collection;

import com.hazelcast.util.ConcurrencyUtil;
import com.hazelcast.util.ConcurrencyUtil.ConstructorFunction;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand All @@ -28,18 +31,19 @@ public class CollectionPartitionContainer {

final ConcurrentMap<CollectionProxyId, CollectionContainer> containerMap = new ConcurrentHashMap<CollectionProxyId, CollectionContainer>(1000);

private final ConstructorFunction<CollectionProxyId, CollectionContainer> collectionConstructor
= new ConstructorFunction<CollectionProxyId, CollectionContainer>() {
public CollectionContainer createNew(CollectionProxyId proxyId) {
return new CollectionContainer(proxyId, service);
}
};

public CollectionPartitionContainer(CollectionService service) {
this.service = service;
}

public CollectionContainer getOrCreateCollectionContainer(CollectionProxyId proxyId) {
CollectionContainer collectionContainer = containerMap.get(proxyId);
if (collectionContainer == null) {
collectionContainer = new CollectionContainer(proxyId, service);
CollectionContainer current = containerMap.putIfAbsent(proxyId, collectionContainer);
collectionContainer = current == null ? collectionContainer : current;
}
return collectionContainer;
return ConcurrencyUtil.getOrPutIfAbsent(containerMap, proxyId, collectionConstructor);
}

public ConcurrentMap<CollectionProxyId, CollectionContainer> getContainerMap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.collection;

import com.hazelcast.nio.IOUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
Expand All @@ -29,50 +30,69 @@ public class CollectionProxyId implements DataSerializable {

String name;

String keyName;

CollectionProxyType type;

public CollectionProxyId() {
}

public CollectionProxyId(String name, CollectionProxyType type) {
public CollectionProxyId(String name, String keyName, CollectionProxyType type) {
this.name = name;
this.keyName = keyName;
this.type = type;
}

@Override
public void writeData(ObjectDataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(type.getType());
IOUtil.writeNullableString(out, keyName);
}

public void readData(ObjectDataInput in) throws IOException {
name = in.readUTF();
type = CollectionProxyType.getByType(in.readInt());
keyName = IOUtil.readNullableString(in);
}

public String getName() {
return name;
}

public String getKeyName() {
return keyName;
}

public CollectionProxyType getType() {
return type;
}

public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!(o instanceof CollectionProxyId)) return false;

CollectionProxyId that = (CollectionProxyId) o;
CollectionProxyId proxyId = (CollectionProxyId) o;

if (!name.equals(that.name)) return false;
if (type != that.type) return false;
if (keyName != null ? !keyName.equals(proxyId.keyName) : proxyId.keyName != null) return false;
if (!name.equals(proxyId.name)) return false;
if (type != proxyId.type) return false;

return true;
}

@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + (keyName != null ? keyName.hashCode() : 0);
result = 31 * result + type.hashCode();
return result;
}

public void writeData(ObjectDataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(type.getType());
}

public void readData(ObjectDataInput in) throws IOException {
name = in.readUTF();
type = CollectionProxyType.getByType(in.readInt());
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("CollectionProxyId");
sb.append("{name='").append(name).append('\'');
sb.append(", keyName='").append(keyName).append('\'');
sb.append(", type=").append(type);
sb.append('}');
return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2012, Hazelcast, Inc. All Rights Reserved.
* Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.hazelcast.collection.multimap.ObjectMultiMapProxy;
import com.hazelcast.collection.set.ObjectSetProxy;
import com.hazelcast.core.*;
import com.hazelcast.map.LockInfo;
import com.hazelcast.lock.LockInfo;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.SerializationService;
import com.hazelcast.partition.MigrationEndpoint;
Expand All @@ -34,7 +34,8 @@
/**
* @ali 1/1/13
*/
public class CollectionService implements ManagedService, RemoteService, EventPublishingService<CollectionEvent, EventListener>, MigrationAwareService {
public class CollectionService implements ManagedService, RemoteService, MembershipAwareService,
EventPublishingService<CollectionEvent, EventListener>, MigrationAwareService {

public static final String SERVICE_NAME = "hz:impl:collectionService";

Expand Down Expand Up @@ -75,15 +76,14 @@ public String getServiceName() {

public DistributedObject createDistributedObject(Object objectId) {
CollectionProxyId collectionProxyId = (CollectionProxyId) objectId;
final String name = collectionProxyId.name;
final CollectionProxyType type = collectionProxyId.type;
switch (type) {
case MULTI_MAP:
return new ObjectMultiMapProxy(name, this, nodeEngine, collectionProxyId.type);
return new ObjectMultiMapProxy(this, nodeEngine, collectionProxyId);
case LIST:
return new ObjectListProxy(name, this, nodeEngine, collectionProxyId.type);
return new ObjectListProxy(this, nodeEngine, collectionProxyId);
case SET:
return new ObjectSetProxy(name, this, nodeEngine, collectionProxyId.type);
return new ObjectSetProxy(this, nodeEngine, collectionProxyId);
case QUEUE:
return null;
}
Expand Down Expand Up @@ -175,9 +175,6 @@ public void beforeMigration(MigrationServiceEvent migrationServiceEvent) {
}

public Operation prepareMigrationOperation(MigrationServiceEvent event) {
if (event.getPartitionId() < 0 || event.getPartitionId() >= nodeEngine.getPartitionService().getPartitionCount()) {
return null; // is it possible
}
int replicaIndex = event.getReplicaIndex();
CollectionPartitionContainer partitionContainer = partitionContainers[event.getPartitionId()];
Map<CollectionProxyId, Map[]> map = new HashMap<CollectionProxyId, Map[]>(partitionContainer.containerMap.size());
Expand All @@ -187,7 +184,7 @@ public Operation prepareMigrationOperation(MigrationServiceEvent event) {
if (container.config.getTotalBackupCount() < replicaIndex) {
continue;
}
map.put(proxyId, new Map[]{container.collections, container.locks});
map.put(proxyId, new Map[]{container.collections, container.lockStore.getLocks()});
}
if (map.isEmpty()) {
return null;
Expand All @@ -202,7 +199,9 @@ public void insertMigratedData(int partitionId, Map<CollectionProxyId, Map[]> ma
Map<Data, Collection<CollectionRecord>> collections = entry.getValue()[0];
container.collections.putAll(collections);
Map<Data, LockInfo> locks = entry.getValue()[1];
container.locks.putAll(locks);
for (Map.Entry<Data, LockInfo> lockEntry : locks.entrySet()) {
container.lockStore.putLock(lockEntry.getKey(), lockEntry.getValue());
}
}
}

Expand Down Expand Up @@ -256,4 +255,14 @@ public void destroy() {
}
eventRegistrations.clear();
}

public void memberAdded(MembershipServiceEvent event) {
}

public void memberRemoved(MembershipServiceEvent event) {
// TODO: when a member dies;
// * release locks
// * rollback transaction
// * do not know ?
}
}
Loading

0 comments on commit 899ff8d

Please sign in to comment.