/
ShellMain.java
363 lines (326 loc) · 14.4 KB
/
ShellMain.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.action.hadoop;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
public class ShellMain extends LauncherMain {
/** Action configuration key holding the arguments passed to the shell executable (read in getShellArguments). */
public static final String CONF_OOZIE_SHELL_ARGS = "oozie.shell.args";
/** Action configuration key holding the executable to run (read in getExec; mandatory). */
public static final String CONF_OOZIE_SHELL_EXEC = "oozie.shell.exec";
/** Action configuration key holding user supplied {@code NAME=VALUE} environment entries (read in getEnvMap). */
public static final String CONF_OOZIE_SHELL_ENVS = "oozie.shell.envs";
/** Boolean action configuration key; when true the stdout of the command is also written to the capture file. */
public static final String CONF_OOZIE_SHELL_CAPTURE_OUTPUT = "oozie.shell.capture-output";
/** Boolean action configuration key (default false) enabling the creation of a dedicated Hadoop conf directory. */
public static final String CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR = "oozie.action.shell.setup.hadoop.conf.dir";
/** Boolean action configuration key (default true) controlling whether log4j.properties is written to that directory. */
public static final String CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR_WRITE_LOG4J_PROPERTIES =
        "oozie.action.shell.setup.hadoop.conf.dir.write.log4j.properties";
/** Action configuration key holding the text written into log4j.properties. */
public static final String CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR_LOG4J_CONTENT =
        "oozie.action.shell.setup.hadoop.conf.dir.log4j.content";
/** Name of the environment variable through which the action configuration XML location is handed to the command. */
public static final String OOZIE_ACTION_CONF_XML = "OOZIE_ACTION_CONF_XML";
/** Environment variable pointed at the generated configuration directory. */
private static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
/** Environment variable pointed at the generated configuration directory. */
private static final String YARN_CONF_DIR = "YARN_CONF_DIR";
/** File name of the logger configuration written into the generated configuration directory. */
private static final String LOG4J_PROPERTIES = "log4j.properties";
/**
 * Entry point of the shell action launcher.
 * Hands this class and the unmodified arguments to the static {@code run(Class, String[])} helper, which is not
 * defined in this file (presumably inherited from {@link LauncherMain} -- confirm).
 *
 * @param args arguments forwarded as-is to the launcher helper; invoked from LauncherAMUtils:map()
 * @throws Exception whatever the launcher helper propagates
 */
public static void main(String[] args) throws Exception {
run(ShellMain.class, args);
}
/**
 * Loads the action configuration, applies the YARN, Tez and Spark tag helpers to it and then executes the
 * configured shell command.
 *
 * @param args launcher arguments; not used by this implementation
 * @throws LauncherMainException with code 1 when the shell command exits with a non-zero value
 * @throws Exception if loading the configuration or running the command fails
 */
@Override
protected void run(String[] args) throws Exception {
    final Configuration conf = loadActionConf();

    // Tag helpers are applied before the command is launched.
    setYarnTag(conf);
    setApplicationTags(conf, TEZ_APPLICATION_TAGS);
    setApplicationTags(conf, SPARK_YARN_TAGS);

    final boolean succeeded = execute(conf) == 0;
    if (!succeeded) {
        // A non-zero exit value of the command marks the whole action as failed.
        throw new LauncherMainException(1);
    }
}
/**
 * Assembles the command line and environment from the action configuration, starts the command as a child
 * process in the current working directory, forwards its output and waits for completion.
 *
 * @param actionConf the action configuration to read executable, arguments and options from
 * @return the exit value reported by the child process
 * @throws Exception if the process cannot be started or waiting for it (or its output threads) is interrupted
 */
private int execute(Configuration actionConf) throws Exception {
    String executable = getExec(actionConf);
    List<String> shellArgs = getShellArguments(actionConf);
    String[] argArray = shellArgs.toArray(new String[shellArgs.size()]);
    ArrayList<String> command = getCmdList(executable, argArray);

    ProcessBuilder processBuilder = new ProcessBuilder(command);
    Map<String, String> environment = getEnvMap(processBuilder.environment(), actionConf);

    // The parent of an arbitrary relative file resolves to the current working directory.
    File workDir = new File("dummy").getAbsoluteFile().getParentFile();
    System.out.println("Current working dir " + workDir);
    processBuilder.directory(workDir);

    // Optionally provide Hadoop *-site files for Hadoop-type programs (e.g. hive) started by the command.
    prepareHadoopConfigs(actionConf, environment, workDir);

    // Dump command and environment to ease debugging.
    printCommand(command, environment);

    System.out.println("=================================================================");
    System.out.println();
    System.out.println(">>> Invoking Shell command line now >>");
    System.out.println();
    System.out.flush();

    boolean capture = actionConf.getBoolean(CONF_OOZIE_SHELL_CAPTURE_OUTPUT, false);

    // Launch the command and start draining its stdout/stderr.
    Process shellProcess = processBuilder.start();
    Thread[] outputThreads = handleShellOutput(shellProcess, capture);
    int exitCode = shellProcess.waitFor();

    // Make sure all forwarded output has been written before reporting the result.
    if (outputThreads != null) {
        for (int i = 0; i < outputThreads.length; i++) {
            outputThreads[i].join();
        }
    }

    System.out.println("Exit code of the Shell command " + exitCode);
    System.out.println("<<< Invocation of Shell command completed <<<");
    System.out.println();
    return exitCode;
}
/**
 * Optionally creates a fresh configuration directory below the working directory and points
 * HADOOP_CONF_DIR and YARN_CONF_DIR of the child process at it.
 * <p>
 * Nothing happens unless {@link #CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR} is true and the environment contains
 * {@link #OOZIE_ACTION_CONF_XML}. Otherwise the action XML is handed to {@code writeHadoopConfig} (defined
 * outside this file) together with the new directory, so that Hadoop ecosystem CLI programs pick up Oozie's
 * configuration, including the Workflow's configuration section, instead of the NodeManager's *-site files.
 * A log4j.properties file is added as well unless
 * {@link #CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR_WRITE_LOG4J_PROPERTIES} is set to false.
 *
 * @param actionConf The action configuration
 * @param envp The environment for the Shell process; updated in place
 * @param currDir The current working dir, parent of the new configuration directory
 * @throws IOException if writing the configuration files fails
 */
private void prepareHadoopConfigs(Configuration actionConf, Map<String, String> envp, File currDir) throws IOException {
    if (!actionConf.getBoolean(CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR, false)) {
        return;
    }
    final String actionXml = envp.get(OOZIE_ACTION_CONF_XML);
    if (actionXml == null) {
        return;
    }

    // The timestamp suffix keeps the directory name distinct between invocations.
    final File hadoopConfDir = new File(currDir, "oozie-hadoop-conf-" + System.currentTimeMillis());
    writeHadoopConfig(actionXml, hadoopConfDir);

    final boolean writeLog4j =
            actionConf.getBoolean(CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR_WRITE_LOG4J_PROPERTIES, true);
    if (writeLog4j) {
        System.out.println("Writing " + LOG4J_PROPERTIES + " to " + hadoopConfDir);
        writeLoggerProperties(actionConf, hadoopConfDir);
    }

    final String confPath = hadoopConfDir.getAbsolutePath();
    System.out.println("Setting " + HADOOP_CONF_DIR + " and " + YARN_CONF_DIR
            + " to " + confPath);
    envp.put(HADOOP_CONF_DIR, confPath);
    envp.put(YARN_CONF_DIR, confPath);
}
/**
 * Write a {@link #LOG4J_PROPERTIES} file into the provided directory.
 * Content of the log4j.properties is sourced from the value of property
 * {@link #CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR_LOG4J_CONTENT}, whose default is defined in oozie-default.xml.
 * This is required for properly redirecting command outputs to stderr.
 * Otherwise, logging from commands may go into stdout and make it to the Shell's captured output.
 * <p>
 * Every line of the configured content is trimmed before being written. If the property is not set at all,
 * no file is written and a notice is printed instead of failing with a NullPointerException.
 *
 * @param actionConf the action's configuration, to source log4j contents from
 * @param confDir the directory to write a {@link #LOG4J_PROPERTIES} file under
 * @throws IOException if the file cannot be created or writing to it fails
 */
private static void writeLoggerProperties(Configuration actionConf, File confDir) throws IOException {
    String log4jContents = actionConf.get(
            CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR_LOG4J_CONTENT);
    if (log4jContents == null) {
        // Check before opening the file so that a missing value does not leave a truncated file behind.
        System.out.println("Property " + CONF_OOZIE_SHELL_SETUP_HADOOP_CONF_DIR_LOG4J_CONTENT
                + " is not set, skipping " + LOG4J_PROPERTIES);
        return;
    }

    File log4jPropertiesFile = new File(confDir, LOG4J_PROPERTIES);
    PrintWriter log4jWriter = new PrintWriter(new FileOutputStream(log4jPropertiesFile, false));
    try {
        // Reading from an in-memory string; the reader holds no system resource.
        BufferedReader lineReader = new BufferedReader(new StringReader(log4jContents));
        String line;
        while ((line = lineReader.readLine()) != null) {
            // Trim the line (both preceding and trailing whitespaces) before writing it as a line in file
            log4jWriter.println(line.trim());
        }
        // PrintWriter never throws on write failures; checkError() flushes and reports them.
        if (log4jWriter.checkError()) {
            throw new IOException("Failed to write " + log4jPropertiesFile.getAbsolutePath());
        }
    }
    finally {
        // Release the file handle even if reading or writing failed.
        log4jWriter.close();
    }
}
/**
 * Completes the environment of the shell process.
 * Adds every entry configured under {@link #CONF_OOZIE_SHELL_ENVS} and finally sets
 * {@link #OOZIE_ACTION_CONF_XML} to the value of system property {@code oozie.action.conf.xml}
 * (empty string when the property is absent).
 *
 * @param envp the environment map to extend; modified in place
 * @param actionConf the action configuration holding the user specified entries
 * @return the same map instance that was passed in
 */
private Map<String, String> getEnvMap(Map<String, String> envp, Configuration actionConf) {
    // User-specified environment entries come first.
    String[] userEnvs = ActionUtils.getStrings(actionConf, CONF_OOZIE_SHELL_ENVS);
    for (int i = 0; i < userEnvs.length; i++) {
        // Split at the first '=' only, so values may contain '='.
        // Malformed entries are expected to be rejected earlier, in ShellActionExecutor.
        String[] nameAndValue = userEnvs[i].split("=", 2);
        String name = nameAndValue[0];
        String value = nameAndValue[1];
        envp.put(name, value);
    }

    // Expose the location of action.xml to the command.
    String actionConfXml = System.getProperty("oozie.action.conf.xml", "");
    envp.put(OOZIE_ACTION_CONF_XML, actionConfXml);
    return envp;
}
/**
 * Builds the command line as a list: the executable first, followed by all arguments in their given order.
 *
 * @param exec the executable to run
 * @param args the arguments for the executable
 * @return a new list containing the executable and then the arguments
 */
private ArrayList<String> getCmdList(String exec, String[] args) {
    ArrayList<String> command = new ArrayList<String>(args.length + 1);
    // The executable always occupies position 0.
    command.add(exec);
    for (int i = 0; i < args.length; i++) {
        command.add(args[i]);
    }
    return command;
}
/**
 * Starts two daemon threads that forward the stdout and the stderr of the given process.
 * The stdout thread additionally records the output for capturing when requested; stderr is never captured.
 *
 * @param p process whose output streams are consumed
 * @param captureOutput indicates if STDOUT should be captured or not.
 * @return array of the two started threads: stdout first, stderr second
 * @throws IOException thrown if an IO error occurs.
 */
protected Thread[] handleShellOutput(Process p, boolean captureOutput)
        throws IOException {
    BufferedReader stdoutReader = new BufferedReader(new InputStreamReader(p.getInputStream()));
    BufferedReader stderrReader = new BufferedReader(new InputStreamReader(p.getErrorStream()));

    Thread[] workers = new Thread[] {
        new OutputWriteThread(stdoutReader, true, captureOutput),
        new OutputWriteThread(stderrReader, false, false)
    };
    for (Thread worker : workers) {
        // Daemon threads do not keep the JVM alive on their own.
        worker.setDaemon(true);
        worker.start();
    }
    return workers;
}
/**
 * Thread to write output to LM stdout/stderr. Also write the content for
 * capture-output.
 * <p>
 * Each instance drains one reader line by line until end of stream and closes it afterwards.
 * Lines of a stdout reader are echoed to {@code System.out} with the prefix {@code "Stdoutput "} and, when
 * capturing is enabled, appended unprefixed to the file named by system property {@code OUTPUT_PROPERTIES}
 * (constant defined outside this file). Lines of a stderr reader are echoed to {@code System.err}.
 */
class OutputWriteThread extends Thread {
    /** Source of the lines to forward; closed when {@link #run()} finishes. */
    BufferedReader reader = null;
    /** True when {@link #reader} delivers the stdout of the process, false for stderr. */
    boolean isStdout = false;
    /** True when the lines should additionally be written to the capture file. */
    boolean needCaptured = false;

