Skip to content

Commit

Permalink
[FLINK-4375] [distributed coordination] Implement new JobManager crea…
Browse files Browse the repository at this point in the history
…tion, initialization, and basic RPC methods
  • Loading branch information
KurtYoung authored and StephanEwen committed Dec 23, 2016
1 parent 4e5f423 commit c648606
Show file tree
Hide file tree
Showing 29 changed files with 1,164 additions and 940 deletions.
14 changes: 14 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/StringUtils.java
Expand Up @@ -335,4 +335,18 @@ public static String readNullableString(DataInputView in) throws IOException {
return null; return null;
} }
} }

public static boolean isNullOrWhitespaceOnly(String str) {
if (str == null || str.length() == 0) {
return true;
}

final int len = str.length();
for (int i = 0; i < len; i++) {
if (!Character.isWhitespace(str.charAt(i))) {
return false;
}
}
return true;
}
} }
Expand Up @@ -22,7 +22,11 @@
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.NetUtils; import org.apache.flink.util.NetUtils;
Expand All @@ -45,6 +49,7 @@
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;


/** /**
* This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
Expand Down Expand Up @@ -96,9 +101,16 @@ public class BlobServer extends Thread implements BlobService {
* thrown if the BLOB server cannot bind to a free network port * thrown if the BLOB server cannot bind to a free network port
*/ */
public BlobServer(Configuration config) throws IOException { public BlobServer(Configuration config) throws IOException {
checkNotNull(config, "Configuration"); this(config, createBlobStoreFromConfig(config));
}


HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); public BlobServer(Configuration config, HighAvailabilityServices haServices) throws IOException {
this(config, haServices.createBlobStore());
}

private BlobServer(Configuration config, BlobStore blobStore) throws IOException {
checkNotNull(config);
this.blobStore = checkNotNull(blobStore);


this.blobServiceConfiguration = config; this.blobServiceConfiguration = config;


Expand All @@ -107,14 +119,6 @@ public BlobServer(Configuration config) throws IOException {
this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
LOG.info("Created BLOB server storage directory {}", storageDir); LOG.info("Created BLOB server storage directory {}", storageDir);


if (highAvailabilityMode == HighAvailabilityMode.NONE) {
this.blobStore = new VoidBlobStore();
} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
this.blobStore = new FileSystemBlobStore(config);
} else {
throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + ".");
}

// configure the maximum number of concurrent connections // configure the maximum number of concurrent connections
final int maxConnections = config.getInteger( final int maxConnections = config.getInteger(
ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT); ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
Expand All @@ -135,13 +139,7 @@ public BlobServer(Configuration config) throws IOException {
backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG; backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
} }


if (highAvailabilityMode == HighAvailabilityMode.NONE) { this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
// Add shutdown hook to delete storage directory
this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
else {
this.shutdownHook = null;
}


if (config.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, if (config.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) { ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
Expand Down Expand Up @@ -451,4 +449,37 @@ List<BlobServerConnection> getCurrentActiveConnections() {
} }
} }


private static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException {
HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);

if (highAvailabilityMode == HighAvailabilityMode.NONE) {
return new VoidBlobStore();
} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
final String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
if (isNullOrWhitespaceOnly(storagePath)) {
throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
HighAvailabilityOptions.HA_STORAGE_PATH);
}

final Path path;
try {
path = new Path(storagePath);
} catch (Exception e) {
throw new IOException("Invalid path for highly available storage (" +
HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
}

final FileSystem fileSystem;
try {
fileSystem = path.getFileSystem();
} catch (Exception e) {
throw new IOException("Could not create FileSystem for highly available storage (" +
HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
}

return new FileSystemBlobStore(fileSystem, storagePath);
} else {
throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + ".");
}
}
} }
Expand Up @@ -25,7 +25,7 @@
/** /**
* A blob store. * A blob store.
*/ */
interface BlobStore { public interface BlobStore {


/** /**
* Copies the local file to the blob store. * Copies the local file to the blob store.
Expand Down Expand Up @@ -93,5 +93,4 @@ interface BlobStore {
* Cleans up the store and deletes all blobs. * Cleans up the store and deletes all blobs.
*/ */
void cleanUp(); void cleanUp();

} }
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.util.IOUtils; import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;


