Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
OOZIE-2332 Add ability to provide Hive and Hive 2 Action queries inline in workflows (prateekrungta via rkanter)
  • Loading branch information
rkanter committed Aug 18, 2015
1 parent 86e0af6 commit b074b2c
Show file tree
Hide file tree
Showing 15 changed files with 703 additions and 206 deletions.
8 changes: 6 additions & 2 deletions client/src/main/java/org/apache/oozie/cli/OozieCLI.java
Expand Up @@ -2081,6 +2081,8 @@ void validateCommandV41(CommandLine commandLine) throws OozieCLIException {
"hive-action-0.4.xsd")));
sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream(
"hive-action-0.5.xsd")));
sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream(
"hive-action-0.6.xsd")));
sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream(
"sqoop-action-0.2.xsd")));
sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream(
Expand All @@ -2093,8 +2095,10 @@ void validateCommandV41(CommandLine commandLine) throws OozieCLIException {
"ssh-action-0.2.xsd")));
sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream(
"hive2-action-0.1.xsd")));
sources.add(new StreamSource(Thread.currentThread().getContextClassLoader()
.getResourceAsStream("spark-action-0.1.xsd")));
sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream(
"hive2-action-0.2.xsd")));
sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream(
"spark-action-0.1.xsd")));
SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
Schema schema = factory.newSchema(sources.toArray(new StreamSource[sources.size()]));
Validator validator = schema.newValidator();
Expand Down
72 changes: 72 additions & 0 deletions client/src/main/resources/hive-action-0.6.xsd
@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:hive="uri:oozie:hive-action:0.6"
           targetNamespace="uri:oozie:hive-action:0.6"
           elementFormDefault="qualified">

    <!-- Top-level element of a Hive action; its content model is the ACTION type below. -->
    <xs:element name="hive" type="hive:ACTION"/>

    <!--
        Content of a hive action. The children form an ordered sequence. Only the
        script/query choice is mandatory; every other child may be omitted.
    -->
    <xs:complexType name="ACTION">
        <xs:sequence>
            <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="prepare" type="hive:PREPARE" minOccurs="0" maxOccurs="1"/>
            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="hive:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
            <!-- Exactly one of: a script element or a query element. -->
            <xs:choice minOccurs="1" maxOccurs="1">
                <xs:element name="script" type="xs:string" minOccurs="1" maxOccurs="1"/>
                <xs:element name="query" type="xs:string" minOccurs="1" maxOccurs="1"/>
            </xs:choice>
            <xs:element name="param" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="argument" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>

    <!-- One or more property entries, each a name, a value and an optional description. -->
    <xs:complexType name="CONFIGURATION">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
                        <xs:element name="value" type="xs:string" minOccurs="1" maxOccurs="1"/>
                        <xs:element name="description" type="xs:string" minOccurs="0" maxOccurs="1"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>

    <!-- Any number of delete elements followed by any number of mkdir elements. -->
    <xs:complexType name="PREPARE">
        <xs:sequence>
            <xs:element name="delete" type="hive:DELETE" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="mkdir" type="hive:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>

    <!-- Empty element carrying a single required path attribute. -->
    <xs:complexType name="DELETE">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>

    <!-- Empty element carrying a single required path attribute. -->
    <xs:complexType name="MKDIR">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>

</xs:schema>
74 changes: 74 additions & 0 deletions client/src/main/resources/hive2-action-0.2.xsd
@@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:hive2="uri:oozie:hive2-action:0.2"
           targetNamespace="uri:oozie:hive2-action:0.2"
           elementFormDefault="qualified">

    <!-- Top-level element of a Hive 2 action; its content model is the ACTION type below. -->
    <xs:element name="hive2" type="hive2:ACTION"/>

    <!--
        Content of a hive2 action. The children form an ordered sequence. The jdbc-url
        element and the script/query choice are mandatory; every other child may be omitted.
    -->
    <xs:complexType name="ACTION">
        <xs:sequence>
            <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="prepare" type="hive2:PREPARE" minOccurs="0" maxOccurs="1"/>
            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="hive2:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
            <xs:element name="jdbc-url" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="password" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <!-- Exactly one of: a script element or a query element. -->
            <xs:choice minOccurs="1" maxOccurs="1">
                <xs:element name="script" type="xs:string" minOccurs="1" maxOccurs="1"/>
                <xs:element name="query" type="xs:string" minOccurs="1" maxOccurs="1"/>
            </xs:choice>
            <xs:element name="param" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="argument" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>

    <!-- One or more property entries, each a name, a value and an optional description. -->
    <xs:complexType name="CONFIGURATION">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
                        <xs:element name="value" type="xs:string" minOccurs="1" maxOccurs="1"/>
                        <xs:element name="description" type="xs:string" minOccurs="0" maxOccurs="1"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>

    <!-- Any number of delete elements followed by any number of mkdir elements. -->
    <xs:complexType name="PREPARE">
        <xs:sequence>
            <xs:element name="delete" type="hive2:DELETE" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="mkdir" type="hive2:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>

    <!-- Empty element carrying a single required path attribute. -->
    <xs:complexType name="DELETE">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>

    <!-- Empty element carrying a single required path attribute. -->
    <xs:complexType name="MKDIR">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>