    /**
     * @param reader source of the lines to forward
     * @param isStdout whether the reader delivers stdout (true) or stderr (false)
     * @param needCaptured whether lines should also be written to the capture file
     */
    public OutputWriteThread(BufferedReader reader, boolean isStdout, boolean needCaptured) {
        this.reader = reader;
        this.isStdout = isStdout;
        this.needCaptured = needCaptured;
    }

    /**
     * Forwards all lines until end of stream.
     *
     * @throws RuntimeException wrapping the original {@link IOException} if reading, writing or closing the
     *         capture file fails
     */
    @Override
    public void run() {
        String line;
        BufferedWriter os = null;
        try {
            if (needCaptured) {
                File file = new File(System.getProperty(OUTPUT_PROPERTIES));
                os = new BufferedWriter(new FileWriter(file));
            }
            while ((line = reader.readLine()) != null) {
                if (isStdout) { // For stdout
                    // 1. Writing to LM STDOUT
                    System.out.println("Stdoutput " + line);
                    // 2. Writing for capture output
                    if (os != null) {
                        if (Shell.WINDOWS) {
                            // Doubles the backslash in front of 'u'; presumably so that Windows paths are not
                            // read back as unicode escapes from the capture file -- confirm.
                            line = line.replace("\\u", "\\\\u");
                        }
                        os.write(line);
                        os.newLine();
                    }
                }
                else {
                    System.err.println(line); // 1. Writing to LM STDERR
                }
            }
        }
        catch (IOException e) {
            e.printStackTrace();
            // Keep the original exception as cause so its stack trace is not lost.
            throw new RuntimeException("Stdout/Stderr read/write error :" + e, e);
        }
        finally {
            try {
                reader.close();
            }
            catch (IOException ex) {
                //NOP ignoring error on close of STDOUT/STDERR
            }
            if (os != null) {
                try {
                    os.close();
                }
                catch (IOException e) {
                    e.printStackTrace();
                    // Closing flushes buffered content, so a failure here may mean lost captured output.
                    throw new RuntimeException("Unable to close the file stream :" + e, e);
                }
            }
        }
    }
}
/**
 * Dumps the command line and the environment of the shell process to stdout.
 * Each command element is printed as {@code index:value:}; each environment entry as {@code name=value:}.
 * The environment section is omitted when no environment is given.
 *
 * @param cmdArray :Command Array
 * @param envp :Environment map, may be null
 */
protected void printCommand(ArrayList<String> cmdArray, Map<String, String> envp) {
    System.out.println("Full Command .. ");
    System.out.println("-------------------------");
    int position = 0;
    for (String element : cmdArray) {
        System.out.println(position + ":" + element + ":");
        position++;
    }

    if (envp == null) {
        return;
    }
    System.out.println("List of passing environment");
    System.out.println("-------------------------");
    for (Map.Entry<String, String> variable : envp.entrySet()) {
        String name = variable.getKey();
        String value = variable.getValue();
        System.out.println(name + "=" + value + ":");
    }
}
/**
 * Collects the arguments of the shell command, as stored in the action configuration under
 * {@link #CONF_OOZIE_SHELL_ARGS} (originally specified in Workflow.xml).
 *
 * @param actionConf the action configuration to read from
 * @return a new, modifiable list with the arguments in configured order
 */
protected List<String> getShellArguments(Configuration actionConf) {
    String[] configuredArgs = ActionUtils.getStrings(actionConf, CONF_OOZIE_SHELL_ARGS);
    List<String> result = new ArrayList<String>();
    for (int i = 0; i < configuredArgs.length; i++) {
        result.add(configuredArgs[i]);
    }
    return result;
}
/**
 * Looks up the executable of the shell command, as stored in the action configuration under
 * {@link #CONF_OOZIE_SHELL_EXEC} (originally specified in Workflow.xml).
 *
 * @param actionConf the action configuration to read from
 * @return the configured executable, never null
 * @throws RuntimeException if the property is missing from the configuration
 */
protected String getExec(Configuration actionConf) {
    final String executable = actionConf.get(CONF_OOZIE_SHELL_EXEC);
    if (executable != null) {
        return executable;
    }
    throw new RuntimeException("Action Configuration does not have " + CONF_OOZIE_SHELL_EXEC + " property");
}
}