Skip to content

Commit

Permalink
Object Open Race
Browse files Browse the repository at this point in the history
When an object is opened, it is added to the cache right after
its creation, as a consequence other threads can retrieve the
object from the object cache after its creation and before
additional initialization. For example, register index. This
patch moves the extra initialization into an atomic operation
(i.e. computeIfAbsent). As a consequence, the parameters of
the first cached open will be observed by subsequent opens.
  • Loading branch information
Maithem authored and no2chem committed Dec 15, 2017
1 parent dcd2239 commit f0a6a4e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,12 @@ public CorfuTable(Class<F> indexFunctionEnumClass) {
indexerClass = indexFunctionEnumClass;
indexFunctions.addAll(EnumSet.allOf(indexFunctionEnumClass));
indexFunctions.forEach(f -> indexMap.put(f, new HashMap<>()));
log.info("CorfuTable: creating CorfuTable with {} as indexer class", indexFunctionEnumClass);
}

/** Default constructor. Generates a table without any secondary indexes. */
public CorfuTable() {
log.debug("CorfuTable: Creating a table without secondary indexes! Secondary index lookup"
log.info("CorfuTable: Creating a table without secondary indexes! Secondary index lookup"
+ " will DEGRADE to a full scan");
}

Expand Down Expand Up @@ -286,23 +287,6 @@ public V get(@ConflictParameter Object key) {
.collect(Collectors.toCollection(ArrayList::new));
}


/**
* Register new index class
*
* This replaces the current index.
*
* @param indexFunctionEnumClass
*/
public void registerIndex(Class<F> indexFunctionEnumClass) {
indexerClass = indexFunctionEnumClass;
indexMap.clear();
indexFunctions.clear();
indexFunctions.addAll(EnumSet.allOf(indexFunctionEnumClass));
indexFunctions.forEach(f -> indexMap.put(f, new HashMap<>()));
mainMap.forEach(this::mapSecondaryIndexes);
}

/** {@inheritDoc} */
@Override
@MutatorAccessor(name = "put", undoFunction = "undoPut", undoRecordFunction = "undoPutRecord")
Expand Down
51 changes: 18 additions & 33 deletions runtime/src/main/java/org/corfudb/runtime/view/ObjectBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public <R> ObjectBuilder<R> setType(Class<R> type) {

@SuppressWarnings("unchecked")
public <R> ObjectBuilder<R> setTypeToken(TypeToken<R> typeToken) {
this.type = (Class<T>)typeToken.getRawType();
this.type = (Class<T>) typeToken.getRawType();
return (ObjectBuilder<R>) this;
}

Expand Down Expand Up @@ -103,45 +103,30 @@ public T open() {
arguments, serializer);
} else {
ObjectsView.ObjectID<T> oid = new ObjectsView.ObjectID(streamID, type);
T result = (T) runtime.getObjectsView().objectCache.computeIfAbsent(oid, x -> {
return (T) runtime.getObjectsView().objectCache.computeIfAbsent(oid, x -> {
try {
return CorfuCompileWrapperBuilder.getWrapper(type, runtime,
T result = CorfuCompileWrapperBuilder.getWrapper(type, runtime,
streamID, arguments, serializer);

// Get object serializer to check if we didn't attempt to set another serializer
// to an already existing map
ISerializer objectSerializer = ((CorfuCompileProxy) ((ICorfuSMR) result).
getCorfuSMRProxy())
.getSerializer();

if (serializer != objectSerializer) {
log.warn("open: Attempt to open an existing object with a different serializer {}. " +
"Object {} opened with original serializer {}.",
serializer.getClass().getSimpleName(),
oid,
objectSerializer.getClass().getSimpleName());
}
return result;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
);
// Get object serializer to check if we didn't attempt to set another serializer
// to an already existing map
ISerializer objectSerializer = ((CorfuCompileProxy) ((ICorfuSMR) runtime.getObjectsView().
getObjectCache().
get(oid)).
getCorfuSMRProxy())
.getSerializer();

// FIXME: temporary hack until we have a registry
// If current map in cache has no indexer, or there is currently an other one,
// this will create and compute the indices.
if (result instanceof CorfuTable) {
CorfuTable currentCorfuTable = ((CorfuTable) result);
if (arguments.length > 0) {
// If current map in cache has no indexer, or there is currently an other index
if (!(currentCorfuTable.hasSecondaryIndices()) ||
currentCorfuTable.getIndexerClass() != arguments[0].getClass()){
((CorfuTable) result).registerIndex((Class) arguments[0]);
}
}
}

if (serializer != objectSerializer) {
log.warn("open: Attempt to open an existing object with a different serializer {}. " +
"Object {} opened with original serializer {}.",
serializer.getClass().getSimpleName(),
oid,
objectSerializer.getClass().getSimpleName());
}
return result;
}
} catch (Exception ex) {
log.error("Runtime instrumentation no longer supported and no compiled class found"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,32 @@ enum OtherStringIndexer implements CorfuTable.IndexSpecification<String, String,
= (i, s) -> s.map(entry -> entry.getValue());
}


@Test
public void openingCorfuTableTwice() {
CorfuTable<String, String, StringIndexers, String>
instance1 = getDefaultRuntime().getObjectsView().build()
.setTypeToken(
new TypeToken<CorfuTable<String, String, StringIndexers, String>>() {})
.setArguments(StringIndexers.class)
.setStreamName("test")
.open();

assertThat(instance1.hasSecondaryIndices()).isTrue();

CorfuTable<String, String, StringIndexers, String>
instance2 = getDefaultRuntime().getObjectsView().build()
.setTypeToken(
new TypeToken<CorfuTable<String, String, StringIndexers, String>>() {})
.setStreamName("test")
.open();

// Verify that the first the indexer is set on the first open
// TODO(Maithem): This might seem like weird semantics, but we
// address it once we tackle the lifecycle of SMRObjects.
assertThat(instance2.getIndexerClass()).isEqualTo(instance1.getIndexerClass());
}

@Test
@SuppressWarnings("unchecked")
public void canReadFromEachIndex() {
Expand Down Expand Up @@ -106,72 +132,6 @@ public void canReadWithoutIndexes() {
MapEntry.entry("k3", "b"));
}

/**
* Create a CorfuTable without index and add an indexer
* post-creation (CorfuTable already have entries).
*/
@Test
public void canSetIndexIfNoneSoFar() {
CorfuTable<String, String, CorfuTable.NoSecondaryIndex, Void>
corfuTable = getDefaultRuntime().getObjectsView().build()
.setType(CorfuTable.class)
.setStreamName("test")
.open();

corfuTable.put("k1", "a");
corfuTable.put("k2", "ab");
corfuTable.put("k3", "b");

CorfuTable<String, String, StringIndexers, String>
corfuTableWithIndex = getDefaultRuntime().getObjectsView().build()
.setType(CorfuTable.class)
.setArguments(StringIndexers.class)
.setStreamName("test")
.open();

assertThat(corfuTableWithIndex.get("k1")).isEqualTo("a");
assertThat(corfuTableWithIndex.get("k2")).isEqualTo("ab");
assertThat(corfuTableWithIndex.get("k3")).isEqualTo("b");

assertThat(corfuTableWithIndex.getByIndex(StringIndexers.BY_FIRST_LETTER, "a"))
.containsExactly("a", "ab");

assertThat(corfuTableWithIndex.getByIndex(StringIndexers.BY_VALUE, "ab"))
.containsExactly("ab");

}

/**
* Replace the existing indexer with another one.
*/
@Test
public void canSetNewIndex() {
CorfuTable<String, String, StringIndexers, String>
corfuTable = getDefaultRuntime().getObjectsView().build()
.setTypeToken(CorfuTable.<String, String, StringIndexers, String>getTableType())
.setArguments(StringIndexers.class)
.setStreamName("test")
.open();

corfuTable.put("k1", "a");
corfuTable.put("k2", "ab");
corfuTable.put("k3", "b");

assertThat(corfuTable.getByIndex(StringIndexers.BY_FIRST_LETTER, "a"))
.containsExactly("a", "ab");


CorfuTable<String, String, OtherStringIndexer, String>
corfuTableWithIndex = getDefaultRuntime().getObjectsView().build()
.setTypeToken(CorfuTable.<String, String, OtherStringIndexer, String>getTableType())
.setArguments(OtherStringIndexer.class)
.setStreamName("test")
.open();

assertThat(corfuTableWithIndex.getByIndex(OtherStringIndexer.BY_LAST_LETTER, "b"))
.containsExactly("ab", "b");
}

/**
* Remove an entry also update indices
*/
Expand Down

0 comments on commit f0a6a4e

Please sign in to comment.