import java.io.EOFException; import java.io.EOFException;
Expand Down Expand Up @@ -73,18 +74,17 @@ public class BlobUtils {
*/ */
static File initStorageDirectory(String storageDirectory) { static File initStorageDirectory(String storageDirectory) {
File baseDir; File baseDir;
if (storageDirectory == null || storageDirectory.trim().isEmpty()) { if (StringUtils.isNullOrWhitespaceOnly(storageDirectory)) {
baseDir = new File(System.getProperty("java.io.tmpdir")); baseDir = new File(System.getProperty("java.io.tmpdir"));
} }
else { else {
baseDir = new File(storageDirectory); baseDir = new File(storageDirectory);
} }


File storageDir; File storageDir;
final int MAX_ATTEMPTS = 10;
int attempt;


for(attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { final int MAX_ATTEMPTS = 10;
for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
storageDir = new File(baseDir, String.format( storageDir = new File(baseDir, String.format(
"blobStore-%s", UUID.randomUUID().toString())); "blobStore-%s", UUID.randomUUID().toString()));


Expand Down
Expand Up @@ -20,12 +20,7 @@


import com.google.common.io.Files; import com.google.common.io.Files;


import org.apache.commons.lang3.StringUtils;

import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.util.IOUtils; import org.apache.flink.util.IOUtils;
Expand All @@ -38,7 +33,6 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URI;


import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;


Expand All @@ -47,25 +41,24 @@
* *
* <p>This is used in addition to the local blob storage for high availability. * <p>This is used in addition to the local blob storage for high availability.
*/ */
class FileSystemBlobStore implements BlobStore { public class FileSystemBlobStore implements BlobStore {


private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class); private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class);


/** The file system in which blobs are stored */
private final FileSystem fileSystem;

/** The base path of the blob store */ /** The base path of the blob store */
private final String basePath; private final String basePath;


FileSystemBlobStore(Configuration config) throws IOException { public FileSystemBlobStore(FileSystem fileSystem, String storagePath) throws IOException {
String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH); this.fileSystem = checkNotNull(fileSystem);

this.basePath = checkNotNull(storagePath) + "/blob";
if (storagePath == null || StringUtils.isBlank(storagePath)) {
throw new IllegalConfigurationException("Missing high-availability storage path for metadata." +
" Specify via configuration key '" + HighAvailabilityOptions.HA_STORAGE_PATH + "'.");
}


this.basePath = storagePath + "/blob"; LOG.info("Creating highly available BLOB storage directory at {}", basePath);


FileSystem.get(new Path(basePath).toUri()).mkdirs(new Path(basePath)); fileSystem.mkdirs(new Path(basePath));
LOG.info("Created blob directory {}.", basePath); LOG.debug("Created highly available BLOB storage directory at {}", basePath);
} }


// - Put ------------------------------------------------------------------ // - Put ------------------------------------------------------------------
Expand All @@ -81,9 +74,7 @@ public void put(File localFile, JobID jobId, String key) throws Exception {
} }


private void put(File fromFile, String toBlobPath) throws Exception { private void put(File fromFile, String toBlobPath) throws Exception {
try (OutputStream os = FileSystem.get(new URI(toBlobPath)) try (OutputStream os = fileSystem.create(new Path(toBlobPath), true)) {
.create(new Path(toBlobPath), true)) {

LOG.debug("Copying from {} to {}.", fromFile, toBlobPath); LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
Files.copy(fromFile, os); Files.copy(fromFile, os);
} }
Expand All @@ -106,16 +97,15 @@ private void get(String fromBlobPath, File toFile) throws Exception {
checkNotNull(toFile, "File"); checkNotNull(toFile, "File");


if (!toFile.exists() && !toFile.createNewFile()) { if (!toFile.exists() && !toFile.createNewFile()) {
throw new IllegalStateException("Failed to create target file to copy to"); throw new IOException("Failed to create target file to copy to");
} }


final URI fromUri = new URI(fromBlobPath);
final Path fromPath = new Path(fromBlobPath); final Path fromPath = new Path(fromBlobPath);


if (FileSystem.get(fromUri).exists(fromPath)) { if (fileSystem.exists(fromPath)) {
try (InputStream is = FileSystem.get(fromUri).open(fromPath)) { try (InputStream is = fileSystem.open(fromPath);
FileOutputStream fos = new FileOutputStream(toFile); FileOutputStream fos = new FileOutputStream(toFile))

{
LOG.debug("Copying from {} to {}.", fromBlobPath, toFile); LOG.debug("Copying from {} to {}.", fromBlobPath, toFile);
IOUtils.copyBytes(is, fos); // closes the streams IOUtils.copyBytes(is, fos); // closes the streams
} }
Expand Down Expand Up @@ -145,17 +135,16 @@ public void deleteAll(JobID jobId) {
private void delete(String blobPath) { private void delete(String blobPath) {
try { try {
LOG.debug("Deleting {}.", blobPath); LOG.debug("Deleting {}.", blobPath);


FileSystem fs = FileSystem.get(new URI(blobPath));
Path path = new Path(blobPath); Path path = new Path(blobPath);


fs.delete(path, true); fileSystem.delete(path, true);


// send a call to delete the directory containing the file. This will // send a call to delete the directory containing the file. This will
// fail (and be ignored) when some files still exist. // fail (and be ignored) when some files still exist.
try { try {
fs.delete(path.getParent(), false); fileSystem.delete(path.getParent(), false);
fs.delete(new Path(basePath), false); fileSystem.delete(new Path(basePath), false);
} catch (IOException ignored) {} } catch (IOException ignored) {}
} }
catch (Exception e) { catch (Exception e) {
Expand All @@ -168,7 +157,7 @@ public void cleanUp() {
try { try {
LOG.debug("Cleaning up {}.", basePath); LOG.debug("Cleaning up {}.", basePath);


FileSystem.get(new URI(basePath)).delete(new Path(basePath), true); fileSystem.delete(new Path(basePath), true);
} }
catch (Exception e) { catch (Exception e) {
LOG.error("Failed to clean up recovery directory."); LOG.error("Failed to clean up recovery directory.");
Expand Down
Expand Up @@ -25,7 +25,7 @@
/** /**
* A blob store doing nothing. * A blob store doing nothing.
*/ */
class VoidBlobStore implements BlobStore { public class VoidBlobStore implements BlobStore {


@Override @Override
public void put(File localFile, BlobKey blobKey) throws Exception { public void put(File localFile, BlobKey blobKey) throws Exception {
Expand Down
Expand Up @@ -19,11 +19,14 @@
package org.apache.flink.runtime.highavailability; package org.apache.flink.runtime.highavailability;


import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;


import java.io.IOException;

/** /**
* This class gives access to all services needed for * This class gives access to all services needed for
* *
Expand Down Expand Up @@ -72,4 +75,14 @@ public interface HighAvailabilityServices {
* Gets the submitted job graph store for the job manager * Gets the submitted job graph store for the job manager
*/ */
SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception; SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;

/**
* Gets the registry that holds information about whether jobs are currently running.
*/
RunningJobsRegistry getRunningJobsRegistry() throws Exception;

/**
* Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
*/
BlobStore createBlobStore() throws IOException;
} }
Expand Up @@ -19,8 +19,11 @@
package org.apache.flink.runtime.highavailability; package org.apache.flink.runtime.highavailability;


import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.LeaderElectionService;
Expand Down Expand Up @@ -102,4 +105,14 @@ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception
public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
return new StandaloneSubmittedJobGraphStore(); return new StandaloneSubmittedJobGraphStore();
} }

@Override
public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
return new NonHaRegistry();
}

@Override
public BlobStore createBlobStore() {
return new VoidBlobStore();
}
} }

0 comments on commit c648606

Please sign in to comment.