From c2e0e240970b09af68d0fc834f691a671e8b8fd2 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 11 Jul 2018 17:30:53 +0200 Subject: [PATCH 1/9] [hotfix] Make PackagedProgram(Class, String...) constructor public --- .../java/org/apache/flink/client/program/PackagedProgram.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index c25d94304f8ae..bb3d294330e08 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -223,7 +223,7 @@ public PackagedProgram(File jarFile, List classpaths, @Nullable String entr } } - PackagedProgram(Class entryPointClass, String... args) throws ProgramInvocationException { + public PackagedProgram(Class entryPointClass, String... args) throws ProgramInvocationException { this.jarFile = null; this.args = args == null ? new String[0] : args; From ab9bd87e521d19db7c7d783268a3532d2e876a5d Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 11 Jul 2018 17:41:27 +0200 Subject: [PATCH 2/9] [FLINK-9818] Add cluster component command line parser The cluster component command line parser is responsible for parsing the common command line arguments with which the cluster components are started. These include the configDir, webui-port and dynamic properties. This closes #6314. --- .../entrypoint/ClusterConfiguration.java | 30 +++++-- .../ClusterConfigurationParserFactory.java | 56 +++++++++++++ .../runtime/entrypoint/ClusterEntrypoint.java | 25 ++---- .../EntrypointClusterConfiguration.java | 40 ++++++++++ ...ointClusterConfigurationParserFactory.java | 62 +++++++++++++++ .../entrypoint/FlinkParseException.java | 42 ++++++++++ .../StandaloneSessionClusterEntrypoint.java | 14 +++- .../entrypoint/parser/CommandLineOptions.java | 52 ++++++++++++ .../entrypoint/parser/CommandLineParser.java | 62 +++++++++++++++ .../parser/ParserResultFactory.java | 48 +++++++++++ ...ClusterConfigurationParserFactoryTest.java | 77 ++++++++++++++++++ ...ClusterConfigurationParserFactoryTest.java | 79 +++++++++++++++++++ 12 files changed, 561 insertions(+), 26 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FlinkParseException.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/ParserResultFactory.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java index 7f8b5096d136c..ca69ed12bde10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java @@ -18,27 +18,43 @@ package org.apache.flink.runtime.entrypoint; -import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; + +import java.util.Properties; /** * Configuration class which contains the parsed command line arguments for * the {@link ClusterEntrypoint}. */ public class ClusterConfiguration { + + @Nonnull private final String configDir; - private final int restPort; + @Nonnull + private final Properties dynamicProperties; + + @Nonnull + private final String[] args; - public ClusterConfiguration(String configDir, int restPort) { - this.configDir = Preconditions.checkNotNull(configDir); - this.restPort = restPort; + public ClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args) { + this.configDir = configDir; + this.dynamicProperties = dynamicProperties; + this.args = args; } + @Nonnull public String getConfigDir() { return configDir; } - public int getRestPort() { - return restPort; + @Nonnull + public Properties getDynamicProperties() { + return dynamicProperties; + } + + @Nonnull + public String[] getArgs() { + return args; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java new file mode 100644 index 0000000000000..12115d726a43e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; + +import javax.annotation.Nonnull; + +import java.util.Properties; + +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION; + +/** + * Parser factory which generates a {@link ClusterConfiguration} from the given + * list of command line arguments. + */ +public class ClusterConfigurationParserFactory implements ParserResultFactory { + + @Override + public Options getOptions() { + final Options options = new Options(); + options.addOption(CONFIG_DIR_OPTION); + options.addOption(DYNAMIC_PROPERTY_OPTION); + + return options; + } + + @Override + public ClusterConfiguration createResult(@Nonnull CommandLine commandLine) { + final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt()); + + final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt()); + + return new ClusterConfiguration(configDir, dynamicProperties, commandLine.getArgs()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index a267abb41376c..9ae7b8bfcc81e 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.entrypoint; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; @@ -44,6 +43,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.HistoryServerArchivist; import org.apache.flink.runtime.dispatcher.MiniDispatcher; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -688,27 +688,16 @@ protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphS Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException; - protected static ClusterConfiguration parseArguments(String[] args) { - ParameterTool parameterTool = ParameterTool.fromArgs(args); + private static EntrypointClusterConfiguration parseArguments(String[] args) throws FlinkParseException { + final CommandLineParser clusterConfigurationParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory()); - final String configDir = parameterTool.get("configDir", ""); - - final int restPort; - - final String portKey = "webui-port"; - if (parameterTool.has(portKey)) { - restPort = Integer.valueOf(parameterTool.get(portKey)); - } else { - restPort = -1; - } - - return new ClusterConfiguration(configDir, restPort); + return clusterConfigurationParser.parse(args); } - protected static Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) { - final Configuration configuration = GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir()); + protected static Configuration loadConfiguration(EntrypointClusterConfiguration entrypointClusterConfiguration) { + final Configuration configuration = GlobalConfiguration.loadConfiguration(entrypointClusterConfiguration.getConfigDir()); - final int restPort = clusterConfiguration.getRestPort(); + final int restPort = entrypointClusterConfiguration.getRestPort(); if (restPort >= 0) { configuration.setInteger(RestOptions.PORT, restPort); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java new file mode 100644 index 0000000000000..75cad7aa946f6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import javax.annotation.Nonnull; + +import java.util.Properties; + +/** + * Basic {@link ClusterConfiguration} for entry points. + */ +public class EntrypointClusterConfiguration extends ClusterConfiguration { + + private final int restPort; + + public EntrypointClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort) { + super(configDir, dynamicProperties, args); + this.restPort = restPort; + } + + public int getRestPort() { + return restPort; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java new file mode 100644 index 0000000000000..7dfb784a79cb0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; + +import javax.annotation.Nonnull; + +import java.util.Properties; + +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION; + +/** + * Parser factory for {@link EntrypointClusterConfiguration}. + */ +public class EntrypointClusterConfigurationParserFactory implements ParserResultFactory { + + @Override + public Options getOptions() { + final Options options = new Options(); + options.addOption(CONFIG_DIR_OPTION); + options.addOption(REST_PORT_OPTION); + options.addOption(DYNAMIC_PROPERTY_OPTION); + + return options; + } + + @Override + public EntrypointClusterConfiguration createResult(@Nonnull CommandLine commandLine) { + final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt()); + final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt()); + final String restPortStr = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1"); + final int restPort = Integer.parseInt(restPortStr); + + return new EntrypointClusterConfiguration( + configDir, + dynamicProperties, + commandLine.getArgs(), + restPort); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FlinkParseException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FlinkParseException.java new file mode 100644 index 0000000000000..1ba25d0824c18 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FlinkParseException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.util.FlinkException; + +/** + * Exception which indicates that the parsing of command line + * arguments failed. + */ +public class FlinkParseException extends FlinkException { + + private static final long serialVersionUID = 5164983338744708430L; + + public FlinkParseException(String message) { + super(message); + } + + public FlinkParseException(Throwable cause) { + super(cause); + } + + public FlinkParseException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java index 0f205f1a49ad8..d56725c5bf023 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; @@ -84,7 +85,18 @@ public static void main(String[] args) { SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - Configuration configuration = loadConfiguration(parseArguments(args)); + EntrypointClusterConfiguration entrypointClusterConfiguration = null; + final CommandLineParser commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory()); + + try { + entrypointClusterConfiguration = commandLineParser.parse(args); + } catch (FlinkParseException e) { + LOG.error("Could not parse command line arguments {}.", args, e); + commandLineParser.printHelp(); + System.exit(1); + } + + Configuration configuration = loadConfiguration(entrypointClusterConfiguration); StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java new file mode 100644 index 0000000000000..23c9da2485f90 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.parser; + +import org.apache.commons.cli.Option; + +/** + * Container class for command line options. + */ +public class CommandLineOptions { + + public static final Option CONFIG_DIR_OPTION = Option.builder("c") + .longOpt("configDir") + .required(true) + .hasArg(true) + .argName("configuration directory") + .desc("Directory which contains the configuration file flink-conf.yml.") + .build(); + + public static final Option REST_PORT_OPTION = Option.builder("r") + .longOpt("webui-port") + .required(false) + .hasArg(true) + .argName("rest port") + .desc("Port for the rest endpoint and the web UI.") + .build(); + + public static final Option DYNAMIC_PROPERTY_OPTION = Option.builder("D") + .argName("property=value") + .numberOfArgs(2) + .valueSeparator('=') + .desc("use value for given property") + .build(); + + private CommandLineOptions() {} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java new file mode 100644 index 0000000000000..f9e199c8baf9f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.parser; + +import org.apache.flink.runtime.entrypoint.FlinkParseException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +import javax.annotation.Nonnull; + +/** + * Command line parser which produces a result from the given + * command line arguments. + */ +public class CommandLineParser { + + @Nonnull + private final ParserResultFactory parserResultFactory; + + public CommandLineParser(@Nonnull ParserResultFactory parserResultFactory) { + this.parserResultFactory = parserResultFactory; + } + + public T parse(@Nonnull String[] args) throws FlinkParseException { + final DefaultParser parser = new DefaultParser(); + final Options options = parserResultFactory.getOptions(); + + final CommandLine commandLine; + try { + commandLine = parser.parse(options, args, true); + } catch (ParseException e) { + throw new FlinkParseException("Failed to parse the command line arguments.", e); + } + + return parserResultFactory.createResult(commandLine); + } + + public void printHelp() { + final HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.printHelp("", parserResultFactory.getOptions()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/ParserResultFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/ParserResultFactory.java new file mode 100644 index 0000000000000..bc93f57405419 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/ParserResultFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.parser; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; + +import javax.annotation.Nonnull; + +/** + * Parser result factory used by the {@link CommandLineParser}. + * + * @param type of the parsed result + */ +public interface ParserResultFactory { + + /** + * Returns all relevant {@link Options} for parsing the command line + * arguments. + * + * @return Options to use for the parsing + */ + Options getOptions(); + + /** + * Create the result of the command line argument parsing. + * + * @param commandLine to extract the options from + * @return Result of the parsing + */ + T createResult(@Nonnull CommandLine commandLine); +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java new file mode 100644 index 0000000000000..7447439b26300 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link ClusterConfigurationParserFactory}. + */ +public class ClusterConfigurationParserFactoryTest extends TestLogger { + + private static final CommandLineParser commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); + + @Test + public void testEntrypointClusterConfigurationParsing() throws FlinkParseException { + final String configDir = "/foo/bar"; + final String key = "key"; + final String value = "value"; + final String arg1 = "arg1"; + final String arg2 = "arg2"; + final String[] args = {"--configDir", configDir, String.format("-D%s=%s", key, value), arg1, arg2}; + + final ClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + final Properties dynamicProperties = clusterConfiguration.getDynamicProperties(); + + assertThat(dynamicProperties, hasEntry(key, value)); + + assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2)); + } + + @Test + public void testOnlyRequiredArguments() throws FlinkParseException { + final String configDir = "/foo/bar"; + final String[] args = {"--configDir", configDir}; + + final ClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + } + + @Test(expected = FlinkParseException.class) + public void testMissingRequiredArgument() throws FlinkParseException { + final String[] args = {}; + + commandLineParser.parse(args); + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java new file mode 100644 index 0000000000000..da63b7fe04694 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link EntrypointClusterConfigurationParserFactory}. + */ +public class EntrypointClusterConfigurationParserFactoryTest extends TestLogger { + + private static final CommandLineParser commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory()); + + @Test + public void testEntrypointClusterConfigurationParsing() throws FlinkParseException { + final String configDir = "/foo/bar"; + final int restPort = 1234; + final String key = "key"; + final String value = "value"; + final String arg1 = "arg1"; + final String arg2 = "arg2"; + final String[] args = {"--configDir", configDir, "-r", String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2}; + + final EntrypointClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort))); + final Properties dynamicProperties = clusterConfiguration.getDynamicProperties(); + + assertThat(dynamicProperties, hasEntry(key, value)); + + assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2)); + } + + @Test + public void testOnlyRequiredArguments() throws FlinkParseException { + final String configDir = "/foo/bar"; + final String[] args = {"--configDir", configDir}; + + final EntrypointClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1))); + } + + @Test(expected = FlinkParseException.class) + public void testMissingRequiredArgument() throws FlinkParseException { + final String[] args = {}; + + commandLineParser.parse(args); + } +} From 8f467c1e9727d5a86d38d0b49753c534a1a161da Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 9 Jul 2018 23:54:55 +0200 Subject: [PATCH 3/9] [FLINK-9488] Add container entry point StandaloneJobClusterEntryPoint The StandaloneJobClusterEntryPoint is the basic entry point for containers. It is started with the user code jar in its classpath and the classname of the user program. The entrypoint will then load this user program via the classname and execute its main method. This will generate a JobGraph which is then used to start the MiniDispatcher. This closes #6315. --- flink-container/pom.xml | 69 ++++++++ .../StandaloneJobClusterConfiguration.java | 43 +++++ ...eJobClusterConfigurationParserFactory.java | 75 +++++++++ .../StandaloneJobClusterEntryPoint.java | 156 ++++++++++++++++++ .../src/main/resources/log4j.properties | 27 +++ ...ClusterConfigurationParserFactoryTest.java | 84 ++++++++++ .../StandaloneJobClusterEntryPointTest.java | 53 ++++++ .../flink/container/entrypoint/TestJob.java | 40 +++++ .../src/test/resources/log4j-test.properties | 32 ++++ ...ClusterConfigurationParserFactoryTest.java | 1 - pom.xml | 1 + 11 files changed, 580 insertions(+), 1 deletion(-) create mode 100644 flink-container/pom.xml create mode 100644 flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java create mode 100644 flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java create mode 100644 flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java create mode 100644 flink-container/src/main/resources/log4j.properties create mode 100644 flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java create mode 100644 flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java create mode 100644 flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java create mode 100644 flink-container/src/test/resources/log4j-test.properties diff --git a/flink-container/pom.xml b/flink-container/pom.xml new file mode 100644 index 0000000000000..b20d32117037d --- /dev/null +++ b/flink-container/pom.xml @@ -0,0 +1,69 @@ + + + + 4.0.0 + + + org.apache.flink + flink-parent + 1.6-SNAPSHOT + .. + + + flink-container_${scala.binary.version} + flink-container + jar + + + + + + + + org.apache.flink + flink-runtime_${scala.binary.version} + ${project.version} + provided + + + + org.apache.flink + flink-clients_${scala.binary.version} + ${project.version} + provided + + + + + + org.apache.flink + flink-test-utils-junit + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + test + + + + diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java new file mode 100644 index 0000000000000..e68e74b80a402 --- /dev/null +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration; + +import javax.annotation.Nonnull; + +import java.util.Properties; + +/** + * Configuration for the {@link StandaloneJobClusterEntryPoint}. + */ +final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration { + @Nonnull + private final String jobClassName; + + public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName) { + super(configDir, dynamicProperties, args, restPort); + this.jobClassName = jobClassName; + } + + @Nonnull + String getJobClassName() { + return jobClassName; + } +} diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java new file mode 100644 index 0000000000000..c0cb473972594 --- /dev/null +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import javax.annotation.Nonnull; + +import java.util.Properties; + +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION; + +/** + * Parser factory which generates a {@link StandaloneJobClusterConfiguration} from a given + * list of command line arguments. + */ +public class StandaloneJobClusterConfigurationParserFactory implements ParserResultFactory { + + private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j") + .longOpt("job-classname") + .required(true) + .hasArg(true) + .argName("job class name") + .desc("Class name of the job to run.") + .build(); + + @Override + public Options getOptions() { + final Options options = new Options(); + options.addOption(CONFIG_DIR_OPTION); + options.addOption(REST_PORT_OPTION); + options.addOption(JOB_CLASS_NAME_OPTION); + options.addOption(DYNAMIC_PROPERTY_OPTION); + + return options; + } + + @Override + public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine commandLine) { + final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt()); + final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt()); + final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1"); + final int restPort = Integer.parseInt(restPortString); + final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt()); + + return new StandaloneJobClusterConfiguration( + configDir, + dynamicProperties, + commandLine.getArgs(), + restPort, + jobClassName); + } +} diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java new file mode 100644 index 0000000000000..47cca4c7d8509 --- /dev/null +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.FlinkException; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +/** + * {@link JobClusterEntrypoint} which is started with a job in a predefined + * location. + */ +public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { + + private static final String[] EMPTY_ARGS = new String[0]; + + @Nonnull + private final String jobClassName; + + StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName) { + super(configuration); + this.jobClassName = jobClassName; + } + + @Override + protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException { + final PackagedProgram packagedProgram = createPackagedProgram(); + final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); + try { + final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism); + jobGraph.setAllowQueuedScheduling(true); + + return jobGraph; + } catch (Exception e) { + throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e); + } + } + + private PackagedProgram createPackagedProgram() throws FlinkException { + try { + final Class mainClass = getClass().getClassLoader().loadClass(jobClassName); + return new PackagedProgram(mainClass, EMPTY_ARGS); + } catch (ClassNotFoundException | ProgramInvocationException e) { + throw new FlinkException("Could not load the provied entrypoint class.", e); + } + } + + @Override + protected void registerShutdownActions(CompletableFuture terminationFuture) { + terminationFuture.thenAccept((status) -> shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, null, true)); + } + + @Override + protected ResourceManager createResourceManager( + Configuration configuration, + ResourceID resourceId, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry, + FatalErrorHandler fatalErrorHandler, + ClusterInformation clusterInformation, + @Nullable String webInterfaceUrl) throws Exception { + final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); + final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); + final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( + resourceManagerRuntimeServicesConfiguration, + highAvailabilityServices, + rpcService.getScheduledExecutor()); + + return new StandaloneResourceManager( + rpcService, + ResourceManager.RESOURCE_MANAGER_NAME, + resourceId, + resourceManagerConfiguration, + highAvailabilityServices, + heartbeatServices, + resourceManagerRuntimeServices.getSlotManager(), + metricRegistry, + resourceManagerRuntimeServices.getJobLeaderIdService(), + clusterInformation, + fatalErrorHandler); + } + + public static void main(String[] args) { + // startup checks and logging + EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneJobClusterEntryPoint.class.getSimpleName(), args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + final CommandLineParser commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory()); + StandaloneJobClusterConfiguration clusterConfiguration = null; + + try { + clusterConfiguration = commandLineParser.parse(args); + } catch (FlinkParseException e) { + LOG.error("Could not parse command line arguments {}.", args, e); + commandLineParser.printHelp(); + System.exit(1); + } + + Configuration configuration = loadConfiguration(clusterConfiguration); + + configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString()); + + StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, clusterConfiguration.getJobClassName()); + + entrypoint.startCluster(); + } + +} diff --git a/flink-container/src/main/resources/log4j.properties b/flink-container/src/main/resources/log4j.properties new file mode 100644 index 0000000000000..62cb6ed9cca57 --- /dev/null +++ b/flink-container/src/main/resources/log4j.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# Convenience file for local debugging of the JobManager/TaskManager. +log4j.rootLogger=OFF, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +log4j.logger.org.apache.flink.mesos=DEBUG +log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java new file mode 100644 index 0000000000000..1f39a0609e702 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link StandaloneJobClusterConfigurationParserFactory}. + */ +public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogger { + + private static final CommandLineParser commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory()); + + @Test + public void testEntrypointClusterConfigurationParsing() throws FlinkParseException { + final String configDir = "/foo/bar"; + final String key = "key"; + final String value = "value"; + final int restPort = 1234; + final String jobClassName = "foobar"; + final String arg1 = "arg1"; + final String arg2 = "arg2"; + final String[] args = {"--configDir", configDir, "--webui-port", String.valueOf(restPort), "--job-classname", jobClassName, String.format("-D%s=%s", key, value), arg1, arg2}; + + final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName))); + assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort))); + final Properties dynamicProperties = clusterConfiguration.getDynamicProperties(); + + assertThat(dynamicProperties, hasEntry(key, value)); + + assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2)); + } + + @Test + public void testOnlyRequiredArguments() throws FlinkParseException { + final String configDir = "/foo/bar"; + final String jobClassName = "foobar"; + final String[] args = {"--configDir", configDir, "--job-classname", jobClassName}; + + final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName))); + assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1))); + } + + @Test(expected = FlinkParseException.class) + public void testMissingRequiredArgument() throws FlinkParseException { + final String[] args = {}; + + commandLineParser.parse(args); + } +} diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java new file mode 100644 index 0000000000000..360799d19a823 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link StandaloneJobClusterEntryPoint}. + */ +public class StandaloneJobClusterEntryPointTest extends TestLogger { + + @Test + public void testJobGraphRetrieval() throws FlinkException { + final Configuration configuration = new Configuration(); + final int parallelism = 42; + configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism); + final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint( + configuration, + TestJob.class.getCanonicalName()); + + final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration); + + assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName()))); + assertThat(jobGraph.getMaximumParallelism(), is(parallelism)); + } + +} diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java new file mode 100644 index 0000000000000..5f8857fc35fb4 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; + +/** + * Test job which is used for {@link StandaloneJobClusterEntryPointTest}. + */ +public class TestJob { + + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStreamSource source = env.fromElements(1, 2, 3, 4); + final SingleOutputStreamOperator mapper = source.map(element -> 2 * element); + mapper.addSink(new DiscardingSink<>()); + + env.execute(TestJob.class.getCanonicalName()); + } +} diff --git a/flink-container/src/test/resources/log4j-test.properties b/flink-container/src/test/resources/log4j-test.properties new file mode 100644 index 0000000000000..b85f2f20239bf --- /dev/null +++ b/flink-container/src/test/resources/log4j-test.properties @@ -0,0 +1,32 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF, console + +# Log all infos in the given file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console + +# log whats going on between the tests +log4j.logger.org.apache.flink.runtime.leaderelection=OFF +log4j.logger.org.apache.flink.runtime.leaderretrieval=OFF + diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java index 7447439b26300..62da39e592550 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java @@ -73,5 +73,4 @@ public void testMissingRequiredArgument() throws FlinkParseException { commandLineParser.parse(args); } - } diff --git a/pom.xml b/pom.xml index 897ae3cf9e74a..1f35cd44892ce 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,7 @@ under the License. flink-formats flink-examples flink-clients + flink-container flink-queryable-state flink-tests flink-end-to-end-tests From 5a4bdf2c9fd1693ad3b90dbbd3bcb589ed15c101 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 10 Jul 2018 11:24:26 +0200 Subject: [PATCH 4/9] [FLINK-9819] Add startup scripts for standalone job cluster entry point This closes #6316. --- flink-dist/pom.xml | 6 ++ .../src/main/flink-bin/bin/flink-console.sh | 6 +- .../src/main/flink-bin/bin/flink-daemon.sh | 6 +- .../src/main/flink-bin/bin/standalone-job.sh | 66 +++++++++++++++++++ 4 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 flink-dist/src/main/flink-bin/bin/standalone-job.sh diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 8ae6d70c7ca63..8bfbace5a269f 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -122,6 +122,12 @@ under the License. ${project.version} + + org.apache.flink + flink-container_${scala.binary.version} + ${project.version} + + org.apache.flink flink-statebackend-rocksdb_${scala.binary.version} diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh index 3ccbbd0b99d5b..64a428dfa6e0e 100644 --- a/flink-dist/src/main/flink-bin/bin/flink-console.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh @@ -19,7 +19,7 @@ # Start a Flink service as a console application. Must be stopped with Ctrl-C # or with SIGTERM by kill or the controlling process. -USAGE="Usage: flink-console.sh (jobmanager|taskmanager|historyserver|zookeeper) [args]" +USAGE="Usage: flink-console.sh (jobmanager|taskmanager|taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]" SERVICE=$1 ARGS=("${@:2}") # get remaining arguments as array @@ -54,6 +54,10 @@ case $SERVICE in CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint ;; + (standalonejob) + CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint + ;; + (*) echo "Unknown service '${SERVICE}'. $USAGE." exit 1 diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh index b337a1770c1fa..3a9f61dcb41e0 100644 --- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh @@ -18,7 +18,7 @@ ################################################################################ # Start/stop a Flink daemon. -USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (jobmanager|taskmanager|zookeeper|historyserver) [args]" +USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (jobmanager|taskmanager|taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]" STARTSTOP=$1 DAEMON=$2 @@ -54,6 +54,10 @@ case $DAEMON in CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint ;; + (standalonejob) + CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint + ;; + (*) echo "Unknown daemon '${DAEMON}'. $USAGE." exit 1 diff --git a/flink-dist/src/main/flink-bin/bin/standalone-job.sh b/flink-dist/src/main/flink-bin/bin/standalone-job.sh new file mode 100644 index 0000000000000..586a8bacb00ae --- /dev/null +++ b/flink-dist/src/main/flink-bin/bin/standalone-job.sh @@ -0,0 +1,66 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start/stop a Flink JobManager. +USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop" + +STARTSTOP=$1 +ENTRY_POINT_NAME="standalonejob" + +ARGS=("${@:2}") + +if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] || [[ -z JOB_CLASSNAME ]]; then + echo $USAGE + exit 1 +fi + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then + if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then + echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`" + else + flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP}) + FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes}) + fi + + if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then + echo "[ERROR] Configured memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}." + exit 1 + fi + + if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then + export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m" + fi + + # Add cluster entry point specific JVM options + export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" + + # Startup parameters + ARGS+=("--configDir" "${FLINK_CONF_DIR}") +fi + +if [[ $STARTSTOP == "start-foreground" ]]; then + exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRY_POINT_NAME "${ARGS[@]}" +else + "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRY_POINT_NAME "${ARGS[@]}" +fi From 2fbbf8ee662647c71581f5cd989226be820fed0f Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 10 Jul 2018 23:23:59 +0200 Subject: [PATCH 5/9] [FLINK-9820] Forward dynamic properties to Flink configuration in ClusterEntrypoint With this commit we can use dynamic properties to overwrite configuration values in the ClusterEntrypoint. This closes #6317. --- .../configuration/ConfigurationUtils.java | 21 ++++++++ .../configuration/ConfigurationUtilsTest.java | 54 +++++++++++++++++++ .../runtime/entrypoint/ClusterEntrypoint.java | 6 ++- 3 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index 8566a43a3413a..3d1d8300d87b1 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -21,6 +21,8 @@ import javax.annotation.Nonnull; import java.io.File; +import java.util.Properties; +import java.util.Set; /** * Utility class for {@link Configuration} related helper functions. @@ -54,6 +56,25 @@ public static String[] parseLocalStateDirectories(Configuration configuration) { return splitPaths(configValue); } + /** + * Creates a new {@link Configuration} from the given {@link Properties}. + * + * @param properties to convert into a {@link Configuration} + * @return {@link Configuration} which has been populated by the values of the given {@link Properties} + */ + @Nonnull + public static Configuration createConfiguration(Properties properties) { + final Configuration configuration = new Configuration(); + + final Set propertyNames = properties.stringPropertyNames(); + + for (String propertyName : propertyNames) { + configuration.setString(propertyName, properties.getProperty(propertyName)); + } + + return configuration; + } + @Nonnull private static String[] splitPaths(@Nonnull String separatedPaths) { return separatedPaths.length() > 0 ? separatedPaths.split(",|" + File.pathSeparator) : EMPTY; diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java new file mode 100644 index 0000000000000..2019d98ded7e8 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link ConfigurationUtils}. + */ +public class ConfigurationUtilsTest extends TestLogger { + + @Test + public void testPropertiesToConfiguration() { + final Properties properties = new Properties(); + final int entries = 10; + + for (int i = 0; i < entries; i++) { + properties.setProperty("key" + i, "value" + i); + } + + final Configuration configuration = ConfigurationUtils.createConfiguration(properties); + + for (String key : properties.stringPropertyNames()) { + assertThat(configuration.getString(key, ""), is(equalTo(properties.getProperty(key)))); + } + + assertThat(configuration.toMap().size(), is(properties.size())); + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 9ae7b8bfcc81e..b429de5d7de9a 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; @@ -688,14 +689,15 @@ protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphS Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException; - private static EntrypointClusterConfiguration parseArguments(String[] args) throws FlinkParseException { + protected static EntrypointClusterConfiguration parseArguments(String[] args) throws FlinkParseException { final CommandLineParser clusterConfigurationParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory()); return clusterConfigurationParser.parse(args); } protected static Configuration loadConfiguration(EntrypointClusterConfiguration entrypointClusterConfiguration) { - final Configuration configuration = GlobalConfiguration.loadConfiguration(entrypointClusterConfiguration.getConfigDir()); + final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(entrypointClusterConfiguration.getDynamicProperties()); + final Configuration configuration = GlobalConfiguration.loadConfiguration(entrypointClusterConfiguration.getConfigDir(), dynamicProperties); final int restPort = entrypointClusterConfiguration.getRestPort(); From 740f2fbf2e65fa988c6a577989ccd8923729be45 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 10 Jul 2018 23:43:34 +0200 Subject: [PATCH 6/9] [FLINK-9821] Forward dynamic properties to configuration in TaskManagerRunner With this commit we can use dynamic properties to overwrite configuration values in the TaskManagerRunner. This closes #6318. --- .../src/main/flink-bin/bin/taskmanager.sh | 10 ++++--- .../taskexecutor/TaskManagerRunner.java | 29 +++++++++++++++---- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh index 771d53fcb377f..0d70f34002610 100755 --- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh @@ -22,6 +22,8 @@ USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)" STARTSTOP=$1 +ARGS=("${@:2}") + if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then echo $USAGE exit 1 @@ -72,15 +74,15 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" # Startup parameters - args=("--configDir" "${FLINK_CONF_DIR}") + ARGS+=("--configDir" "${FLINK_CONF_DIR}") fi if [[ $STARTSTOP == "start-foreground" ]]; then - exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${args[@]}" + exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${ARGS[@]}" else if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then # Start a single TaskManager - "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}" + "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}" else # Example output from `numactl --show` on an AWS c4.8xlarge: # policy: default @@ -92,7 +94,7 @@ else read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ") for NODE_ID in "${NODE_LIST[@]:1}"; do # Start a TaskManager for each NUMA node - numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}" + numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}" done fi fi diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 23fd29e4e105c..91c57047e4fda 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.FileSystem; @@ -29,6 +29,10 @@ import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.entrypoint.ClusterConfiguration; +import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory; +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -274,11 +278,7 @@ public static void main(String[] args) throws Exception { LOG.info("Cannot determine the maximum number of open file descriptors"); } - ParameterTool parameterTool = ParameterTool.fromArgs(args); - - final String configDir = parameterTool.get("configDir"); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(configDir); + final Configuration configuration = loadConfiguration(args); try { FileSystem.initialize(configuration); @@ -303,6 +303,23 @@ public Void call() throws Exception { } } + private static Configuration loadConfiguration(String[] args) throws FlinkParseException { + final CommandLineParser commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); + + final ClusterConfiguration clusterConfiguration; + + try { + clusterConfiguration = commandLineParser.parse(args); + } catch (FlinkParseException e) { + LOG.error("Could not parse the command line options.", e); + commandLineParser.printHelp(); + throw e; + } + + final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties()); + return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), dynamicProperties); + } + public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception { final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId); From 56e5381cb7aba01f1d7ecfa11e4be7f505a35baf Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 10 Jul 2018 15:41:18 +0200 Subject: [PATCH 7/9] [FLINK-9822] Add Dockerfile for StandaloneJobClusterEntryPoint image This commit adds a Dockerfile for a standalone job cluster image. The image contains the Flink distribution and a specified user code jar. The entrypoint will start the StandaloneJobClusterEntryPoint with the provided job classname. This closes #6319. --- flink-container/docker/Dockerfile | 50 +++++++++ flink-container/docker/README.md | 40 +++++++ flink-container/docker/build.sh | 116 ++++++++++++++++++++ flink-container/docker/docker-compose.yml | 31 ++++++ flink-container/docker/docker-entrypoint.sh | 44 ++++++++ 5 files changed, 281 insertions(+) create mode 100644 flink-container/docker/Dockerfile create mode 100644 flink-container/docker/README.md create mode 100755 flink-container/docker/build.sh create mode 100644 flink-container/docker/docker-compose.yml create mode 100755 flink-container/docker/docker-entrypoint.sh diff --git a/flink-container/docker/Dockerfile b/flink-container/docker/Dockerfile new file mode 100644 index 0000000000000..458c21452784f --- /dev/null +++ b/flink-container/docker/Dockerfile @@ -0,0 +1,50 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +FROM java:8-jre-alpine + +# Install requirements +RUN apk add --no-cache bash snappy + +# Flink environment variables +ENV FLINK_INSTALL_PATH=/opt +ENV FLINK_HOME $FLINK_INSTALL_PATH/flink +ENV FLINK_LIB_DIR $FLINK_HOME/lib +ENV PATH $PATH:$FLINK_HOME/bin + +# flink-dist can point to a directory or a tarball on the local system +ARG flink_dist=NOT_SET +ARG job_jar=NOT_SET + +# Install build dependencies and flink +ADD $flink_dist $FLINK_INSTALL_PATH +ADD $job_jar $FLINK_INSTALL_PATH/job.jar + +RUN set -x && \ + ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \ + ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \ + addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \ + chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \ + chown -h flink:flink $FLINK_HOME + +COPY docker-entrypoint.sh / + +USER flink +EXPOSE 8081 6123 +ENTRYPOINT ["/docker-entrypoint.sh"] +CMD ["--help"] diff --git a/flink-container/docker/README.md b/flink-container/docker/README.md new file mode 100644 index 0000000000000..644b31c7a1a74 --- /dev/null +++ b/flink-container/docker/README.md @@ -0,0 +1,40 @@ +# Apache Flink job cluster deployment on docker using docker-compose + +## Installation + +Install the most recent stable version of docker +https://docs.docker.com/installation/ + +## Build + +Images are based on the official Java Alpine (OpenJDK 8) image. If you want to +build the flink image run: + + build.sh --from-local-dist --job-jar /path/to/job/jar/job.jar --image-name flink:job + +If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: + + docker build --build-arg FLINK_VERSION=1.6.0 --build-arg HADOOP_VERSION=28 --build-arg SCALA_VERSION=2.11 -t "flink:1.6.0-hadoop2.8-scala_2.11" flink + +## Deploy + +- Deploy cluster and see config/setup log output (best run in a screen session) + + docker-compose up + +- Deploy as a daemon (and return) + + docker-compose up -d + +- Scale the cluster up or down to *N* TaskManagers + + docker-compose scale taskmanager= + +- Access the Job Manager container + + docker exec -it $(docker ps --filter name=flink_jobmanager --format={{.ID}}) /bin/sh + +- Kill the cluster + + docker-compose kill diff --git a/flink-container/docker/build.sh b/flink-container/docker/build.sh new file mode 100755 index 0000000000000..13a536cc6c93d --- /dev/null +++ b/flink-container/docker/build.sh @@ -0,0 +1,116 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +usage() { + cat < --from-local-dist [--image-name ] + build.sh --job-jar --from-release --flink-version --hadoop-version --scala-version [--image-name ] + build.sh --help + + If the --image-name flag is not used the built image name will be 'flink'. +HERE + exit 1 +} + +while [[ $# -ge 1 ]] +do +key="$1" + case $key in + --job-jar) + JOB_JAR_PATH="$2" + ;; + --from-local-dist) + FROM_LOCAL="true" + ;; + --from-release) + FROM_RELEASE="true" + ;; + --image-name) + IMAGE_NAME="$2" + shift + ;; + --flink-version) + FLINK_VERSION="$2" + shift + ;; + --hadoop-version) + HADOOP_VERSION="$(echo "$2" | sed 's/\.//')" + shift + ;; + --scala-version) + SCALA_VERSION="$2" + shift + ;; + --kubernetes-certificates) + CERTIFICATES_DIR="$2" + shift + ;; + --help) + usage + ;; + *) + # unknown option + ;; + esac + shift +done + +IMAGE_NAME=${IMAGE_NAME:-flink-job} + +# TMPDIR must be contained within the working directory so it is part of the +# Docker context. (i.e. it can't be mktemp'd in /tmp) +TMPDIR=_TMP_ + +cleanup() { + rm -rf "${TMPDIR}" +} +trap cleanup EXIT + +mkdir -p "${TMPDIR}" + +JOB_JAR_TARGET="${TMPDIR}/job.jar" +cp ${JOB_JAR_PATH} ${JOB_JAR_TARGET} + +if [ -n "${FROM_RELEASE}" ]; then + + [[ -n "${FLINK_VERSION}" ]] && [[ -n "${HADOOP_VERSION}" ]] && [[ -n "${SCALA_VERSION}" ]] || usage + + FLINK_DIST_FILE_NAME="flink-${FLINK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala_${SCALA_VERSION}.tgz" + CURL_OUTPUT="${TMPDIR}/${FLINK_DIST_FILE_NAME}" + + echo "Downloading ${FLINK_DIST_FILE_NAME} from ${FLINK_BASE_URL}" + curl -# "https://archive.apache.org/dist/flink/flink-${FLINK_VERSION}/${FLINK_DIST_FILE_NAME}" --output ${CURL_OUTPUT} + + FLINK_DIST="${CURL_OUTPUT}" + +elif [ -n "${FROM_LOCAL}" ]; then + + DIST_DIR="../../flink-dist/target/flink-*-bin" + FLINK_DIST="${TMPDIR}/flink.tgz" + echo "Using flink dist: ${DIST_DIR}" + tar -C ${DIST_DIR} -cvzf "${FLINK_DIST}" . + +else + + usage + +fi + +docker build --build-arg flink_dist="${FLINK_DIST}" --build-arg job_jar="${JOB_JAR_TARGET}" -t "${IMAGE_NAME}" . diff --git a/flink-container/docker/docker-compose.yml b/flink-container/docker/docker-compose.yml new file mode 100644 index 0000000000000..81e4c8c8a54b4 --- /dev/null +++ b/flink-container/docker/docker-compose.yml @@ -0,0 +1,31 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set the FLINK_DOCKER_IMAGE_NAME environment variable to override the image name to use + +version: "2.1" +services: + job-cluster: + image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job} + ports: + - "8081:8081" + command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster + + taskmanager: + image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job} + command: task-manager -Djobmanager.rpc.address=job-cluster diff --git a/flink-container/docker/docker-entrypoint.sh b/flink-container/docker/docker-entrypoint.sh new file mode 100755 index 0000000000000..85ba8662bee2b --- /dev/null +++ b/flink-container/docker/docker-entrypoint.sh @@ -0,0 +1,44 @@ +#!/bin/sh + +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +### If unspecified, the hostname of the container is taken as the JobManager address +FLINK_HOME=${FLINK_HOME:-"/opt/flink/bin"} + +JOB_CLUSTER="job-cluster" +TASK_MANAGER="task-manager" + +CMD="$1" +shift; + +if [ "${CMD}" == "--help" -o "${CMD}" == "-h" ]; then + echo "Usage: $(basename $0) (${JOB_CLUSTER}|${TASK_MANAGER})" + exit 0 +elif [ "${CMD}" == "${JOB_CLUSTER}" -o "${CMD}" == "${TASK_MANAGER}" ]; then + echo "Starting the ${CMD}" + echo "config file: " && grep '^[^\n#]' $FLINK_HOME/conf/flink-conf.yaml + + if [ "${CMD}" == "${TASK_MANAGER}" ]; then + exec $FLINK_HOME/bin/taskmanager.sh start-foreground "$@" + else + exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@" + fi +fi + +exec "$@" From 387a3bc198cfea016abd92953c7fce28e641cf67 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 11 Jul 2018 00:52:08 +0200 Subject: [PATCH 8/9] [FLINK-9823] Add Kubernetes deployment ymls The Kubernetes files contain a job-cluster service specification, a job specification for the StandaloneJobClusterEntryPoint and a deployment for TaskManagers. This closes #6320. --- flink-container/kubernetes/README.md | 59 +++++++++++++++++++ .../kubernetes/job-cluster-job.yaml.template | 44 ++++++++++++++ .../kubernetes/job-cluster-service.yaml | 41 +++++++++++++ .../task-manager-deployment.yaml.template | 34 +++++++++++ 4 files changed, 178 insertions(+) create mode 100644 flink-container/kubernetes/README.md create mode 100644 flink-container/kubernetes/job-cluster-job.yaml.template create mode 100644 flink-container/kubernetes/job-cluster-service.yaml create mode 100644 flink-container/kubernetes/task-manager-deployment.yaml.template diff --git a/flink-container/kubernetes/README.md b/flink-container/kubernetes/README.md new file mode 100644 index 0000000000000..9e0449616e28a --- /dev/null +++ b/flink-container/kubernetes/README.md @@ -0,0 +1,59 @@ +# Apache Flink job cluster deployment on Kubernetes + +## Build container image using Docker + +In order to deploy a job cluster on Kubernetes, you first need to build a Docker image containing Flink and the user code jar. +Please follow the instructions you can find [here](../docker/README.md) to build a job container image. + +## Deploy Flink job cluster + +This directory contains a predefined K8s service and two template files for the job cluster entry point and the task managers. + +The K8s service is used to let the cluster pods find each other. +If you start the Flink cluster in HA mode, then this is not necessary, because the HA implementation is used to detect leaders. + +In order to use the template files, please replace the `${VARIABLES}` in the file with concrete values. +The files contain the following variables: + +- `${FLINK_IMAGE_NAME}`: Name of the image to use for the container +- `${FLINK_JOB}`: Name of the Flink job to start (the user code jar must be included in the container image) +- `${FLINK_JOB_PARALLELISM}`: Degree of parallelism with which to start the Flink job and the number of required task managers + +One way to substitute the variables is to use `envsubst`. +See [here]((https://stackoverflow.com/a/23622446/4815083)) for a guide to install it on Mac OS X. + +In non HA mode, you should first start the job cluster service: + +`kubectl create -f job-cluster-service.yaml` + +In order to deploy the job cluster entrypoint run: + +`FLINK_IMAGE_NAME= FLINK_JOB= FLINK_JOB_PARALLELISM= envsubst < job-cluster-job.yaml.template | kubectl create -f -` + +Now you should see the `flink-job-cluster` job being started by calling `kubectl get job`. + +At last, you should start the task manager deployment: + +`FLINK_IMAGE_NAME= FLINK_JOB_PARALLELISM= envsubst < task-manager-deployment.yaml.template | kubectl create -f -` + +## Interact with Flink job cluster + +After starting the job cluster service, the web UI will be available under `:30081`. +You can then use the Flink client to send Flink commands to the cluster: + +`bin/flink list -m ` + +## Terminate Flink job cluster + +The job cluster entry point pod is part of the Kubernetes job and terminates once the Flink job reaches a globally terminal state. +Alternatively, you can also stop the job manually. + +`kubectl delete job flink-job-cluster` + +The task manager pods are part of the task manager deployment and need to be deleted manually by calling + +`kubectl delete deployment flink-task-manager` + +Last but not least you should also stop the job cluster service + +`kubectl delete service flink-job-cluster` diff --git a/flink-container/kubernetes/job-cluster-job.yaml.template b/flink-container/kubernetes/job-cluster-job.yaml.template new file mode 100644 index 0000000000000..0bbb1a1aa3a0a --- /dev/null +++ b/flink-container/kubernetes/job-cluster-job.yaml.template @@ -0,0 +1,44 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: batch/v1 +kind: Job +metadata: + name: flink-job-cluster +spec: + template: + metadata: + labels: + app: flink + component: job-cluster + spec: + restartPolicy: OnFailure + containers: + - name: flink-job-cluster + image: ${FLINK_IMAGE_NAME} + args: ["job-cluster", "--job-classname", "${FLINK_JOB}", "-Djobmanager.rpc.address=flink-job-cluster", + "-Dparallelism.default=${FLINK_JOB_PARALLELISM}", "-Dblob.server.port=6124", "-Dquery.server.ports=6125"] + ports: + - containerPort: 6123 + name: rpc + - containerPort: 6124 + name: blob + - containerPort: 6125 + name: query + - containerPort: 8081 + name: ui diff --git a/flink-container/kubernetes/job-cluster-service.yaml b/flink-container/kubernetes/job-cluster-service.yaml new file mode 100644 index 0000000000000..443a027dc8f89 --- /dev/null +++ b/flink-container/kubernetes/job-cluster-service.yaml @@ -0,0 +1,41 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: v1 +kind: Service +metadata: + name: flink-job-cluster + labels: + app: flink + component: job-cluster +spec: + ports: + - name: rpc + port: 6123 + - name: blob + port: 6124 + - name: query + port: 6125 + nodePort: 30025 + - name: ui + port: 8081 + nodePort: 30081 + type: NodePort + selector: + app: flink + component: job-cluster diff --git a/flink-container/kubernetes/task-manager-deployment.yaml.template b/flink-container/kubernetes/task-manager-deployment.yaml.template new file mode 100644 index 0000000000000..55bd9a934c22f --- /dev/null +++ b/flink-container/kubernetes/task-manager-deployment.yaml.template @@ -0,0 +1,34 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: flink-task-manager +spec: + replicas: ${FLINK_JOB_PARALLELISM} + template: + metadata: + labels: + app: flink + component: task-manager + spec: + containers: + - name: flink-task-manager + image: ${FLINK_IMAGE_NAME} + args: ["task-manager", "-Djobmanager.rpc.address=flink-job-cluster"] From 134e04442dc9506b698b8c1815c11a253c8b7cc5 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 11 Jul 2018 16:13:30 +0200 Subject: [PATCH 9/9] [hotfix] Support building a job image from a Flink archive Extend the flink-container/docker/build.sh script to also accept a Flink archive to build the image from. This makes it easier to build an image from one of the convenience releases. --- flink-container/docker/build.sh | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/flink-container/docker/build.sh b/flink-container/docker/build.sh index 13a536cc6c93d..614a9c3790368 100755 --- a/flink-container/docker/build.sh +++ b/flink-container/docker/build.sh @@ -21,6 +21,7 @@ usage() { cat < --from-local-dist [--image-name ] + build.sh --job-jar --from-archive [--image-name ] build.sh --job-jar --from-release --flink-version --hadoop-version --scala-version [--image-name ] build.sh --help @@ -35,10 +36,15 @@ key="$1" case $key in --job-jar) JOB_JAR_PATH="$2" + shift ;; --from-local-dist) FROM_LOCAL="true" ;; + --from-archive) + FROM_ARCHIVE="$2" + shift + ;; --from-release) FROM_RELEASE="true" ;; @@ -107,6 +113,10 @@ elif [ -n "${FROM_LOCAL}" ]; then echo "Using flink dist: ${DIST_DIR}" tar -C ${DIST_DIR} -cvzf "${FLINK_DIST}" . +elif [ -n "${FROM_ARCHIVE}" ]; then + FLINK_DIST="${TMPDIR}/flink.tgz" + cp "${FROM_ARCHIVE}" "${FLINK_DIST}" + else usage