Skip to content

Commit

Permalink
Simplify transactional map implementations.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Dec 6, 2017
1 parent d7a829f commit 351e9e9
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 146 deletions.
Expand Up @@ -15,136 +15,130 @@
*/ */
package io.atomix.transaction.impl; package io.atomix.transaction.impl;


import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.map.AsyncConsistentMap; import io.atomix.map.AsyncConsistentMap;
import io.atomix.map.impl.MapUpdate; import io.atomix.map.impl.MapUpdate;
import io.atomix.map.impl.MapUpdate.Type;
import io.atomix.transaction.TransactionId; import io.atomix.transaction.TransactionId;
import io.atomix.transaction.TransactionLog; import io.atomix.transaction.TransactionLog;
import io.atomix.utils.time.Versioned; import io.atomix.utils.time.Versioned;
import org.apache.commons.lang3.tuple.Pair;


import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;


/** /**
* Default transactional map. * Default transactional map.
*/ */
public class ReadCommittedTransactionalMap<K, V> extends TransactionalMapParticipant<K, V> { public class ReadCommittedTransactionalMap<K, V> extends TransactionalMapParticipant<K, V> {
private final Map<K, V> writeCache = Maps.newConcurrentMap(); private final Map<K, MapUpdate<K, V>> updates = Maps.newConcurrentMap();
private final Set<K> deleteSet = Sets.newConcurrentHashSet();


public ReadCommittedTransactionalMap(TransactionId transactionId, AsyncConsistentMap<K, V> consistentMap) { public ReadCommittedTransactionalMap(TransactionId transactionId, AsyncConsistentMap<K, V> consistentMap) {
super(transactionId, consistentMap); super(transactionId, consistentMap);
} }


@Override @Override
public CompletableFuture<V> get(K key) { public CompletableFuture<V> get(K key) {
if (deleteSet.contains(key)) { return consistentMap.get(key).thenApply(Versioned::valueOrNull);
return CompletableFuture.completedFuture(null);
}
V latest = writeCache.get(key);
if (latest != null) {
return CompletableFuture.completedFuture(latest);
} else {
return consistentMap.get(key).thenApply(Versioned::valueOrNull);
}
} }


@Override @Override
public CompletableFuture<Boolean> containsKey(K key) { public CompletableFuture<Boolean> containsKey(K key) {
return get(key).thenApply(Objects::nonNull); return consistentMap.get(key).thenApply(Objects::nonNull);
} }


@Override @Override
public CompletableFuture<V> put(K key, V value) { public CompletableFuture<V> put(K key, V value) {
return get(key) return consistentMap.get(key)
.thenApply(latest -> { .thenApply(versioned -> {
writeCache.put(key, value); if (versioned == null) {
deleteSet.remove(key); updates.put(key, MapUpdate.<K, V>builder()
return latest; .withType(Type.PUT_IF_ABSENT)
.withKey(key)
.withValue(value)
.build());
return null;
} else {
updates.put(key, MapUpdate.<K, V>builder()
.withType(Type.PUT_IF_VERSION_MATCH)
.withKey(key)
.withValue(value)
.withVersion(versioned.version())
.build());
return versioned.value();
}
}); });
} }


@Override @Override
public CompletableFuture<V> putIfAbsent(K key, V value) { public CompletableFuture<V> putIfAbsent(K key, V value) {
return get(key) return consistentMap.get(key)
.thenCompose(latest -> { .thenApply(versioned -> {
if (latest == null) { if (versioned == null) {
return put(key, value); updates.put(key, MapUpdate.<K, V>builder()
.withType(Type.PUT_IF_ABSENT)
.withKey(key)
.withValue(value)
.build());
return null;
} else {
return versioned.value();
} }
return CompletableFuture.completedFuture(latest);
}); });
} }


@Override @Override
public CompletableFuture<V> remove(K key) { public CompletableFuture<V> remove(K key) {
return get(key) return consistentMap.get(key)
.thenApply(latest -> { .thenApply(versioned -> {
if (latest != null) { if (versioned != null) {
writeCache.remove(key); updates.put(key, MapUpdate.<K, V>builder()
deleteSet.add(key); .withType(Type.REMOVE_IF_VERSION_MATCH)
.withKey(key)
.withVersion(versioned.version())
.build());
return versioned.value();
} }
return latest; return null;
}); });
} }


@Override @Override
public CompletableFuture<Boolean> remove(K key, V value) { public CompletableFuture<Boolean> remove(K key, V value) {
return get(key) return consistentMap.get(key)
.thenCompose(latest -> { .thenApply(versioned -> {
if (Objects.equals(value, latest)) { if (versioned != null && Objects.equals(versioned.value(), value)) {
return remove(key).thenApply(v -> true); updates.put(key, MapUpdate.<K, V>builder()
.withType(Type.REMOVE_IF_VERSION_MATCH)
.withKey(key)
.withVersion(versioned.version())
.build());
return true;
} }
return CompletableFuture.completedFuture(false); return false;
}); });
} }


@Override @Override
public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) { public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
return get(key) return consistentMap.get(key)
.thenCompose(latest -> { .thenApply(versioned -> {
if (Objects.equals(oldValue, latest)) { if (versioned != null && Objects.equals(versioned.value(), oldValue)) {
return put(key, newValue).thenApply(v -> true); updates.put(key, MapUpdate.<K, V>builder()
.withType(Type.PUT_IF_VERSION_MATCH)
.withKey(key)
.withValue(newValue)
.withVersion(versioned.version())
.build());
return true;
} }
return CompletableFuture.completedFuture(false); return false;
}); });
} }


@Override @Override
public TransactionLog<MapUpdate<K, V>> log() { public TransactionLog<MapUpdate<K, V>> log() {
return new TransactionLog<>(transactionId, 0, Stream.concat( return new TransactionLog<>(transactionId, 0, Lists.newArrayList(updates.values()));
// 1st stream: delete ops
deleteSet.stream()
.map(key -> Pair.of(key, consistentMap.get(key).join()))
.filter(e -> e.getValue() != null)
.map(e -> MapUpdate.<K, V>builder()
.withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withKey(e.getKey())
.withVersion(e.getValue().version())
.build()),
// 2nd stream: write ops
writeCache.entrySet().stream()
.map(e -> {
Versioned<V> original = consistentMap.get(e.getKey()).join();
if (original == null) {
return MapUpdate.<K, V>builder()
.withType(MapUpdate.Type.PUT_IF_ABSENT)
.withKey(e.getKey())
.withValue(e.getValue())
.build();
} else {
return MapUpdate.<K, V>builder()
.withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
.withKey(e.getKey())
.withVersion(original.version())
.withValue(e.getValue())
.build();
}
})).collect(Collectors.toList()));
} }
} }

0 comments on commit 351e9e9

Please sign in to comment.