
Fixed JUM-96: Add rumen processing logic for processing on MapR hadoop distribution
ShubhamChaurasia committed May 5, 2015
1 parent 1f81739 commit e172dedefb2019e1d41f25540fb8be95173985e5
@@ -3,9 +3,11 @@
import static org.jumbune.common.utils.Constants.COLON;
import static org.jumbune.common.utils.Constants.SPACE;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -27,6 +29,7 @@
import org.jumbune.remoting.common.ApiInvokeHintsEnum;
import org.jumbune.remoting.common.CommandType;
import org.jumbune.remoting.common.RemotingConstants;
import org.jumbune.utils.exception.JumbuneException;
import com.google.gson.Gson;
@@ -72,6 +75,9 @@
/** The Constant MKDIR_CMD. */
private static final String MKDIR_CMD = "mkdir -p ";
/** The Constant RM_CMD. Command to remove Files and Directories recursively. */
private static final String RM_CMD = "rm -r ";
/** The Constant JAVA_CP_CMD. */
private static final String JAVA_CP_CMD = "java -cp ";
@@ -95,12 +101,18 @@
private static String HISTORY_DIR_SUFFIX_YARN = "/tmp/hadoop-yarn/staging/history/done/*/*/*/*/";
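/** The Constant HISTORY_DIR_SUFFIX_MAPR_NY. Default job history location on MapRFS for the MapR (non-YARN) distribution. */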
private static final String HISTORY_DIR_SUFFIX_MAPR_NY = "/var/mapr/cluster/mapred/jobTracker/history/done/";
private static String USER_INT_HISTORY_DIR_SUFFIX = "/history/done_intermediate/*/";
private static String USER_HISTORY_DIR_SUFFIX = "/history/done/*/*/*/*/";
private static final String HDFS_FILE_GET_COMMAND = "/bin/hadoop fs -get";
private static final String HDFS_LS_COMMAND = "/bin/hadoop fs -ls";
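/** The Constant HDFS_RM_COMMAND. Uses the legacy recursive remove ("-rmr"), valid on Hadoop 1.x/MapR; Hadoop 2.x replaces it with "fs -rm -r". */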
private static final String HDFS_RM_COMMAND = "/bin/hadoop fs -rmr";
/** The Constant WILDCARD. */
private static final String WILDCARD = "/*";
@@ -132,63 +144,113 @@ public JobOutput getJobDetails(Config config, String jobID) throws IOException{
String logfilePath = null;
String relLocalPath = null;
if(!isYarn){
// check if the hadoop distribution is MapR (we are already in the non-yarn branch)
if (SupportedHadoopDistributions.HADOOP_MAPR.equals(hadoopVersion)) {
relLocalPath = Constants.JOB_JARS_LOC + jobConfig.getFormattedJumbuneJobName();
// get the history file name corresponding to the jobID
String fileName = getHistoryFileNameForMapR(remoter, logsHistory, jobID);
// now start rumen processing
Properties hadoopJarProperties = loadHadoopJarConfigurationProperties();
String rumenDirPath = agentHome + Constants.JOB_JARS_LOC + jobConfig.getFormattedJumbuneJobName() + RUMEN;
// make rumen related directory and files
final String rumenTempDirOnMapRFS = "/jumbune/rumen-tmp/";
final String jsonFilePath = rumenTempDirOnMapRFS + JSON_FILE;
final String topologyFilePath = rumenTempDirOnMapRFS + TOPOLOGY_FILE;
// remove any previous rumen directory and its contents
builder.addCommand(RM_CMD + rumenDirPath, false, null, CommandType.FS);
remoter.fireAndForgetCommand(builder.getCommandWritable());
builder.getCommandBatch().clear();
mkDir(builder, remoter, rumenDirPath);
String historyFilePathOnMapRFS = logsHistory + File.separator
+ fileName;
//prepare rumen processing command and start processing
StringBuilder rumenProcessingCommand = prepareRumenProcessingCommand(hadoopJarProperties,
agentHome, jsonFilePath, topologyFilePath,
historyFilePathOnMapRFS);
startRumenProcessing(remoter, relLocalPath, rumenDirPath, rumenProcessingCommand);
getAndRemoveRumenTempDir(remoter, jobConfig,
rumenTempDirOnMapRFS, rumenDirPath);
String relativeRemotePath = Constants.JOB_JARS_LOC
+ jobConfig.getFormattedJumbuneJobName()
+ RUMEN;
//receiving job-trace.json and topology files
remoter.receiveLogFiles(relLocalPath, relativeRemotePath);
LOGGER.debug("Received log files from:" + relativeRemotePath);
Gson gson = new Gson();
JobDetails jobDetails = extractJobDetails(appHome,
relLocalPath, gson);
return convertToFinalOutput(jobDetails);
}else{
logfilePath = getLogFilePath(jobID, remoter, logsHistory,
builder);
relLocalPath = Constants.JOB_JARS_LOC + jobConfig.getFormattedJumbuneJobName();
String relRemotePath = relLocalPath + RUMEN;
StringBuilder stringAppender = new StringBuilder(agentHome);
stringAppender.append(File.separator).append(relRemotePath).append(File.separator);
// make rumen related directory and files
String pathToRumenDir = stringAppender.toString();
String jsonFilepath = pathToRumenDir + JSON_FILE;
String topologyFilePath = pathToRumenDir + TOPOLOGY_FILE;
builder.getCommandBatch().clear();
mkDir(builder, remoter, pathToRumenDir);
// preparing command for rumen processing
String remoteHadoopLib = HADOOP_HOME + LIB;
Properties hadoopJarProperties = loadHadoopJarConfigurationProperties();
String coreJar;
if(SupportedHadoopDistributions.HADOOP_NON_YARN.equals(hadoopVersion)) {
coreJar = HADOOP_HOME + hadoopJarProperties.getProperty("CORE_JAR");
}else {
coreJar = HADOOP_HOME + WILDCARD;
}
String commonsLoggingJar = agentHome + LIB + hadoopJarProperties.getProperty("COMMONS_LOGGING_JAR");
String commonsCliJar = remoteHadoopLib + hadoopJarProperties.getProperty("COMMONS_CLI_JAR");
String commonsConfigurationJar = agentHome + LIB + hadoopJarProperties.getProperty("COMMONS_CONFIGURATION_JAR");
String commonsLangJar = agentHome + LIB + hadoopJarProperties.getProperty("COMMONS_LANG_JAR");
String jacksonMapperAslJar = agentHome + LIB + hadoopJarProperties.getProperty("JACKSON_MAPPER_ASL_JAR");
String jacksonMapperCoreJar = agentHome + LIB + hadoopJarProperties.getProperty("JACKSON_MAPPER_CORE_JAR");
String rumenJar = agentHome + LIB + hadoopJarProperties.getProperty("RUMEN_JAR") + "-" + Versioning.BUILD_VERSION + Versioning.DISTRIBUTION_NAME + ".jar";
StringBuilder rumenProcessingCommand = new StringBuilder(JAVA_CP_CMD);
checkHadoopVersionsForRumen(hadoopVersion, logfilePath, jsonFilepath,
topologyFilePath, coreJar, commonsLoggingJar,
commonsCliJar, commonsConfigurationJar, commonsLangJar,
jacksonMapperAslJar, jacksonMapperCoreJar, rumenJar, rumenProcessingCommand);
LOGGER.debug("Rumen processing command [" + rumenProcessingCommand.toString()+"]");
startRumenProcessing(remoter, relLocalPath, relRemotePath, rumenProcessingCommand);
remoter = RemotingUtil.getRemoter(jobConfig, appHome);
remoter.receiveLogFiles(relLocalPath, relRemotePath);
LOGGER.debug("Received log files from:"+ relRemotePath);
// process json
Gson gson = new Gson();
JobDetails jobDetails = extractJobDetails(appHome, relLocalPath, gson);
return convertToFinalOutput(jobDetails);
}
}else{
String relativeRemotePath = Constants.JOB_JARS_LOC + jobConfig.getJumbuneJobName() + File.separator + jobID;
String remotePath = agentHome + relativeRemotePath;
mkDir(builder, remoter, remotePath);
checkAndgetCurrentLogFilePathForYarn(remoter,logsHistory,remotePath,jobID,config);
relLocalPath = Constants.JOB_JARS_LOC + jobConfig.getFormattedJumbuneJobName();
remoter.receiveLogFiles(relLocalPath, relativeRemotePath);
@@ -208,6 +270,140 @@ public JobOutput getJobDetails(Config config, String jobID) throws IOException{
return null;
}
/**
* This method gets the job history file name corresponding to the
* provided jobID. It assumes the presence of the history file on MapRFS.
*
* @param remoter the remoter used to execute commands on the remote agent
* @param logsHistory the job history directory on MapRFS
* @param jobID the id of the job whose history file is to be located
* @return the history file name, or null if it could not be determined
*/
private String getHistoryFileNameForMapR(Remoter remoter,
String logsHistory, String jobID) {
StringBuilder lsCommand = new StringBuilder()
.append(Constants.HADOOP_HOME).append(HDFS_LS_COMMAND)
.append(Constants.SPACE).append(logsHistory).append("*")
.append(jobID).append("*").append("[!'.xml']");
LOGGER.debug("History FileName Fetch Command: " + lsCommand.toString());
CommandWritableBuilder lsBuilder = new CommandWritableBuilder();
lsBuilder.addCommand(lsCommand.toString(), false, null,
CommandType.HADOOP_FS);
String response = (String) remoter
.fireCommandAndGetObjectResponse(lsBuilder.getCommandWritable());
BufferedReader reader = new BufferedReader(new StringReader(response));
String line = null;
String[] splits = null;
String filePath = null;
try {
while ((line = reader.readLine()) != null) {
if (line.contains(jobID)) {
splits = line.split("\\s+");
}
}
} catch (IOException exception) {
LOGGER.error("Error reading command response: "+exception);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException ioException) {
LOGGER.error("Error while closing the reader: "
+ ioException);
}
}
}
if (splits == null || splits.length <= 7) {
LOGGER.error("Could not read the file name from command response for job: " + jobID);
return null;
}
filePath = splits[7];
return filePath.substring(filePath.lastIndexOf(File.separator) + 1);
}
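/*
 * A minimal sketch of the splits[7] assumption above (the sample line is
 * hypothetical): "hadoop fs -ls" on Hadoop 1.x/MapR prints eight
 * whitespace-separated fields per entry, with the absolute path last:
 *
 *   -rwxr-xr-x 3 mapr mapr 149363 2015-05-05 12:01 /var/mapr/cluster/mapred/jobTracker/history/done/job_201505051138_0001_wordcount
 *
 * line.split("\\s+") therefore yields the path at index 7, and the substring
 * after the final separator gives the history file name, here
 * "job_201505051138_0001_wordcount".
 */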
/**
* This method creates a directory on the agent side and grants write
* permission (chmod o+w) to others.
*
* @param builder the command builder used to batch the commands
* @param remoter the remoter used to execute commands on the remote agent
* @param dirPath the path of the directory to be created
*/
private void mkDir(CommandWritableBuilder builder, Remoter remoter,
String dirPath) {
builder.addCommand(MKDIR_CMD + dirPath, false, null, CommandType.FS);
builder.addCommand(CHMOD_CMD + dirPath, false, null, CommandType.FS);
remoter.fireAndForgetCommand(builder.getCommandWritable());
}
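/*
 * Usage sketch: for a hypothetical dirPath of
 * /opt/jumbune/agent/jobJars/wordcount/rumen/, the batch above fires two
 * shell commands on the agent (CHMOD_CMD is defined outside this diff;
 * "chmod o+w " is assumed from the javadoc):
 *
 *   mkdir -p /opt/jumbune/agent/jobJars/wordcount/rumen/
 *   chmod o+w /opt/jumbune/agent/jobJars/wordcount/rumen/
 */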
/**
* This method prepares the command necessary for rumen processing of the
* job history file. Being specific to the MapR distribution, it assumes
* the job history file resides on MapRFS.
*
* @param hadoopJarProperties the properties holding the hadoop jar names
* @param agentHome the agent home directory
* @param jsonFilepath the path of the job-trace json file to be generated
* @param topologyFilePath the path of the topology file to be generated
* @param historyFilePathOnMapRFS the path of the job history file on MapRFS
* @return the rumen processing command
*/
private StringBuilder prepareRumenProcessingCommand(Properties hadoopJarProperties, String agentHome, String jsonFilepath, String topologyFilePath, String historyFilePathOnMapRFS) {
String[] requiredJars = {
HADOOP_HOME + LIB + hadoopJarProperties.getProperty("CORE_MAPR_JAR"),
HADOOP_HOME + LIB + hadoopJarProperties.getProperty("LOG4J_JAR"),
HADOOP_HOME + LIB + hadoopJarProperties.getProperty("JACKSON_CORE_V1.5_JAR"),
HADOOP_HOME + LIB + hadoopJarProperties.getProperty("JACKSON_MAPPER_V1.5_JAR"),
HADOOP_HOME + LIB + hadoopJarProperties.getProperty("COMMONS_CLI_JAR"),
HADOOP_HOME + LIB + hadoopJarProperties.getProperty("COMMONS_COLLECTIONS_JAR"),
HADOOP_HOME + LIB + hadoopJarProperties.getProperty("COMMONS_LOGGING_V1.0.4_JAR"),
HADOOP_HOME + LIB + hadoopJarProperties.getProperty("MAPRFS_JAR"),
HADOOP_HOME + LIB + hadoopJarProperties.getProperty("ZOOKEEPER_JAR"),
HADOOP_HOME + LIB + hadoopJarProperties.getProperty("COMMONS_EL_JAR")
};
String rumenJar = agentHome + LIB + hadoopJarProperties.getProperty("RUMEN_JAR") + "-" + Versioning.BUILD_VERSION + Versioning.DISTRIBUTION_NAME + ".jar";
StringBuilder rumenProcessingCommand = new StringBuilder(JAVA_CP_CMD);
// add the required jars to the classpath
for (String jar : requiredJars) {
rumenProcessingCommand.append(jar).append(COLON);
}
rumenProcessingCommand.append(rumenJar);
rumenProcessingCommand.append(SPACE).append(RUMEN_MAIN_CLASS_OLD).append(SPACE).append(jsonFilepath)
.append(SPACE).append(topologyFilePath).append(SPACE).append(historyFilePathOnMapRFS);
return rumenProcessingCommand;
}
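/*
 * Illustrative assembled command (HADOOP_HOME, jar names and file names are
 * assumptions; RUMEN_MAIN_CLASS_OLD is defined elsewhere in this class and
 * on Hadoop 1.x would typically be rumen's
 * org.apache.hadoop.tools.rumen.TraceBuilder, which takes the output trace,
 * the output topology and the input history in that order):
 *
 *   java -cp $HADOOP_HOME/lib/hadoop-core-mapr.jar:...:$AGENT_HOME/lib/rumen-x.y-MAPR.jar \
 *       org.apache.hadoop.tools.rumen.TraceBuilder \
 *       /jumbune/rumen-tmp/job-trace.json /jumbune/rumen-tmp/topology.json \
 *       /var/mapr/cluster/mapred/jobTracker/history/done/job_201505051138_0001_wordcount
 */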
/**
* This method gets the contents (job-trace.json and the topology file) of the
* temporary directory generated on MapRFS during rumen processing and then
* removes the temporary directory.
*
* @param remoter the remoter used to execute commands on the remote agent
* @param jobConfig the job configuration
* @param rumenTempDirOnMapRFS the temporary rumen directory on MapRFS
* @param rumenDirPath the destination directory on the agent
*/
private void getAndRemoveRumenTempDir(Remoter remoter, JobConfig jobConfig, String rumenTempDirOnMapRFS, String rumenDirPath) {
StringBuilder commandToExecute = new StringBuilder().append(Constants.HADOOP_HOME).append(HDFS_FILE_GET_COMMAND)
.append(Constants.SPACE).append(rumenTempDirOnMapRFS).append("*").append(SPACE).append(rumenDirPath);
LOGGER.debug("File get Command" + commandToExecute.toString());
CommandWritableBuilder fsGetBuilder = new CommandWritableBuilder();
fsGetBuilder.addCommand(commandToExecute.toString(),false, null, CommandType.MAPRED).populate(jobConfig, null);
StringBuilder rmCommand=new StringBuilder().append(Constants.HADOOP_HOME).append(HDFS_RM_COMMAND).append(SPACE).append("/jumbune");
fsGetBuilder.addCommand(rmCommand.toString(), false, null, CommandType.HADOOP_FS);
remoter.fireAndForgetCommand(fsGetBuilder.getCommandWritable());
}
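/*
 * Illustrative batch fired by the method above (the HADOOP_HOME value is an
 * assumption; "-rmr" is the legacy recursive remove). Note the remove targets
 * the parent /jumbune directory rather than just rumen-tmp, so the whole
 * temporary tree is cleaned up in one call:
 *
 *   /opt/mapr/hadoop/hadoop-0.20.2/bin/hadoop fs -get /jumbune/rumen-tmp/* /opt/jumbune/agent/jobJars/wordcount/rumen
 *   /opt/mapr/hadoop/hadoop-0.20.2/bin/hadoop fs -rmr /jumbune
 */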
private String checkAndGetHistFile(String remotePath) {
String fileName = null;
@@ -334,7 +530,9 @@ private String changeLogHistoryPathAccToHadoopVersion(String remoteHadoop,
logsHistory = remoteHadoop + LOGS + HISTORY_DIR_SUFFIX;
}else if(SupportedHadoopDistributions.HADOOP_YARN.equals(hadoopVersion) || SupportedHadoopDistributions.CDH_5.equals(hadoopVersion) || SupportedHadoopDistributions.APACHE_02X.equals(hadoopVersion)){
logsHistory = HISTORY_INT_DIR_SUFFIX_YARN;
}else if(SupportedHadoopDistributions.HADOOP_MAPR.equals(hadoopVersion)) {
logsHistory = HISTORY_DIR_SUFFIX_MAPR_NY;
}
return logsHistory;
}
@@ -452,7 +650,8 @@ private PhaseDetails addPhaseDetails(List<TaskDetails> tasks, long startTime) {
long endPoint = (td.getFinishTime() - startTime) / CONVERSION_FACTOR_BYTES_TO_KB;
tod.setEndPoint(endPoint);
tod.setTimeTaken(endPoint - startPoint);
long diff = endPoint - startPoint;
long dataFlowRate = td.getInputBytes() / ((diff == 0) ? 1 : diff);
tod.setDataFlowRate(dataFlowRate);
tod.setOutputBytes(td.getOutputBytes());
tod.setOutputRecords(td.getOutputRecords());
@@ -504,7 +703,8 @@ private long calculateAvgDataFlow(List<TaskOutputDetails> taskOutputDetails) {
for (TaskOutputDetails tod : taskOutputDetails) {
totalDataFlow += tod.getDataFlowRate();
}
long size = taskOutputDetails.size();
return (totalDataFlow / (size == 0 ? 1 : size)) / CONVERSION_FACTOR_BYTES_TO_KB;
}
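/*
 * Both zero-guards above avoid the ArithmeticException that Java integer
 * division throws on a zero divisor, e.g. when a phase starts and ends in the
 * same sampled instant or the task list is empty. A minimal sketch:
 *
 *   long inputBytes = 149363L;
 *   long diff = 0L;                                    // endPoint == startPoint
 *   long rate = inputBytes / ((diff == 0) ? 1 : diff); // == inputBytes, no exception
 */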
