Skip to content

Commit

Permalink
[hotfix] Fix field names in RocksDBStateBackend
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Feb 12, 2016
1 parent f5d7190 commit 8b7caaa
Showing 1 changed file with 16 additions and 16 deletions.
Expand Up @@ -83,10 +83,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
// DB storage directories

/** Base paths for RocksDB directory, as configured. May be null. */
private Path[] dbBasePaths;
private Path[] configuredDbBasePaths;

/** Base paths for RocksDB directory, as initialized */
private File[] dbStorageDirectories;
private File[] initializedDbBasePaths;

private int nextDirectory;

Expand Down Expand Up @@ -171,15 +171,15 @@ public void initializeForJob(
this.jobId = env.getJobID();

// initialize the paths where the local RocksDB files should be stored
if (dbBasePaths == null) {
if (configuredDbBasePaths == null) {
// initialize from the temp directories
dbStorageDirectories = env.getIOManager().getSpillingDirectories();
initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
}
else {
List<File> dirs = new ArrayList<>(dbBasePaths.length);
List<File> dirs = new ArrayList<>(configuredDbBasePaths.length);
String errorMessage = "";

for (Path path : dbBasePaths) {
for (Path path : configuredDbBasePaths) {
File f = new File(path.toUri().getPath());
if (!f.exists() && !f.mkdirs()) {
String msg = "Local DB files directory '" + f.getAbsolutePath()
Expand All @@ -193,11 +193,11 @@ public void initializeForJob(
if (dirs.isEmpty()) {
throw new Exception("No local storage directories available. " + errorMessage);
} else {
dbStorageDirectories = dirs.toArray(new File[dirs.size()]);
initializedDbBasePaths = dirs.toArray(new File[dirs.size()]);
}
}

nextDirectory = new Random().nextInt(dbStorageDirectories.length);
nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
}

@Override
Expand Down Expand Up @@ -225,15 +225,15 @@ String getCheckpointPath(String stateName) {
}

File[] getStoragePaths() {
return dbStorageDirectories;
return initializedDbBasePaths;
}

File getNextStoragePath() {
int ni = nextDirectory + 1;
ni = ni >= dbStorageDirectories.length ? 0 : ni;
ni = ni >= initializedDbBasePaths.length ? 0 : ni;
nextDirectory = ni;

return dbStorageDirectories[ni];
return initializedDbBasePaths[ni];
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -330,7 +330,7 @@ public void setDbStoragePath(String path) {
*/
public void setDbStoragePaths(String... paths) {
if (paths == null) {
dbBasePaths = null;
configuredDbBasePaths = null;
}
else if (paths.length == 0) {
throw new IllegalArgumentException("empty paths");
Expand All @@ -350,7 +350,7 @@ else if (paths.length == 0) {
}
}

dbBasePaths = pp;
configuredDbBasePaths = pp;
}
}

Expand All @@ -359,12 +359,12 @@ else if (paths.length == 0) {
* @return The configured DB storage paths, or null, if none were configured.
*/
public String[] getDbStoragePaths() {
if (dbBasePaths == null) {
if (configuredDbBasePaths == null) {
return null;
} else {
String[] paths = new String[dbBasePaths.length];
String[] paths = new String[configuredDbBasePaths.length];
for (int i = 0; i < paths.length; i++) {
paths[i] = dbBasePaths[i].toString();
paths[i] = configuredDbBasePaths[i].toString();
}
return paths;
}
Expand Down

0 comments on commit 8b7caaa

Please sign in to comment.