</xs:schema>
Expand Up @@ -21,11 +21,9 @@
import static org.apache.oozie.action.hadoop.LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS;

import java.io.IOException;
import java.io.StringReader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -44,11 +42,15 @@ public class Hive2ActionExecutor extends ScriptLanguageActionExecutor {
// Configuration key that setupActionConf fills with the trimmed text of the <jdbc-url> element.
static final String HIVE2_JDBC_URL = "oozie.hive2.jdbc.url";
// Configuration key for the trimmed <password> text; written only when that element is present.
static final String HIVE2_PASSWORD = "oozie.hive2.password";
// Configuration key for the name component of the <script> path (new Path(script).getName()).
static final String HIVE2_SCRIPT = "oozie.hive2.script";
// Configuration key for the untrimmed text of an inline <query> element.
static final String HIVE2_QUERY = "oozie.hive2.query";
// Configuration key for the <param> values, stored through MapReduceMain.setStrings.
static final String HIVE2_PARAMS = "oozie.hive2.params";
// Configuration key for the <argument> values, stored through MapReduceMain.setStrings.
static final String HIVE2_ARGS = "oozie.hive2.args";

// Set to true by setupActionConf when the action declares a <script>; reported by shouldAddScriptToCache().
private boolean addScriptToCache;

/**
 * Creates the executor registered under the action type name "hive2".
 * The script-caching flag starts out cleared.
 */
public Hive2ActionExecutor() {
    super("hive2");
    addScriptToCache = false;
}

@Override
Expand All @@ -63,6 +65,11 @@ public List<Class> getLauncherClasses() {
return classes;
}

/**
 * Overrides the superclass hook with this executor's own flag.
 *
 * @return the current value of the {@code addScriptToCache} field
 */
@Override
protected boolean shouldAddScriptToCache() {
    return addScriptToCache;
}

@Override
protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
return launcherConf.get(CONF_OOZIE_ACTION_MAIN_CLASS, HIVE2_MAIN_CLASS_NAME);
Expand All @@ -76,26 +83,38 @@ Configuration setupActionConf(Configuration actionConf, Context context, Element
Namespace ns = actionXml.getNamespace();

String jdbcUrl = actionXml.getChild("jdbc-url", ns).getTextTrim();
conf.set(HIVE2_JDBC_URL, jdbcUrl);

String password = null;
Element passwordElement = actionXml.getChild("password", ns);
if (passwordElement != null) {
password = actionXml.getChild("password", ns).getTextTrim();
conf.set(HIVE2_PASSWORD, password);
}

String script = actionXml.getChild("script", ns).getTextTrim();
String scriptName = new Path(script).getName();
String beelineScriptContent = context.getProtoActionConf().get(HIVE2_SCRIPT);

if (beelineScriptContent == null){
addToCache(conf, appPath, script + "#" + scriptName, false);
Element queryElement = actionXml.getChild("query", ns);
Element scriptElement = actionXml.getChild("script", ns);
if(scriptElement != null) {
String script = scriptElement.getTextTrim();
String scriptName = new Path(script).getName();
this.addScriptToCache = true;
conf.set(HIVE2_SCRIPT, scriptName);
} else if(queryElement != null) {
// Unable to use getTextTrim due to https://issues.apache.org/jira/browse/HIVE-8182
String query = queryElement.getText();
conf.set(HIVE2_QUERY, query);
} else {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "INVALID_ARGUMENTS",
"Hive 2 action requires one of <script> or <query> to be set. Neither were found.");
}

List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
String[] strParams = new String[params.size()];
for (int i = 0; i < params.size(); i++) {
strParams[i] = params.get(i).getTextTrim();
}
MapReduceMain.setStrings(conf, HIVE2_PARAMS, strParams);

String[] strArgs = null;
List<Element> eArgs = actionXml.getChildren("argument", ns);
if (eArgs != null && eArgs.size() > 0) {
Expand All @@ -104,22 +123,11 @@ Configuration setupActionConf(Configuration actionConf, Context context, Element
strArgs[i] = eArgs.get(i).getTextTrim();
}
}
MapReduceMain.setStrings(conf, HIVE2_ARGS, strArgs);

setHive2Props(conf, jdbcUrl, password, scriptName, strParams, strArgs);
return conf;
}

/**
 * Copies the Hive 2 action settings into the given configuration.
 *
 * @param conf configuration that receives the values
 * @param jdbcUrl value stored under {@code HIVE2_JDBC_URL}
 * @param password value stored under {@code HIVE2_PASSWORD}; skipped when {@code null}
 * @param script value stored under {@code HIVE2_SCRIPT}
 * @param params values stored under {@code HIVE2_PARAMS}
 * @param args values stored under {@code HIVE2_ARGS}
 */
public static void setHive2Props(Configuration conf, String jdbcUrl, String password, String script, String[] params,
String[] args) {
conf.set(HIVE2_JDBC_URL, jdbcUrl);
// The password is optional, so its key is left unset when none was supplied.
if (password != null) {
conf.set(HIVE2_PASSWORD, password);
}
conf.set(HIVE2_SCRIPT, script);
MapReduceMain.setStrings(conf, HIVE2_PARAMS, params);
MapReduceMain.setStrings(conf, HIVE2_ARGS, args);
}

@Override
protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
return true;
Expand Down
Expand Up @@ -21,19 +21,16 @@
import static org.apache.oozie.action.hadoop.LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS;

import java.io.IOException;
import java.io.StringReader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.ActionExecutor.Context;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.service.ConfigurationService;
Expand All @@ -46,12 +43,16 @@
public class HiveActionExecutor extends ScriptLanguageActionExecutor {

// Launcher main class used by getLauncherMain when CONF_OOZIE_ACTION_MAIN_CLASS is not set.
private static final String HIVE_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.HiveMain";
// Configuration key for the untrimmed text of an inline <query> element.
static final String HIVE_QUERY = "oozie.hive.query";
// Configuration key for the name component of the <script> path (new Path(script).getName()).
static final String HIVE_SCRIPT = "oozie.hive.script";
// Configuration key for the <param> values, stored through MapReduceMain.setStrings.
static final String HIVE_PARAMS = "oozie.hive.params";
// Configuration key for the <argument> values, stored through MapReduceMain.setStrings.
static final String HIVE_ARGS = "oozie.hive.args";

// Set to true by setupActionConf when the action declares a <script>; reported by shouldAddScriptToCache().
private boolean addScriptToCache;

/**
 * Creates the executor registered under the action type name "hive".
 * The script-caching flag starts out cleared.
 */
public HiveActionExecutor() {
    super("hive");
    addScriptToCache = false;
}

@Override
Expand All @@ -66,6 +67,11 @@ public List<Class> getLauncherClasses() {
return classes;
}

/**
 * Overrides the superclass hook with this executor's own flag.
 *
 * @return the current value of the {@code addScriptToCache} field
 */
@Override
protected boolean shouldAddScriptToCache() {
    return addScriptToCache;
}

@Override
protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
return launcherConf.get(CONF_OOZIE_ACTION_MAIN_CLASS, HIVE_MAIN_CLASS_NAME);
Expand All @@ -78,19 +84,29 @@ Configuration setupActionConf(Configuration actionConf, Context context, Element
Configuration conf = super.setupActionConf(actionConf, context, actionXml, appPath);

Namespace ns = actionXml.getNamespace();
String script = actionXml.getChild("script", ns).getTextTrim();
String scriptName = new Path(script).getName();
String hiveScriptContent = context.getProtoActionConf().get(XOozieClient.HIVE_SCRIPT);

if (hiveScriptContent == null){
addToCache(conf, appPath, script + "#" + scriptName, false);
Element scriptElement = actionXml.getChild("script", ns);
Element queryElement = actionXml.getChild("query", ns);
if (scriptElement != null){
String script = scriptElement.getTextTrim();
String scriptName = new Path(script).getName();
this.addScriptToCache = true;
conf.set(HIVE_SCRIPT, scriptName);
} else if (queryElement != null) {
// Unable to use getTextTrim due to https://issues.apache.org/jira/browse/HIVE-8182
String query = queryElement.getText();
conf.set(HIVE_QUERY, query);
} else {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "INVALID_ARGUMENTS",
"Hive action requires one of <script> or <query> to be set. Neither were found.");
}

List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
String[] strParams = new String[params.size()];
for (int i = 0; i < params.size(); i++) {
strParams[i] = params.get(i).getTextTrim();
}
MapReduceMain.setStrings(conf, HIVE_PARAMS, strParams);

String[] strArgs = null;
List<Element> eArgs = actionXml.getChildren("argument", ns);
if (eArgs != null && eArgs.size() > 0) {
Expand All @@ -99,17 +115,10 @@ Configuration setupActionConf(Configuration actionConf, Context context, Element
strArgs[i] = eArgs.get(i).getTextTrim();
}
}

setHiveScript(conf, scriptName, strParams, strArgs);
MapReduceMain.setStrings(conf, HIVE_ARGS, strArgs);
return conf;
}

/**
 * Copies the Hive action settings into the given configuration.
 *
 * @param conf configuration that receives the values
 * @param script value stored under {@code HIVE_SCRIPT}
 * @param params values stored under {@code HIVE_PARAMS}
 * @param args values stored under {@code HIVE_ARGS}
 */
public static void setHiveScript(Configuration conf, String script, String[] params, String[] args) {
conf.set(HIVE_SCRIPT, script);
MapReduceMain.setStrings(conf, HIVE_PARAMS, params);
MapReduceMain.setStrings(conf, HIVE_ARGS, args);
}

@Override
protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
return true;
Expand Down

0 comments on commit b074b2c

Please sign in to comment.