Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
CHUKWA-715. Added Oozie Adaptor for collecting Oozie metrics. (Sreepa…
…thi Prasanna via Eric Yang)

git-svn-id: https://svn.apache.org/repos/asf/chukwa/trunk@1612617 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
macroadster committed Jul 22, 2014
1 parent 3ae226a commit 4e7e7e01d1898b93cfc2dfaa50640db1bcdf2dd8
Show file tree
Hide file tree
Showing 6 changed files with 419 additions and 39 deletions.
@@ -12,6 +12,8 @@ Release 0.6 - Unreleased

NEW FEATURES

CHUKWA-715. Added Oozie Adaptor for collecting Oozie metrics. (Sreepathi Prasanna via Eric Yang)

CHUKWA-712. Implemented generic REST Adaptor for fetch data from web service. (Sreepathi Prasanna via Eric Yang)

CHUKWA-674. Integrated Chukwa collector feature to Chukwa Agent. (Eric Yang)
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.chukwa.datacollection.adaptor;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Calendar;
import java.util.TimeZone;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.util.ChukwaUtil;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.chukwa.util.RestUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;

public class OozieAdaptor extends AbstractAdaptor {

private static Logger log = Logger.getLogger(OozieAdaptor.class);
private String uri;

private long sendOffset;
private Configuration chukwaConfiguration = null;
private static UserGroupInformation UGI = null;
private boolean isKerberosEnabled = false;
private int length = 0;
private final ScheduledExecutorService scheduler = Executors
.newScheduledThreadPool(1);
private static final long initialDelay = 60; // seconds
private static long periodicity = 60; // seconds
private ScheduledFuture<?> scheduledCollectorThread;

@Override
public String parseArgs(String s) {
String[] tokens = s.split(" ");
if (tokens.length == 2) {
uri = tokens[0];
try {
periodicity = Integer.parseInt(tokens[1]);
} catch (NumberFormatException e) {
log.warn("OozieAdaptor: incorrect argument for period. Expecting number");
return null;
}
} else {
log.warn("bad syntax in OozieAdaptor args");
return null;
}
return s;
}

@Override
public void start(long offset) throws AdaptorException {
sendOffset = offset;
init(); // initialize the configuration
log.info("Starting Oozie Adaptor with [ " + sendOffset + " ] offset");
scheduledCollectorThread = scheduler.scheduleAtFixedRate(
new OozieMetricsCollector(), initialDelay, periodicity,
TimeUnit.SECONDS);
log.info("scheduled");
}

@Override
public String getCurrentStatus() {
StringBuilder buffer = new StringBuilder();
buffer.append(type);
buffer.append(" ");
buffer.append(uri);
buffer.append(" ");
buffer.append(periodicity);
return buffer.toString();
}

@Override
public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
throws AdaptorException {
scheduledCollectorThread.cancel(true);
scheduler.shutdown();
return sendOffset;
}

private class OozieMetricsCollector implements Runnable {
@Override
public void run() {
try {
if (isKerberosEnabled) {
if (UGI == null) {
throw new IllegalStateException("UGI Login context is null");
}

UGI.checkTGTAndReloginFromKeytab();
length = UGI.doAs(new PrivilegedExceptionAction<Integer>() {
@Override
public Integer run() throws Exception {
return processMetrics();
}
});

} else {
length = processMetrics();
}

if (length <= 0) {
log.warn("Oozie is either not responding or sending zero payload");
} else {
log.info("Processing a oozie instrumentation payload of [" + length
+ "] bytes");
}
} catch (Exception e) {
log.error(ExceptionUtil.getStackTrace(e));
log.error("Exception occured while getting oozie metrics " + e);
}
}
}

private void init() {
if (getChukwaConfiguration() == null) {
setChukwaConfiguration(ChukwaUtil.readConfiguration());
}
String authType = getChukwaConfiguration().get(
"chukwaAgent.hadoop.authentication.type");
if (authType != null && authType.equalsIgnoreCase("kerberos")) {
login(); // get the UGI context
isKerberosEnabled = true;
}
}

private void login() {
try {
String principalConfig = getChukwaConfiguration().get(
"chukwaAgent.hadoop.authentication.kerberos.principal",
System.getProperty("user.name"));
String hostname = null;
String principalName = SecurityUtil.getServerPrincipal(principalConfig,
hostname);
UGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
principalName,
getChukwaConfiguration().get(
"chukwaAgent.hadoop.authentication.kerberos.keytab"));
} catch (IOException e) {
log.error(ExceptionUtil.getStackTrace(e));
}
}

