@@ -31,6 +31,8 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.runtime.blob.BlobRecoveryITCase;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.FileUtils;
@@ -234,7 +236,17 @@ public void testBlobServerRecovery() throws Exception {
config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);

BlobRecoveryITCase.testBlobServerRecovery(config);
BlobStoreService blobStoreService = null;

try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService);
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
}
}
}

// package visible
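The hunk above changes the blob-recovery test so that the caller owns the recovery store's lifecycle: the BlobStoreService is created from the configuration, handed to the shared recovery test, and always cleaned up in a finally block. A minimal sketch of that create/use/cleanup pattern, relying only on the BlobUtils and BlobStoreService calls visible in the diff (the helper class, method, and callback names here are made up):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;

public class BlobStoreLifecycleSketch {

	// Creates the (possibly distributed) blob store from the configuration,
	// lets the caller use it, and always cleans it up afterwards.
	public static void withBlobStore(Configuration config, BlobStoreUser user) throws Exception {
		BlobStoreService blobStoreService = null;
		try {
			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
			user.use(blobStoreService);
		} finally {
			if (blobStoreService != null) {
				// also removes any files the store wrote, as the test above does
				blobStoreService.closeAndCleanupAllData();
			}
		}
	}

	// Hypothetical callback interface, only to keep the sketch self-contained.
	public interface BlobStoreUser {
		void use(BlobStoreService blobStore) throws Exception;
	}
}
```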
@@ -19,9 +19,9 @@
package org.apache.flink.mesos.runtime.clusterframework

import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.highavailability.HighAvailabilityServices
import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.metrics.MetricRegistry
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
@@ -38,7 +38,7 @@ class MesosTaskManager(
ioManager: IOManager,
network: NetworkEnvironment,
numberOfSlots: Int,
leaderRetrievalService: LeaderRetrievalService,
highAvailabilityServices: HighAvailabilityServices,
metricRegistry : MetricRegistry)
extends TaskManager(
config,
@@ -48,7 +48,7 @@
ioManager,
network,
numberOfSlots,
leaderRetrievalService,
highAvailabilityServices,
metricRegistry) {

override def handleMessage: Receive = {
@@ -27,6 +27,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.SSLUtils;
@@ -148,6 +149,7 @@ public class WebRuntimeMonitor implements WebMonitor {
public WebRuntimeMonitor(
Configuration config,
LeaderRetrievalService leaderRetrievalService,
BlobView blobView,
ActorSystem actorSystem) throws IOException, InterruptedException {

this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
@@ -269,10 +271,26 @@ public WebRuntimeMonitor(
GET(router, new JobMetricsHandler(metricFetcher));

GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher));
GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
TaskManagerLogHandler.FileMode.LOG, config, enableSSL));
GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL));
GET(router,
new TaskManagerLogHandler(
retriever,
context,
jobManagerAddressPromise.future(),
timeout,
TaskManagerLogHandler.FileMode.LOG,
config,
enableSSL,
blobView));
GET(router,
new TaskManagerLogHandler(
retriever,
context,
jobManagerAddressPromise.future(),
timeout,
TaskManagerLogHandler.FileMode.STDOUT,
config,
enableSSL,
blobView));
GET(router, new TaskManagerMetricsHandler(metricFetcher));

router
@@ -50,6 +50,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
@@ -62,6 +63,7 @@
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,6 +118,8 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {

private final Time timeTimeout;

private final BlobView blobView;

public enum FileMode {
LOG,
STDOUT
@@ -128,7 +132,8 @@ public TaskManagerLogHandler(
FiniteDuration timeout,
FileMode fileMode,
Configuration config,
boolean httpsEnabled) {
boolean httpsEnabled,
BlobView blobView) {
super(retriever, localJobManagerAddressPromise, timeout, httpsEnabled);

this.executor = checkNotNull(executor);
@@ -142,6 +147,8 @@
break;
}

this.blobView = Preconditions.checkNotNull(blobView, "blobView");

timeTimeout = Time.milliseconds(timeout.toMillis());
}

@@ -167,7 +174,7 @@ public BlobCache checkedApply(Object result) throws IOException {
Option<String> hostOption = jobManager.actor().path().address().host();
String host = hostOption.isDefined() ? hostOption.get() : "localhost";
int port = (int) result;
return new BlobCache(new InetSocketAddress(host, port), config);
return new BlobCache(new InetSocketAddress(host, port), config, blobView);
}
}, executor);

@@ -27,6 +27,7 @@
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
@@ -154,6 +155,7 @@ public void testRedirectToLeader() throws Exception {
webMonitor[i] = new WebRuntimeMonitor(
config,
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
highAvailabilityServices.createBlobStore(),
jobManagerSystem[i]);
}

@@ -294,9 +296,11 @@ public void testLeaderNotAvailable() throws Exception {

actorSystem = AkkaUtils.createDefaultActorSystem();

LeaderRetrievalService leaderRetrievalService = mock(LeaderRetrievalService.class);
webRuntimeMonitor = new WebRuntimeMonitor(
config, leaderRetrievalService, actorSystem);
config,
mock(LeaderRetrievalService.class),
mock(BlobView.class),
actorSystem);

webRuntimeMonitor.start("akka://schmakka");

@@ -467,10 +471,12 @@ private WebRuntimeMonitor startWebRuntimeMonitor(
config.setInteger(JobManagerOptions.WEB_PORT, 0);
config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());

HighAvailabilityServices highAvailabilityServices = flink.highAvailabilityServices();

WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
config,
flink.highAvailabilityServices().getJobManagerLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID),
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
highAvailabilityServices.createBlobStore(),
jmActorSystem);

webMonitor.start(jobManagerAddress);
@@ -28,6 +28,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Executors;
@@ -53,7 +54,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Matchers.any;
@@ -71,7 +71,8 @@ public void testGetPaths() {
AkkaUtils.getDefaultClientTimeout(),
TaskManagerLogHandler.FileMode.LOG,
new Configuration(),
false);
false,
new VoidBlobStore());
String[] pathsLog = handlerLog.getPaths();
Assert.assertEquals(1, pathsLog.length);
Assert.assertEquals("/taskmanagers/:taskmanagerid/log", pathsLog[0]);
@@ -83,7 +84,8 @@ public void testGetPaths() {
AkkaUtils.getDefaultClientTimeout(),
TaskManagerLogHandler.FileMode.STDOUT,
new Configuration(),
false);
false,
new VoidBlobStore());
String[] pathsOut = handlerOut.getPaths();
Assert.assertEquals(1, pathsOut.length);
Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", pathsOut[0]);
@@ -131,7 +133,8 @@ public void testLogFetchingFailure() throws Exception {
AkkaUtils.getDefaultClientTimeout(),
TaskManagerLogHandler.FileMode.LOG,
new Configuration(),
false);
false,
new VoidBlobStore());

final AtomicReference<String> exception = new AtomicReference<>();

@@ -20,7 +20,6 @@

import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
@@ -58,7 +57,7 @@ public final class BlobCache implements BlobService {
private final File storageDir;

/** Blob store for distributed file storage, e.g. in HA */
private final BlobStore blobStore;
private final BlobView blobView;

private final AtomicBoolean shutdownRequested = new AtomicBoolean();

@@ -78,55 +77,19 @@ public final class BlobCache implements BlobService {
* address of the {@link BlobServer} to use for fetching files from
* @param blobClientConfig
* global configuration
*
* @throws IOException
* thrown if the (local or distributed) file storage cannot be created or
* is not usable
*/
public BlobCache(InetSocketAddress serverAddress,
Configuration blobClientConfig) throws IOException {
this(serverAddress, blobClientConfig,
BlobUtils.createBlobStoreFromConfig(blobClientConfig));
}

/**
* Instantiates a new BLOB cache.
*
* @param serverAddress
* address of the {@link BlobServer} to use for fetching files from
* @param blobClientConfig
* global configuration
* @param haServices
* high availability services able to create a distributed blob store
*
* @throws IOException
* thrown if the (local or distributed) file storage cannot be created or
* is not usable
*/
public BlobCache(InetSocketAddress serverAddress,
Configuration blobClientConfig, HighAvailabilityServices haServices) throws IOException {
this(serverAddress, blobClientConfig, haServices.createBlobStore());
}

/**
* Instantiates a new BLOB cache.
*
* @param serverAddress
* address of the {@link BlobServer} to use for fetching files from
* @param blobClientConfig
* global configuration
* @param blobStore
* @param blobView
* (distributed) blob store file system to retrieve files from first
*
* @throws IOException
* thrown if the (local or distributed) file storage cannot be created or is not usable
*/
private BlobCache(
final InetSocketAddress serverAddress, final Configuration blobClientConfig,
final BlobStore blobStore) throws IOException {
public BlobCache(
final InetSocketAddress serverAddress,
final Configuration blobClientConfig,
final BlobView blobView) throws IOException {
this.serverAddress = checkNotNull(serverAddress);
this.blobClientConfig = checkNotNull(blobClientConfig);
this.blobStore = blobStore;
this.blobView = checkNotNull(blobView, "blobStore");

// configure and create the storage directory
String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
@@ -168,7 +131,7 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {

// first try the distributed blob store (if available)
try {
blobStore.get(requiredBlob, localJarFile);
blobView.get(requiredBlob, localJarFile);
} catch (Exception e) {
LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
}
@@ -293,28 +256,23 @@ public int getPort() {
}

@Override
public void shutdown() {
public void close() throws IOException {
if (shutdownRequested.compareAndSet(false, true)) {
LOG.info("Shutting down BlobCache");

// Clean up the storage directory
try {
FileUtils.deleteDirectory(storageDir);
}
catch (IOException e) {
LOG.error("BLOB cache failed to properly clean up its storage directory.");
}

// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
}
catch (Throwable t) {
LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.");
} finally {
// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
} catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
} catch (Throwable t) {
LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.");
}
}
}
}
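Taken together, the BlobCache changes mean a cache is now constructed directly with a read-only BlobView and released via close() instead of shutdown(). A minimal usage sketch, assuming only the constructor, getURL() and close() shown above; VoidBlobStore (passed the same way in the handler test earlier in this diff) stands in for the view when no HA store is configured, and the server address and key are placeholders:

```java
import java.net.InetSocketAddress;
import java.net.URL;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;

public class BlobCacheUsageSketch {

	// Downloads one BLOB through the cache and prints its local location.
	public static void printBlobLocation(
			InetSocketAddress serverAddress,
			Configuration config,
			BlobKey key) throws Exception {

		// VoidBlobStore: no distributed store to consult, so getURL() falls back
		// to downloading from the BLOB server into the local storage directory.
		BlobCache cache = new BlobCache(serverAddress, config, new VoidBlobStore());
		try {
			URL localCopy = cache.getURL(key);
			System.out.println("BLOB available at " + localCopy);
		} finally {
			// deletes the local storage directory and unregisters the shutdown hook
			cache.close();
		}
	}
}
```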