From f0a6a4e19b6db9fcae75b3a69f0c2f6e482ff0a6 Mon Sep 17 00:00:00 2001 From: Maithem Date: Thu, 14 Dec 2017 14:34:15 -0800 Subject: [PATCH] Object Open Race When an object is opened, it is added to the cache right after its creation, as a consequence other threads can retrieve the object from the object cache after its creation and before additional initialization. For example, register index. This patch moves the extra initialization into an atomic operation (i.e. computeIfAbsent). As a consequence, the parameters of the first cached open will be observed by subsequent opens. --- .../runtime/collections/CorfuTable.java | 20 +--- .../corfudb/runtime/view/ObjectBuilder.java | 51 ++++------ .../runtime/collections/CorfuTableTest.java | 92 ++++++------------- 3 files changed, 46 insertions(+), 117 deletions(-) diff --git a/runtime/src/main/java/org/corfudb/runtime/collections/CorfuTable.java b/runtime/src/main/java/org/corfudb/runtime/collections/CorfuTable.java index 6071b41e887..12824ec4143 100644 --- a/runtime/src/main/java/org/corfudb/runtime/collections/CorfuTable.java +++ b/runtime/src/main/java/org/corfudb/runtime/collections/CorfuTable.java @@ -159,11 +159,12 @@ public CorfuTable(Class indexFunctionEnumClass) { indexerClass = indexFunctionEnumClass; indexFunctions.addAll(EnumSet.allOf(indexFunctionEnumClass)); indexFunctions.forEach(f -> indexMap.put(f, new HashMap<>())); + log.info("CorfuTable: creating CorfuTable with {} as indexer class", indexFunctionEnumClass); } /** Default constructor. Generates a table without any secondary indexes. */ public CorfuTable() { - log.debug("CorfuTable: Creating a table without secondary indexes! Secondary index lookup" + log.info("CorfuTable: Creating a table without secondary indexes! Secondary index lookup" + " will DEGRADE to a full scan"); } @@ -286,23 +287,6 @@ public V get(@ConflictParameter Object key) { .collect(Collectors.toCollection(ArrayList::new)); } - - /** - * Register new index class - * - * This replaces the current index. - * - * @param indexFunctionEnumClass - */ - public void registerIndex(Class indexFunctionEnumClass) { - indexerClass = indexFunctionEnumClass; - indexMap.clear(); - indexFunctions.clear(); - indexFunctions.addAll(EnumSet.allOf(indexFunctionEnumClass)); - indexFunctions.forEach(f -> indexMap.put(f, new HashMap<>())); - mainMap.forEach(this::mapSecondaryIndexes); - } - /** {@inheritDoc} */ @Override @MutatorAccessor(name = "put", undoFunction = "undoPut", undoRecordFunction = "undoPutRecord") diff --git a/runtime/src/main/java/org/corfudb/runtime/view/ObjectBuilder.java b/runtime/src/main/java/org/corfudb/runtime/view/ObjectBuilder.java index 34362cd42a0..43f92e72cd8 100644 --- a/runtime/src/main/java/org/corfudb/runtime/view/ObjectBuilder.java +++ b/runtime/src/main/java/org/corfudb/runtime/view/ObjectBuilder.java @@ -60,7 +60,7 @@ public ObjectBuilder setType(Class type) { @SuppressWarnings("unchecked") public ObjectBuilder setTypeToken(TypeToken typeToken) { - this.type = (Class)typeToken.getRawType(); + this.type = (Class) typeToken.getRawType(); return (ObjectBuilder) this; } @@ -103,45 +103,30 @@ public T open() { arguments, serializer); } else { ObjectsView.ObjectID oid = new ObjectsView.ObjectID(streamID, type); - T result = (T) runtime.getObjectsView().objectCache.computeIfAbsent(oid, x -> { + return (T) runtime.getObjectsView().objectCache.computeIfAbsent(oid, x -> { try { - return CorfuCompileWrapperBuilder.getWrapper(type, runtime, + T result = CorfuCompileWrapperBuilder.getWrapper(type, runtime, streamID, arguments, serializer); + + // Get object serializer to check if we didn't attempt to set another serializer + // to an already existing map + ISerializer objectSerializer = ((CorfuCompileProxy) ((ICorfuSMR) result). + getCorfuSMRProxy()) + .getSerializer(); + + if (serializer != objectSerializer) { + log.warn("open: Attempt to open an existing object with a different serializer {}. " + + "Object {} opened with original serializer {}.", + serializer.getClass().getSimpleName(), + oid, + objectSerializer.getClass().getSimpleName()); + } + return result; } catch (Exception ex) { throw new RuntimeException(ex); } } ); - // Get object serializer to check if we didn't attempt to set another serializer - // to an already existing map - ISerializer objectSerializer = ((CorfuCompileProxy) ((ICorfuSMR) runtime.getObjectsView(). - getObjectCache(). - get(oid)). - getCorfuSMRProxy()) - .getSerializer(); - - // FIXME: temporary hack until we have a registry - // If current map in cache has no indexer, or there is currently an other one, - // this will create and compute the indices. - if (result instanceof CorfuTable) { - CorfuTable currentCorfuTable = ((CorfuTable) result); - if (arguments.length > 0) { - // If current map in cache has no indexer, or there is currently an other index - if (!(currentCorfuTable.hasSecondaryIndices()) || - currentCorfuTable.getIndexerClass() != arguments[0].getClass()){ - ((CorfuTable) result).registerIndex((Class) arguments[0]); - } - } - } - - if (serializer != objectSerializer) { - log.warn("open: Attempt to open an existing object with a different serializer {}. " + - "Object {} opened with original serializer {}.", - serializer.getClass().getSimpleName(), - oid, - objectSerializer.getClass().getSimpleName()); - } - return result; } } catch (Exception ex) { log.error("Runtime instrumentation no longer supported and no compiled class found" diff --git a/test/src/test/java/org/corfudb/runtime/collections/CorfuTableTest.java b/test/src/test/java/org/corfudb/runtime/collections/CorfuTableTest.java index ec1cc2eb4ef..3d0b1103567 100644 --- a/test/src/test/java/org/corfudb/runtime/collections/CorfuTableTest.java +++ b/test/src/test/java/org/corfudb/runtime/collections/CorfuTableTest.java @@ -41,6 +41,32 @@ enum OtherStringIndexer implements CorfuTable.IndexSpecification s.map(entry -> entry.getValue()); } + + @Test + public void openingCorfuTableTwice() { + CorfuTable + instance1 = getDefaultRuntime().getObjectsView().build() + .setTypeToken( + new TypeToken>() {}) + .setArguments(StringIndexers.class) + .setStreamName("test") + .open(); + + assertThat(instance1.hasSecondaryIndices()).isTrue(); + + CorfuTable + instance2 = getDefaultRuntime().getObjectsView().build() + .setTypeToken( + new TypeToken>() {}) + .setStreamName("test") + .open(); + + // Verify that the first the indexer is set on the first open + // TODO(Maithem): This might seem like weird semantics, but we + // address it once we tackle the lifecycle of SMRObjects. + assertThat(instance2.getIndexerClass()).isEqualTo(instance1.getIndexerClass()); + } + @Test @SuppressWarnings("unchecked") public void canReadFromEachIndex() { @@ -106,72 +132,6 @@ public void canReadWithoutIndexes() { MapEntry.entry("k3", "b")); } - /** - * Create a CorfuTable without index and add an indexer - * post-creation (CorfuTable already have entries). - */ - @Test - public void canSetIndexIfNoneSoFar() { - CorfuTable - corfuTable = getDefaultRuntime().getObjectsView().build() - .setType(CorfuTable.class) - .setStreamName("test") - .open(); - - corfuTable.put("k1", "a"); - corfuTable.put("k2", "ab"); - corfuTable.put("k3", "b"); - - CorfuTable - corfuTableWithIndex = getDefaultRuntime().getObjectsView().build() - .setType(CorfuTable.class) - .setArguments(StringIndexers.class) - .setStreamName("test") - .open(); - - assertThat(corfuTableWithIndex.get("k1")).isEqualTo("a"); - assertThat(corfuTableWithIndex.get("k2")).isEqualTo("ab"); - assertThat(corfuTableWithIndex.get("k3")).isEqualTo("b"); - - assertThat(corfuTableWithIndex.getByIndex(StringIndexers.BY_FIRST_LETTER, "a")) - .containsExactly("a", "ab"); - - assertThat(corfuTableWithIndex.getByIndex(StringIndexers.BY_VALUE, "ab")) - .containsExactly("ab"); - - } - - /** - * Replace the existing indexer with another one. - */ - @Test - public void canSetNewIndex() { - CorfuTable - corfuTable = getDefaultRuntime().getObjectsView().build() - .setTypeToken(CorfuTable.getTableType()) - .setArguments(StringIndexers.class) - .setStreamName("test") - .open(); - - corfuTable.put("k1", "a"); - corfuTable.put("k2", "ab"); - corfuTable.put("k3", "b"); - - assertThat(corfuTable.getByIndex(StringIndexers.BY_FIRST_LETTER, "a")) - .containsExactly("a", "ab"); - - - CorfuTable - corfuTableWithIndex = getDefaultRuntime().getObjectsView().build() - .setTypeToken(CorfuTable.getTableType()) - .setArguments(OtherStringIndexer.class) - .setStreamName("test") - .open(); - - assertThat(corfuTableWithIndex.getByIndex(OtherStringIndexer.BY_LAST_LETTER, "b")) - .containsExactly("ab", "b"); - } - /** * Remove an entry also update indices */