private int processMetrics() {
return addChunkToReceiver(getOozieMetrics().getBytes());
}

private String getOozieMetrics() {
return RestUtil.getResponseAsString(uri);
}

public int addChunkToReceiver(byte[] data) {
try {
sendOffset += data.length;
ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, this);
long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
.getTimeInMillis();
c.addTag("timeStamp=\"" + rightNow + "\"");
dest.add(c);
} catch (Exception e) {
log.error(ExceptionUtil.getStackTrace(e));
}
return data.length;
}

public Configuration getChukwaConfiguration() {
return chukwaConfiguration;
}

public void setChukwaConfiguration(Configuration chukwaConfiguration) {
this.chukwaConfiguration = chukwaConfiguration;
}
}
@@ -49,6 +49,7 @@
import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
import org.apache.hadoop.chukwa.util.ChukwaUtil;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
@@ -276,7 +277,7 @@ public static void main(String[] args) throws AdaptorException {
System.exit(0);
}

conf = readConfig();
conf = ChukwaUtil.readConfiguration();
agent = new ChukwaAgent(conf);

if (agent.anotherAgentIsRunning()) {
@@ -736,44 +737,6 @@ public Connector getConnector() {
return connector;
}

private static Configuration readConfig() {
Configuration conf = new Configuration();

String chukwaHomeName = System.getenv("CHUKWA_HOME");
if (chukwaHomeName == null) {
chukwaHomeName = "";
}
File chukwaHome = new File(chukwaHomeName).getAbsoluteFile();

log.info("Config - CHUKWA_HOME: [" + chukwaHome.toString() + "]");

String chukwaConfName = System.getProperty("CHUKWA_CONF_DIR");
File chukwaConf;
if (chukwaConfName != null)
chukwaConf = new File(chukwaConfName).getAbsoluteFile();
else
chukwaConf = new File(chukwaHome, "conf");

log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
conf.addResource(new Path(agentConf.getAbsolutePath()));
conf.addResource(new Path( new File(chukwaConf, "chukwa-common.xml").getAbsolutePath()));
if (conf.get("chukwaAgent.checkpoint.dir") == null)
conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var")
.getAbsolutePath());
conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf,
"initial_adaptors").getAbsolutePath());
try {
Configuration chukwaAgentConf = new Configuration(false);
chukwaAgentConf.addResource(new Path(agentConf.getAbsolutePath()));
Checker.checkConf(new OptDictionary(new File(new File(chukwaHome, "share/chukwa/lib"), "agent.dict")),
HSlurper.fromHConf(chukwaAgentConf));
} catch(Exception e) {
e.printStackTrace();
}
return conf;
}

public void shutdown() {
shutdown(false);
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.chukwa.util;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

import edu.berkeley.confspell.Checker;
import edu.berkeley.confspell.HSlurper;
import edu.berkeley.confspell.OptDictionary;

/*
* Create a common set of utility classes for code reuse
*/

public class ChukwaUtil {

private static Logger log = Logger.getLogger(ChukwaUtil.class);

public static Configuration readConfiguration() {
Configuration conf = new Configuration();

String chukwaHomeName = System.getenv("CHUKWA_HOME");
if (chukwaHomeName == null) {
chukwaHomeName = "";
}
File chukwaHome = new File(chukwaHomeName).getAbsoluteFile();

log.info("Config - CHUKWA_HOME: [" + chukwaHome.toString() + "]");

String chukwaConfName = System.getProperty("CHUKWA_CONF_DIR");
File chukwaConf;
if (chukwaConfName != null)
chukwaConf = new File(chukwaConfName).getAbsoluteFile();
else
chukwaConf = new File(chukwaHome, "conf");

log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
conf.addResource(new Path(agentConf.getAbsolutePath()));
if (conf.get("chukwaAgent.checkpoint.dir") == null)
conf.set("chukwaAgent.checkpoint.dir",
new File(chukwaHome, "var").getAbsolutePath());
conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf,
"initial_adaptors").getAbsolutePath());
try {
Configuration chukwaAgentConf = new Configuration(false);
chukwaAgentConf.addResource(new Path(agentConf.getAbsolutePath()));
Checker.checkConf(new OptDictionary(new File(new File(chukwaHome,
"share/chukwa/lib"), "agent.dict")), HSlurper
.fromHConf(chukwaAgentConf));
} catch (Exception e) {
e.printStackTrace();
}
return conf;
}

}

0 comments on commit 4e7e7e0

Please sign in to comment.