
Commit ef9b357
CHUKWA-194. Backfilling tools. Contributed by Jerome Boulon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/chukwa/trunk@783878 13f79535-47bb-0310-9956-ffa450edef68
Ariel Shemaiah Rabkin committed Jun 11, 2009
1 parent c414bd0 commit ef9b357
Showing 21 changed files with 857 additions and 75 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
@@ -4,6 +4,8 @@ Trunk (unreleased changes)

NEW FEATURES

CHUKWA-194. Backfilling tools. (Jerome Boulon via asrabkin)

CHUKWA-253. Added aggregations by user. (Cheng Zhang via Eric Yang)

CHUKWA-95. Added Web Service API to export data from database. (Terence Kwan via Eric Yang)
2 changes: 1 addition & 1 deletion bin/chukwa-config.sh
@@ -96,7 +96,7 @@ if [ -f "${CHUKWA_CONF_DIR}/chukwa-env.sh" ]; then
fi

export DATACONFIG=${CHUKWA_CONF_DIR}/mdl.xml
COMMON=`ls ${CHUKWA_HOME}/lib/*.jar`
COMMON=`ls ${CHUKWA_HOME}/lib/*.jar ${CHUKWA_HOME}/hadoopjars/commons*.jar ${CHUKWA_HOME}/build/ivy/lib/chukwa/common/*.jar`
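# sed 'y/ /:/' transliterates each space to a colon, turning the space-separated jar list into a Java classpath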
export COMMON=`echo ${COMMON} | sed 'y/ /:/'`
export CHUKWA_CORE=${CHUKWA_HOME}/chukwa-core-${CHUKWA_VERSION}.jar
export CHUKWA_AGENT=${CHUKWA_HOME}/chukwa-agent-${CHUKWA_VERSION}.jar
Binary file modified contrib/chukwa-pig/chukwa-pig.jar
Binary file not shown.
27 changes: 17 additions & 10 deletions src/java/org/apache/hadoop/chukwa/ChunkImpl.java
@@ -44,13 +44,27 @@ public class ChunkImpl implements org.apache.hadoop.io.Writable, Chunk {
private transient Adaptor initiator;
long seqID;

private static String localHostAddr;
static {
try {
setHostAddress(InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
setHostAddress("localhost");
}
}


public static void setHostAddress(String host) {
ChunkImpl.localHostAddr = host;
}


public static ChunkImpl getBlankChunk() {
return new ChunkImpl();
}

ChunkImpl() {
}

public ChunkImpl(String dataType, String streamName, long seq, byte[] data,
Adaptor source) {
this.seqID = seq;
@@ -237,14 +251,7 @@ public String toString() {
return source + ":" + application + ":" + new String(data) + "/" + seqID;
}

private static String localHostAddr;
static {
try {
localHostAddr = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
localHostAddr = "localhost";
}
}


/**
* @see org.apache.hadoop.chukwa.Chunk#getSerializedSizeEstimate()
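Making setHostAddress() public is what lets a backfilling tool stamp replayed chunks with the machine the data originally came from, rather than the host running the backfill. Below is a minimal sketch of that usage, assuming (as the relocated static initializer suggests) that a new chunk records the current localHostAddr as its source; the host name and log line are invented for illustration, and the Adaptor argument is left null purely for the sketch:

import org.apache.hadoop.chukwa.ChunkImpl;

public class BackfillHostExample {
  public static void main(String[] args) {
    // Replace the host name captured at class-load time before any chunks are built.
    ChunkImpl.setHostAddress("node42.example.com"); // hypothetical source host

    byte[] data = "replayed log line".getBytes();
    ChunkImpl chunk = new ChunkImpl("SysLog", "/var/log/syslog", data.length, data, null);

    // toString() prints source:application:data/seqID, so the source field
    // should now show node42.example.com instead of the local host.
    System.out.println(chunk);
  }
}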
src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
@@ -72,14 +72,25 @@ public void start(long adaptorID, String type, String status, long offset,
* Signals this adaptor to come to an orderly stop. The adaptor ought to push
* out all the data it can before exiting.
*
* This method is synchronous: in other words, after shutdown() returns, no
* new data should be written.
* This method is synchronous for up to 60 seconds.
*
* @return the logical offset at which the adaptor stops
* @throws AdaptorException
*/
@Deprecated
public long shutdown() throws AdaptorException;


/**
* Signals this adaptor to come to an orderly stop. The adaptor ought to push
* out all the data it can before exiting, depending on the shutdown policy.
*
* @return the logical offset at which the adaptor stood when the method returned
* @throws AdaptorException
*/
public long shutdown(AdaptorShutdownPolicy shutdownPolicy) throws AdaptorException;


/**
* Signals this adaptor to come to an abrupt stop, as quickly as it can. The
* use case here is "Whups, I didn't mean to start that adaptor tailing a
@@ -94,6 +105,7 @@ public void start(long adaptorID, String type, String status, long offset,
*
* @throws AdaptorException
*/
@Deprecated
public void hardStop() throws AdaptorException;

}
src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java (new file)
@@ -0,0 +1,5 @@
package org.apache.hadoop.chukwa.datacollection.adaptor;

public enum AdaptorShutdownPolicy {
HARD_STOP, GRACEFULLY, WAIT_TILL_FINISHED;
}
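Taken together, the enum and the new shutdown(AdaptorShutdownPolicy) overload replace the deprecated shutdown()/hardStop() pair with a single policy-driven entry point. A hedged sketch of how a caller might drive it follows; the helper and its fallback behavior are illustrative rather than part of this commit, and the Adaptor interface is assumed to live in the same package as the enum:

import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;

public class AdaptorStopper {
  /**
   * Try an orderly stop first (bounded at roughly 60 seconds in the
   * adaptors below), falling back to a hard stop if the orderly path throws.
   * Returns the last reported offset, or -1 if even the hard stop failed.
   */
  static long stop(Adaptor adaptor) {
    try {
      return adaptor.shutdown(AdaptorShutdownPolicy.GRACEFULLY);
    } catch (AdaptorException orderlyFailure) {
      try {
        return adaptor.shutdown(AdaptorShutdownPolicy.HARD_STOP);
      } catch (AdaptorException hardFailure) {
        return -1L; // caller treats a negative offset as "unknown"
      }
    }
  }
}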
src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
@@ -39,7 +39,7 @@
* 0
*
*/
public class ExecAdaptor extends AbstractAdaptor {

static class EmbeddedExec extends ExecPlugin {

@@ -129,19 +148,48 @@ public String getStreamName() {
}

@Override
@Deprecated
/**
* Use shutdown(AdaptorShutdownPolicy) instead.
*/
public void hardStop() throws AdaptorException {
shutdown(AdaptorShutdownPolicy.HARD_STOP);
}

@Override
public long shutdown() throws AdaptorException {
try {
timer.cancel();
exec.waitFor(); // wait for last data to get pushed out
} catch (InterruptedException e) {
}
return sendOffset;
}

@Override
@Deprecated
/**
* Use shutdown(AdaptorShutdownPolicy) instead.
*/
public long shutdown() throws AdaptorException {
return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
}

@Override
public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
throws AdaptorException {
log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
switch(shutdownPolicy) {
case HARD_STOP :
timer.cancel();
exec.stop();
break;
case GRACEFULLY :
case WAIT_TILL_FINISHED :
// Cancel the timer first so no new chunks get scheduled, then wait
// for the child process to exit so the last data gets pushed out.
try {
timer.cancel();
exec.waitFor();
} catch (InterruptedException e) {
}
break;
}
log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
return sendOffset;
}

src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
@@ -135,7 +135,7 @@ public class FileAdaptor extends AbstractAdaptor {
private long startTime = 0;
private long timeOut = 0;


protected volatile boolean finished = false;
protected File toWatch;
protected RandomAccessFile reader = null;
protected long fileReadOffset;
@@ -186,12 +186,12 @@ void sendFile() {
long fileTime = toWatch.lastModified();
int bytesUsed = extractRecords(dest, 0, buf, fileTime);
this.fileReadOffset = bytesUsed;
finished = true;
deregisterAndStop(false);
cleanUp();
} catch(Exception e) {
log.warn("Exception while trying to read: " + toWatch.getAbsolutePath(),e);
} finally {
if (reader != null) {
try {
reader.close();
@@ -204,6 +204,7 @@ void sendFile() {
}
} else {
if (now > timeOut) {
finished = true;
log.warn("Couldn't read this file: " + toWatch.getAbsolutePath());
deregisterAndStop(false);
cleanUp() ;
@@ -229,18 +230,59 @@ private void cleanUp() {
*
* @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
*/
@Deprecated
public long shutdown() throws AdaptorException {
return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
}

/**
* Stop tailing the file, effective immediately.
*/
@Deprecated
public void hardStop() throws AdaptorException {
shutdown(AdaptorShutdownPolicy.HARD_STOP);
}

@Override
public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
switch(shutdownPolicy) {
case HARD_STOP :
cleanUp();
break;
case GRACEFULLY : {
int retry = 0;
while (!finished && retry < 60) {
try {
log.info("GRACEFULLY Retry:" + retry);
Thread.sleep(1000);
retry++;
} catch (InterruptedException ex) {
}
}
}
break;
case WAIT_TILL_FINISHED : {
int retry = 0;
while (!finished) {
try {
if (retry%100 == 0) {
log.info("WAIT_TILL_FINISHED Retry:" + retry);
}

Thread.sleep(1000);
retry++;
} catch (InterruptedException ex) {
}
}
}

break;
}
log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
return fileReadOffset + offsetOfFirstByte;
}

public String getStreamName() {
return toWatch.getPath();
}
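For a one-shot backfill run, WAIT_TILL_FINISHED is the policy that matters: unlike GRACEFULLY, which gives up after roughly 60 one-second retries, it polls the finished flag until the whole file has been read and pushed. A hedged sketch follows; FileAdaptor's package is not shown in this diff, so the import is an assumption, and the adaptor is presumed to have been started already since start()'s full signature is truncated above:

import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
import org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor; // package assumed

public class BackfillDrain {
  /** Block until an already-started FileAdaptor has pushed its whole file. */
  static long drain(FileAdaptor adaptor) {
    // shutdown(policy) returns fileReadOffset + offsetOfFirstByte,
    // i.e. how far into the stream the adaptor got before stopping.
    return adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED);
  }
}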
src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
@@ -98,44 +98,76 @@ public void start(String params, long bytes) {
*
* @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
*/
@Deprecated
public long shutdown() throws AdaptorException {
return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
}

/**
* Stop tailing the file, effective immediately.
*/
@Deprecated
public void hardStop() throws AdaptorException {
shutdown(AdaptorShutdownPolicy.HARD_STOP);
}


@Override
public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {

log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);

switch(shutdownPolicy) {
case HARD_STOP :
tailer.stopWatchingFile(this);
try {
if (reader != null) {
reader.close();
}
reader = null;
} catch(Throwable e) {
log.warn("Exception while closing reader:",e);
}
break;
case GRACEFULLY :
case WAIT_TILL_FINISHED :{
if (toWatch.exists()) {
int retry = 0;
tailer.stopWatchingFile(this);
TerminatorThread lastTail = new TerminatorThread(this, tailer.eq);
lastTail.setDaemon(true);
lastTail.start();

if (shutdownPolicy == AdaptorShutdownPolicy.GRACEFULLY) {
while (lastTail.isAlive() && retry < 60) {
try {
log.info("GRACEFULLY Retry:" + retry);
Thread.sleep(1000);
retry++;
} catch (InterruptedException ex) {
}
}
} else {
while (lastTail.isAlive()) {
try {
if (retry%100 == 0) {
log.info("WAIT_TILL_FINISHED Retry:" + retry);
}
Thread.sleep(1000);
retry++;
} catch (InterruptedException ex) {
}
}
}
}
}
break;
}
log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
return fileReadOffset + offsetOfFirstByte;
}



/**
* @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
*/
Expand Down Expand Up @@ -264,9 +296,11 @@ public synchronized boolean tailFile(ChunkReceiver eq)
+ MAX_READ_SIZE);
} else {
log.info("Conf is null, running in default mode");
conf = new Configuration();
}
} else {
log.info("Agent is null, running in default mode");
conf = new Configuration();
}
}

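Both orderly policies above share one mechanism: stop watching the file, hand the remaining queued data to a daemon TerminatorThread, and poll isAlive() once per second, capped at 60 retries for GRACEFULLY and uncapped for WAIT_TILL_FINISHED. Here is a generic sketch of that bounded-join pattern, with a plain Runnable standing in for the Chukwa-internal TerminatorThread:

public class BoundedJoinSketch {
  public static void main(String[] args) throws InterruptedException {
    Thread flusher = new Thread(new Runnable() {
      public void run() {
        // Stand-in for TerminatorThread: drain any buffered data, then exit.
        try { Thread.sleep(3000); } catch (InterruptedException ignored) { }
      }
    });
    flusher.setDaemon(true); // don't keep the JVM alive if the caller bails out
    flusher.start();

    int retry = 0;
    while (flusher.isAlive() && retry < 60) { // drop the cap for WAIT_TILL_FINISHED
      Thread.sleep(1000);
      retry++;
    }
    // flusher.join(60000L) would express the same bounded wait in one call,
    // at the cost of the per-retry log line the adaptor emits.
  }
}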
