Skip to content

Commit

Permalink
[FLINK-12868][mesos] Fix mesos cluster can not be deployed if plugins…
Browse files Browse the repository at this point in the history
… dir does not exist

Plugins dir should be optional and cluster deployment should not fail if it doesn't exist.
  • Loading branch information
pnowojski committed Jun 21, 2019
1 parent d17d4d8 commit 4ac150c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 7 deletions.
Expand Up @@ -75,7 +75,12 @@ public void configure(ContainerSpecification container) throws IOException {
addPathRecursively(flinkBinPath, TARGET_ROOT, container);
addPathRecursively(flinkConfPath, TARGET_ROOT, container);
addPathRecursively(flinkLibPath, TARGET_ROOT, container);
addPathRecursively(flinkPluginsPath, TARGET_ROOT, container);
if (flinkPluginsPath.isDirectory()) {
addPathRecursively(flinkPluginsPath, TARGET_ROOT, container);
}
else {
LOG.warn("The plugins directory '" + flinkPluginsPath + "' doesn't exist.");
}
}

public static Builder newBuilder() {
Expand Down
Expand Up @@ -22,23 +22,24 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR;
import static org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay.TARGET_ROOT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public class FlinkDistributionOverlayTest extends ContainerOverlayTestBase {

Expand All @@ -47,7 +48,6 @@ public class FlinkDistributionOverlayTest extends ContainerOverlayTestBase {

@Test
public void testConfigure() throws Exception {

File binFolder = tempFolder.newFolder("bin");
File libFolder = tempFolder.newFolder("lib");
File pluginsFolder = tempFolder.newFolder("plugins");
Expand All @@ -65,6 +65,34 @@ public void testConfigure() throws Exception {
"plugins/P1/plugin1b.jar",
"plugins/P2/plugin2.jar");

testConfigure(binFolder, libFolder, pluginsFolder, confFolder, files);
}

@Test
public void testConfigureWithMissingPlugins() throws Exception {
File binFolder = tempFolder.newFolder("bin");
File libFolder = tempFolder.newFolder("lib");
File pluginsFolder = Paths.get(tempFolder.getRoot().getAbsolutePath(), "s0m3_p4th_th4t_sh0uld_n0t_3x1sts").toFile();
File confFolder = tempFolder.newFolder("conf");

Path[] files = createPaths(
tempFolder.getRoot(),
"bin/config.sh",
"bin/taskmanager.sh",
"lib/foo.jar",
"lib/A/foo.jar",
"lib/B/foo.jar",
"lib/B/bar.jar");

testConfigure(binFolder, libFolder, pluginsFolder, confFolder, files);
}

private void testConfigure(
File binFolder,
File libFolder,
File pluginsFolder,
File confFolder,
Path[] files) throws IOException {
ContainerSpecification containerSpecification = new ContainerSpecification();
FlinkDistributionOverlay overlay = new FlinkDistributionOverlay(
binFolder,
Expand Down

0 comments on commit 4ac150c

Please sign in to comment.