Skip to content

Commit

Permalink
[FLINK-22369][rocksdb] RocksDB state backend might occur ClassNotFoun…
Browse files Browse the repository at this point in the history
…dException when deserializing on TM side
  • Loading branch information
sjwiesman authored and dawidwys committed Apr 21, 2021
1 parent a93b09d commit 60e17f4
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
Expand Down Expand Up @@ -112,8 +111,6 @@ public enum PriorityQueueStateType {

private static final long UNDEFINED_WRITE_BATCH_SIZE = -1;

private Logger logger = LOG;

// ------------------------------------------------------------------------

// -- configuration values, set in the application / configuration
Expand Down Expand Up @@ -260,7 +257,7 @@ private EmbeddedRocksDBStateBackend(
original.predefinedOptions == null
? PredefinedOptions.valueOf(config.get(RocksDBOptions.PREDEFINED_OPTIONS))
: original.predefinedOptions;
logger.info("Using predefined options: {}.", predefinedOptions.name());
LOG.info("Using predefined options: {}.", predefinedOptions.name());

// configure RocksDB options factory
try {
Expand All @@ -278,16 +275,6 @@ private EmbeddedRocksDBStateBackend(
latencyTrackingConfigBuilder = original.latencyTrackingConfigBuilder.configure(config);
}

/**
* Overrides the default logger for this class. It ensures users of the legacy {@link
* RocksDBStateBackend} see consistent logging.
*/
@Internal
@SuppressWarnings("SameParameterValue")
void setLogger(Logger logger) {
this.logger = logger;
}

// ------------------------------------------------------------------------
// Reconfiguration
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -334,7 +321,7 @@ private void lazyInitializeForJob(
"Local DB files directory '"
+ f
+ "' does not exist and cannot be created. ";
logger.error(msg);
LOG.error(msg);
errorMessage.append(msg);
} else {
dirs.add(f);
Expand Down Expand Up @@ -415,7 +402,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
// first, make sure that the RocksDB JNI library is loaded
// we do this explicitly here to have better error handling
String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
ensureRocksDBIsLoaded(tempDir, logger);
ensureRocksDBIsLoaded(tempDir);

// replace all characters that are not legal for filenames with underscore
String fileCompatibleIdentifier = operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
Expand All @@ -437,10 +424,9 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(

final OpaqueMemoryResource<RocksDBSharedResources> sharedResources =
RocksDBOperationUtils.allocateSharedCachesIfConfigured(
memoryConfiguration, env.getMemoryManager(), managedMemoryFraction, logger);
memoryConfiguration, env.getMemoryManager(), managedMemoryFraction, LOG);
if (sharedResources != null) {
logger.info(
"Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize());
LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize());
}
final RocksDBResourceContainer resourceContainer =
createOptionsAndResourceContainer(sharedResources);
Expand Down Expand Up @@ -512,7 +498,7 @@ private RocksDBOptionsFactory configureOptionsFactory(
((ConfigurableRocksDBOptionsFactory) originalOptionsFactory)
.configure(config);
}
logger.info("Using application-defined options factory: {}.", originalOptionsFactory);
LOG.info("Using application-defined options factory: {}.", originalOptionsFactory);

return originalOptionsFactory;
}
Expand All @@ -523,7 +509,7 @@ private RocksDBOptionsFactory configureOptionsFactory(
DefaultConfigurableOptionsFactory optionsFactory =
new DefaultConfigurableOptionsFactory();
optionsFactory.configure(config);
logger.info("Using default options factory: {}.", optionsFactory);
LOG.info("Using default options factory: {}.", optionsFactory);

return optionsFactory;
} else {
Expand All @@ -537,7 +523,7 @@ private RocksDBOptionsFactory configureOptionsFactory(
optionsFactory =
((ConfigurableRocksDBOptionsFactory) optionsFactory).configure(config);
}
logger.info("Using configured options factory: {}.", optionsFactory);
LOG.info("Using configured options factory: {}.", optionsFactory);

return optionsFactory;
} catch (ClassNotFoundException e) {
Expand Down Expand Up @@ -834,12 +820,12 @@ public String toString() {
// ------------------------------------------------------------------------

@VisibleForTesting
static void ensureRocksDBIsLoaded(String tempDirectory, Logger logger) throws IOException {
static void ensureRocksDBIsLoaded(String tempDirectory) throws IOException {
synchronized (EmbeddedRocksDBStateBackend.class) {
if (!rocksDbInitialized) {

final File tempDirParent = new File(tempDirectory).getAbsoluteFile();
logger.info(
LOG.info(
"Attempting to load RocksDB native library and store it under '{}'",
tempDirParent);

Expand All @@ -863,7 +849,7 @@ static void ensureRocksDBIsLoaded(String tempDirectory, Logger logger) throws IO
rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID());

// make sure the temp path exists
logger.debug(
LOG.debug(
"Attempting to create RocksDB native library folder {}",
rocksLibFolder);
// noinspection ResultOfMethodCallIgnored
Expand All @@ -877,18 +863,18 @@ static void ensureRocksDBIsLoaded(String tempDirectory, Logger logger) throws IO
RocksDB.loadLibrary();

// seems to have worked
logger.info("Successfully loaded RocksDB native library");
LOG.info("Successfully loaded RocksDB native library");
rocksDbInitialized = true;
return;
} catch (Throwable t) {
lastException = t;
logger.debug("RocksDB JNI library loading attempt {} failed", attempt, t);
LOG.debug("RocksDB JNI library loading attempt {} failed", attempt, t);

// try to force RocksDB to attempt reloading the library
try {
resetRocksDBLoadedFlag();
} catch (Throwable tt) {
logger.debug(
LOG.debug(
"Failed to reset 'initialized' flag in RocksDB native code loader",
tt);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ public RocksDBStateBackend(
}
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.rocksDBStateBackend = new EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing);
this.rocksDBStateBackend.setLogger(LOG);
}

/** @deprecated Use {@link #RocksDBStateBackend(StateBackend)} instead. */
Expand Down Expand Up @@ -588,7 +587,7 @@ public String toString() {

@VisibleForTesting
static void ensureRocksDBIsLoaded(String tempDirectory) throws IOException {
EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempDirectory, LOG);
EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempDirectory);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testTempLibFolderDeletedOnFail() throws Exception {

File tempFolder = temporaryFolder.newFolder();
try {
EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempFolder.getAbsolutePath(), LOG);
EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempFolder.getAbsolutePath());
fail("Not throwing expected exception.");
} catch (IOException ignored) {
// ignored
Expand Down

0 comments on commit 60e17f4

Please sign in to comment.