From ef9b357d89044bc7dab6a73fdf20349adc067aa7 Mon Sep 17 00:00:00 2001
From: Ariel Shemaiah Rabkin
Date: Thu, 11 Jun 2009 19:16:38 +0000
Subject: [PATCH] CHUKWA-194. Backfilling tools. Contributed by Jerome Boulon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/chukwa/trunk@783878 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt | 2 +
 bin/chukwa-config.sh | 2 +-
 contrib/chukwa-pig/chukwa-pig.jar | Bin 19491 -> 19530 bytes
 .../org/apache/hadoop/chukwa/ChunkImpl.java | 27 +-
 .../datacollection/adaptor/Adaptor.java | 16 +-
 .../adaptor/AdaptorShutdownPolicy.java | 5 +
 .../datacollection/adaptor/ExecAdaptor.java | 51 ++-
 .../datacollection/adaptor/FileAdaptor.java | 56 ++-
 .../filetailer/FileTailingAdaptor.java | 90 +++--
 .../adaptor/filetailer/TerminatorThread.java | 6 +-
 .../datacollection/agent/AdaptorFactory.java | 2 +-
 .../datacollection/agent/AdaptorManager.java | 35 ++
 .../agent/AgentControlSocketListener.java | 1 +
 .../datacollection/agent/ChukwaAgent.java | 3 +-
 .../chukwa/extraction/engine/RecordUtil.java | 13 +
 .../tools/backfilling/BackfillingLoader.java | 90 +++++
 .../backfilling/QueueToWriterConnector.java | 147 ++++++++
 .../hadoop/chukwa/util/ConstRateAdaptor.java | 24 +-
 .../hadoop/chukwa/util/MaxRateSender.java | 21 +-
 .../adaptor/ChukwaTestAdaptor.java | 7 +
 .../backfilling/TestBackfillingLoader.java | 334 ++++++++++++++++++
 21 files changed, 857 insertions(+), 75 deletions(-)
 create mode 100644 src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java
 create mode 100644 src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
 create mode 100644 src/java/org/apache/hadoop/chukwa/tools/backfilling/QueueToWriterConnector.java
 create mode 100644 src/test/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java

diff --git a/CHANGES.txt b/CHANGES.txt
index 859327932..3e15c69ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
   NEW FEATURES
+    CHUKWA-194. Backfilling tools. (Jerome Boulon via asrabkin)
+
     CHUKWA-253. Added aggregations by user. (Cheng Zhang via Eric Yang)
     CHUKWA-95. Added Web Service API to export data from database.
(Terence Kwan via Eric Yang) diff --git a/bin/chukwa-config.sh b/bin/chukwa-config.sh index 21ef01a74..ed68dd93d 100755 --- a/bin/chukwa-config.sh +++ b/bin/chukwa-config.sh @@ -96,7 +96,7 @@ if [ -f "${CHUKWA_CONF_DIR}/chukwa-env.sh" ]; then fi export DATACONFIG=${CHUKWA_CONF_DIR}/mdl.xml -COMMON=`ls ${CHUKWA_HOME}/lib/*.jar` +COMMON=`ls ${CHUKWA_HOME}/lib/*.jar ${CHUKWA_HOME}/hadoopjars/commons*.jar ${CHUKWA_HOME}/build/ivy/lib/chukwa/common/*.jar` export COMMON=`echo ${COMMON} | sed 'y/ /:/'` export CHUKWA_CORE=${CHUKWA_HOME}/chukwa-core-${CHUKWA_VERSION}.jar export CHUKWA_AGENT=${CHUKWA_HOME}/chukwa-agent-${CHUKWA_VERSION}.jar diff --git a/contrib/chukwa-pig/chukwa-pig.jar b/contrib/chukwa-pig/chukwa-pig.jar index ae95afb4fe1bb8ac1b82fc9e54eac0e7deefd91e..58b7dad30bc492265e9db98ceba9cd1e3b0be4fd 100644 GIT binary patch delta 849 zcmZ2HgYnc1M&1B#W)?065MW|IJ&{+Vo{{~umG%XH&1#@f7Z8g8v9D{0qpqi)o4&83 zpQoE^aEP9-oA0T;e1{Z793IYXDNnetWc=b>HXYo$yyJ5!p z7t0vS8PvB_ugU()6fpbvkyX5QM{S-v#cPIym48dqZWXadI2!2MiTmW4LDT~^5H*>T zQ5Q^GGMa$s%>|4Lg?NDst5aLeH@skGV7NHhz*1qeod`1vSl&eOFex&7p3L9G@pFk^;EHw&1dW+?$-INF|NhNyA8$jS^1!pTqFEWp;m!*H^e zyJ;ZE0lm& zc!*B^=gtEb;q{Pa3l6<+Mxa8E-VdH9OfwHkH?h8=ti47w;bq`>qjfP#E-qL(z2wieJB z&B@EX1i(VOfI^#0ASR3OO#TNI7W9^8agLLBy$+yP(|OTDOWP4*g%|&1P9G7rziuFB0sx|j@xA~6 delta 795 zcmX>#gK_Z;M&1B#W)?065V%@;Y9g;j{gu*FR*uHk_lE$5l7LtQh<#l{9CbbY-1L1N z{XE@VgG2Ou-9G!CIql=Et9OytTUYDcne&^246YbIcv^JES*TFQ!{?kwC_@*c>RHE2 z=Ui8Is;<=BL)184WMu{h$7CLN3tp7)m>lSC3NmkU zr@OTHMqtR~7p3b5cr!AIFhc?nB)8jL+WIn7F0mjnIU^Nai4{;54us$=0fx7Ze;6hU zdML<)Y=w&h8Io`gNRiIu01pkYJE}dT8IyrLnaKyd_$HrcWYtEvhJyiQ%Bij98(uIo zFkIwdV9ZhD|m51jP;UcvV{q=PtNg@Q~*YSm9n132RmSJ zF)A}KXrma*q6IeNfR`L79wt8o>hCrI3dv07^W>ha>Ma1)zai^a<7J?awqp25(h{t{ zz*`oqe}cC(lZh=@XrCwF timeOut) { + finished = true; log.warn("Couldn't read this file: " + toWatch.getAbsolutePath()); deregisterAndStop(false); cleanUp() ; @@ -229,18 +230,59 @@ private void cleanUp() { * * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown() */ + @Deprecated public long shutdown() throws AdaptorException { - // do nothing -- will be automatically done by TimeOut - return fileReadOffset + offsetOfFirstByte; + return shutdown(AdaptorShutdownPolicy.GRACEFULLY); } /** * Stop tailing the file, effective immediately. 
*/ + @Deprecated public void hardStop() throws AdaptorException { - cleanUp(); + shutdown(AdaptorShutdownPolicy.HARD_STOP); } + @Override + public long shutdown(AdaptorShutdownPolicy shutdownPolicy) { + log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this); + switch(shutdownPolicy) { + case HARD_STOP : + cleanUp(); + break; + case GRACEFULLY : { + int retry = 0; + while (!finished && retry < 60) { + try { + log.info("GRACEFULLY Retry:" + retry); + Thread.sleep(1000); + retry++; + } catch (InterruptedException ex) { + } + } + } + break; + case WAIT_TILL_FINISHED : { + int retry = 0; + while (!finished) { + try { + if (retry%100 == 0) { + log.info("WAIT_TILL_FINISHED Retry:" + retry); + } + + Thread.sleep(1000); + retry++; + } catch (InterruptedException ex) { + } + } + } + + break; + } + log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this); + return fileReadOffset + offsetOfFirstByte; + } + public String getStreamName() { return toWatch.getPath(); } diff --git a/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java b/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java index 1100998a6..dafa2abeb 100644 --- a/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java +++ b/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java @@ -98,44 +98,76 @@ public void start(String params, long bytes) { * * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown() */ + @Deprecated public long shutdown() throws AdaptorException { - try { - if (toWatch.exists()) { - int retry = 0; - tailer.stopWatchingFile(this); - TerminatorThread lastTail = new TerminatorThread(this, tailer.eq); - lastTail.setDaemon(true); - lastTail.start(); - while (lastTail.isAlive() && retry < 60) { - try { - log.info("Retry:" + retry); - Thread.currentThread().sleep(1000); - retry++; - } catch (InterruptedException ex) { - } - } - } - } finally { - return fileReadOffset + offsetOfFirstByte; - } - + return shutdown(AdaptorShutdownPolicy.GRACEFULLY); } /** * Stop tailing the file, effective immediately. 
*/ + @Deprecated public void hardStop() throws AdaptorException { - tailer.stopWatchingFile(this); - try { - if (reader != null) { - reader.close(); + shutdown(AdaptorShutdownPolicy.HARD_STOP); + } + + + @Override + public long shutdown(AdaptorShutdownPolicy shutdownPolicy) { + + log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this); + + switch(shutdownPolicy) { + case HARD_STOP : + tailer.stopWatchingFile(this); + try { + if (reader != null) { + reader.close(); + } + reader = null; + } catch(Throwable e) { + log.warn("Exception while closing reader:",e); + } + break; + case GRACEFULLY : + case WAIT_TILL_FINISHED :{ + if (toWatch.exists()) { + int retry = 0; + tailer.stopWatchingFile(this); + TerminatorThread lastTail = new TerminatorThread(this, tailer.eq); + lastTail.setDaemon(true); + lastTail.start(); + + if (shutdownPolicy.ordinal() == AdaptorShutdownPolicy.GRACEFULLY.ordinal()) { + while (lastTail.isAlive() && retry < 60) { + try { + log.info("GRACEFULLY Retry:" + retry); + Thread.sleep(1000); + retry++; + } catch (InterruptedException ex) { + } + } + } else { + while (lastTail.isAlive()) { + try { + if (retry%100 == 0) { + log.info("WAIT_TILL_FINISHED Retry:" + retry); + } + Thread.sleep(1000); + retry++; + } catch (InterruptedException ex) { + } + } + } + } } - reader = null; - } catch(Throwable e) { - // do nothing + break; } + log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this); + return fileReadOffset + offsetOfFirstByte; } - + + /** * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus() */ @@ -264,9 +296,11 @@ public synchronized boolean tailFile(ChunkReceiver eq) + MAX_READ_SIZE); } else { log.info("Conf is null, running in default mode"); + conf = new Configuration(); } } else { log.info("Agent is null, running in default mode"); + conf = new Configuration(); } } diff --git a/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java b/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java index 38a7dbb1d..1ea6797c3 100644 --- a/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java +++ b/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java @@ -36,8 +36,7 @@ public void run() { endTime = System.currentTimeMillis() + (10 * 60 * 1000); // now + 10 // mins if (count > 3) { - log - .warn("TerminatorThread should have been finished by now, stopping it now! count=" + log.warn("TerminatorThread should have been finished by now, stopping it now! 
count=" + count); break; } @@ -47,8 +46,7 @@ public void run() { log.info("InterruptedException on Terminator thread:" + adaptor.toWatch.getPath(), e); } catch (Throwable e) { - log - .warn("Exception on Terminator thread:" + adaptor.toWatch.getPath(), + log.warn("Exception on Terminator thread:" + adaptor.toWatch.getPath(), e); } diff --git a/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java b/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java index b6b043a3a..5817046ad 100644 --- a/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java +++ b/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java @@ -36,7 +36,7 @@ public class AdaptorFactory { * @param className the name of the {@link Adaptor} class to instantiate * @return an Adaptor of the specified type */ - static Adaptor createAdaptor(String className) { + static public Adaptor createAdaptor(String className) { Object obj = null; try { // the following reflection business for type checking is probably diff --git a/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java b/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java index d6c7ca252..2e6c25c92 100644 --- a/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java +++ b/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.chukwa.datacollection.agent; +import java.util.Collections; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor; @@ -27,6 +28,7 @@ */ public interface AdaptorManager { + Configuration getConfiguration(); int adaptorCount(); long stopAdaptor(long number, boolean gracefully); @@ -34,4 +36,37 @@ public interface AdaptorManager { long processAddCommand(String cmd); Map getAdaptorList(); + static AdaptorManager NULL = new AdaptorManager() { + + @Override + public int adaptorCount() { + return 0; + } + + @Override + public Adaptor getAdaptor(long id) { + return null; + } + + @Override + public Map getAdaptorList() { + return Collections.emptyMap(); + } + + @Override + public Configuration getConfiguration() { + return new Configuration(); + } + + @Override + public long processAddCommand(String cmd) { + return 0; + } + + @Override + public long stopAdaptor(long number, boolean gracefully) { + return 0; + } + }; + } diff --git a/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java b/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java index 718cc056b..5928bdbab 100644 --- a/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java +++ b/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java @@ -247,6 +247,7 @@ public boolean isBound() { public void tryToBind() throws IOException { s = new ServerSocket(portno); + s.setReuseAddress(true); portno = s.getLocalPort(); if (s.isBound()) log.info("socket bound to " + s.getLocalPort()); diff --git a/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java b/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java index b8a0d2c67..e3cf6f37e 100644 --- a/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java +++ b/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java @@ -54,7 +54,8 @@ */ public class ChukwaAgent implements AdaptorManager { // boolean 
WRITE_CHECKPOINTS = true; - static final AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "chukwaAgent"); + static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "chukwaAgent"); + static Logger log = Logger.getLogger(ChukwaAgent.class); static ChukwaAgent agent = null; diff --git a/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java b/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java index 1816f6f20..6e7064dd8 100644 --- a/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java +++ b/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java @@ -4,6 +4,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.hadoop.chukwa.Chunk; + public class RecordUtil { static Pattern clusterPattern = Pattern .compile("(.*)?cluster=\"(.*?)\"(.*)?"); @@ -17,6 +19,17 @@ public static String getClusterName(Record record) { } } + return "undefined"; + } + public static String getClusterName(Chunk chunk) { + String tags = chunk.getTags(); + if (tags != null) { + Matcher matcher = clusterPattern.matcher(tags); + if (matcher.matches()) { + return matcher.group(2); + } + } + return "undefined"; } } diff --git a/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java b/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java new file mode 100644 index 000000000..e0883743a --- /dev/null +++ b/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java @@ -0,0 +1,90 @@ +package org.apache.hadoop.chukwa.tools.backfilling; + +import java.io.File; + +import org.apache.hadoop.chukwa.ChunkImpl; +import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; +import org.apache.hadoop.chukwa.datacollection.ChunkQueue; +import org.apache.hadoop.chukwa.datacollection.DataFactory; +import org.apache.hadoop.chukwa.datacollection.adaptor.*; +import org.apache.hadoop.chukwa.datacollection.agent.AdaptorFactory; +import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager; +import org.apache.hadoop.chukwa.datacollection.connector.Connector; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; + +public class BackfillingLoader { + static Logger log = Logger.getLogger(BackfillingLoader.class); + + protected Configuration conf = null; + protected ChunkQueue queue = null; + protected Connector connector = null; + + private String cluster = null; + private String machine = null; + private String adaptorName = null; + private String recordType = null; + private String logFile = null; + + public BackfillingLoader(Configuration conf, String cluster, String machine, + String adaptorName, String recordType, String logFile) { + + this.conf = conf; + this.cluster = cluster.trim(); + this.machine = machine.trim(); + this.adaptorName = adaptorName; + this.recordType = recordType; + this.logFile = logFile; + + log.info("cluster >>>" + cluster) ; + log.info("machine >>>" + machine) ; + log.info("adaptorName >>>" + adaptorName) ; + log.info("recordType >>>" + recordType) ; + log.info("logFile >>>" + logFile) ; + + // Set the right cluster and machine information + DataFactory.getInstance().addDefaultTag("cluster=\"" + this.cluster + "\""); + ChunkImpl.setHostAddress(this.machine); + + queue = DataFactory.getInstance().getEventQueue(); + connector = new QueueToWriterConnector(conf,true); + } + + public void process() throws AdaptorException { + File file = new File(logFile); + connector.start(); + Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName); + 
adaptor.start(System.currentTimeMillis(), recordType, "0 " +file.getAbsolutePath() , + 0l,queue, AdaptorManager.NULL ); + adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED); + connector.shutdown(); + file.renameTo(new File(logFile + ".sav")); + } + + public static void usage() { + System.out.println("java org.apache.hadoop.chukwa.tools.backfilling.BackfillingLoader "); + System.exit(-1); + } + + /** + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + + if (args.length != 5) { + usage(); + } + + + String cluster = args[0]; + String machine = args[1]; + String adaptorName = args[2]; + String recordType = args[3]; + String logFile = args[4]; + + BackfillingLoader loader = new BackfillingLoader(new ChukwaConfiguration(),cluster,machine,adaptorName,recordType,logFile); + loader.process(); + } + +} diff --git a/src/java/org/apache/hadoop/chukwa/tools/backfilling/QueueToWriterConnector.java b/src/java/org/apache/hadoop/chukwa/tools/backfilling/QueueToWriterConnector.java new file mode 100644 index 000000000..df230d5f2 --- /dev/null +++ b/src/java/org/apache/hadoop/chukwa/tools/backfilling/QueueToWriterConnector.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.chukwa.tools.backfilling; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.chukwa.Chunk; +import org.apache.hadoop.chukwa.datacollection.ChunkQueue; +import org.apache.hadoop.chukwa.datacollection.DataFactory; +import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; +import org.apache.hadoop.chukwa.datacollection.connector.Connector; +import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter; +import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter; +import org.apache.hadoop.chukwa.util.DaemonWatcher; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; + +public class QueueToWriterConnector implements Connector, Runnable { + static Logger log = Logger.getLogger(QueueToWriterConnector.class); + static final int MAX_SIZE_PER_POST = 2 * 1024 * 1024; + + protected Configuration conf = null; + protected volatile boolean isRunning = true; + protected ChunkQueue chunkQueue = DataFactory.getInstance().getEventQueue(); + protected ChukwaWriter writer = null; + protected Thread runner = null; + protected boolean isBackfilling = false; + public QueueToWriterConnector(Configuration conf,boolean isBackfilling) { + this.conf = conf; + this.isBackfilling = isBackfilling; + } + + @Override + public void reloadConfiguration() { + // do nothing here + } + + @Override + public void shutdown() { + isRunning = false; + + log.info("Shutdown in progress ..."); + while (isAlive()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + } + + try { + if (writer != null) { + writer.close(); + } + } catch(Exception e) { + log.warn("Exception while closing writer: ", e); + } + log.info("Shutdown done."); + } + + @Override + public void start() { + log.info("Starting QueueToWriterConnector thread"); + runner = new Thread(this, "QueueToWriterConnectorThread"); + runner.start(); + } + + protected boolean isAlive() { + return this.runner.isAlive(); + } + + @Override + public void run() { + + log.info("initializing QueueToWriterConnector"); + try { + String writerClassName = conf.get("chukwaCollector.writerClass", + SeqFileWriter.class.getCanonicalName()); + Class writerClass = Class.forName(writerClassName); + if (writerClass != null + && ChukwaWriter.class.isAssignableFrom(writerClass)) { + writer = (ChukwaWriter) writerClass.newInstance(); + } else { + throw new RuntimeException("Wrong class type"); + } + writer.init(conf); + + } catch (Throwable e) { + log.warn("failed to use user-chosen writer class, Bail out!", e); + DaemonWatcher.bailout(-1); + } + + + List chunks = new LinkedList(); + ChukwaAgent agent = null;// ChukwaAgent.getAgent(); + + log.info("processing data for QueueToWriterConnector"); + + while ( isRunning || chunkQueue.size() != 0 || chunks.size() != 0) { + try { + if (chunks.size() == 0) { + + if (isBackfilling && chunkQueue.size() == 0) { + Thread.sleep(300); + continue; + } + chunkQueue.collect(chunks, MAX_SIZE_PER_POST); + log.info("Got " + chunks.size() + " chunks back from the queue"); + } + + writer.add(chunks); + + if (agent != null) { + for(Chunk chunk: chunks) { + agent.reportCommit(chunk.getInitiator(), chunk.getSeqID()); + } + } + + chunks.clear(); + + } + catch (Throwable e) { + log.warn("Could not save some chunks"); + e.printStackTrace(); + try { + Thread.sleep(5000); + } catch (InterruptedException e1) {} + } + } + } + +} diff --git a/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java 
b/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java index 43583cb7c..0d48ddd53 100644 --- a/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java +++ b/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java @@ -22,8 +22,8 @@ import java.util.Random; import org.apache.hadoop.chukwa.ChunkImpl; import org.apache.hadoop.chukwa.datacollection.*; -import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor; -import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException; +import org.apache.hadoop.chukwa.datacollection.adaptor.*; +import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager; import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager; public class ConstRateAdaptor extends Thread implements Adaptor { @@ -89,13 +89,14 @@ public String toString() { return "const rate " + type; } + @Deprecated public void hardStop() throws AdaptorException { - stopping = true; + shutdown(AdaptorShutdownPolicy.HARD_STOP); } + @Deprecated public long shutdown() throws AdaptorException { - stopping = true; - return offset; + return shutdown(AdaptorShutdownPolicy.GRACEFULLY); } @Override @@ -103,4 +104,17 @@ public String getType() { return type; } + + @Override + public long shutdown(AdaptorShutdownPolicy shutdownPolicy) { + + switch(shutdownPolicy) { + case HARD_STOP : + case GRACEFULLY : + case WAIT_TILL_FINISHED : + stopping = true; + break; + } + return offset; + } } diff --git a/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java b/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java index 6dacfae47..891f349a5 100644 --- a/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java +++ b/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java @@ -23,8 +23,7 @@ import org.apache.hadoop.chukwa.ChunkImpl; import org.apache.hadoop.chukwa.datacollection.*; import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager; -import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor; -import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException; +import org.apache.hadoop.chukwa.datacollection.adaptor.*; public class MaxRateSender extends Thread implements Adaptor { @@ -77,14 +76,26 @@ public String toString() { } public long shutdown() throws AdaptorException { - stopping = true; - return offset; + return shutdown(AdaptorShutdownPolicy.GRACEFULLY); } public void hardStop() throws AdaptorException { - stopping = true; + shutdown(AdaptorShutdownPolicy.HARD_STOP); } + @Override + public long shutdown(AdaptorShutdownPolicy shutdownPolicy) { + + switch(shutdownPolicy) { + case HARD_STOP : + case GRACEFULLY : + case WAIT_TILL_FINISHED : + stopping = true; + break; + } + return offset; + } + @Override public String getType() { return type; diff --git a/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java b/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java index b6f73746a..02dcecd85 100644 --- a/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java +++ b/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java @@ -75,4 +75,11 @@ public long getStartOffset() { } + @Override + public long shutdown(AdaptorShutdownPolicy shutdownPolicy) + throws AdaptorException { + // TODO Auto-generated method stub + return 0; + } + } diff --git a/src/test/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java b/src/test/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java new file mode 100644 index 
000000000..fad642cd4 --- /dev/null +++ b/src/test/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java @@ -0,0 +1,334 @@ +package org.apache.hadoop.chukwa.tools.backfilling; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.chukwa.ChukwaArchiveKey; +import org.apache.hadoop.chukwa.ChunkImpl; +import org.apache.hadoop.chukwa.extraction.engine.RecordUtil; +import org.apache.hadoop.chukwa.validationframework.util.MD5; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; + + +public class TestBackfillingLoader extends TestCase{ + + public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscaped() { + String tmpDir = System.getProperty("test.build.data", "/tmp"); + long ts = System.currentTimeMillis(); + String dataDir = tmpDir + "/TestBackfillingLoader_" + ts; + + Configuration conf = new Configuration(); + conf.set("writer.hdfs.filesystem", "file:///"); + conf.set("chukwaCollector.outputDir", dataDir + "/log/"); + conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1)); + + String cluster = "MyCluster_" + ts; + String machine = "machine_" + ts; + String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped"; + String recordType = "MyRecordType_" + ts; + + try { + FileSystem fs = FileSystem.getLocal(conf); + + File in1Dir = new File(dataDir + "/input"); + in1Dir.mkdirs(); + int lineCount = 107; + File inputFile = makeTestFile(dataDir + "/input/in1.txt",lineCount); + long size = inputFile.length(); + + String logFile = inputFile.getAbsolutePath(); + System.out.println("Output:" + logFile); + System.out.println("File:" + inputFile.length()); + BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile); + loader.process(); + + File finalOutputFile = new File(dataDir + "/input/in1.txt.sav"); + + Assert.assertTrue(inputFile.exists() == false); + Assert.assertTrue(finalOutputFile.exists() == true); + + String doneFile = null; + File directory = new File(dataDir + "/log/"); + String[] files = directory.list(); + for(String file: files) { + if ( file.endsWith(".done") ){ + doneFile = dataDir + "/log/" + file; + break; + } + } + + long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile, + cluster, recordType, machine, logFile); + Assert.assertTrue(seqId == size); + + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(); + } + try { + FileUtils.deleteDirectory(new File(dataDir)); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void testBackfillingLoaderWithFileAdaptor() { + String tmpDir = System.getProperty("test.build.data", "/tmp"); + long ts = System.currentTimeMillis(); + String dataDir = tmpDir + "/TestBackfillingLoader_" + ts; + + Configuration conf = new Configuration(); + conf.set("writer.hdfs.filesystem", "file:///"); + conf.set("chukwaCollector.outputDir", dataDir + "/log/"); + conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1)); + + String cluster = "MyCluster_" + ts; + String machine = "machine_" + ts; + String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor"; + String recordType = 
"MyRecordType_" + ts; + + try { + FileSystem fs = FileSystem.getLocal(conf); + + File in1Dir = new File(dataDir + "/input"); + in1Dir.mkdirs(); + int lineCount = 118; + File inputFile = makeTestFile(dataDir + "/input/in2.txt",lineCount); + long size = inputFile.length(); + + String logFile = inputFile.getAbsolutePath(); + System.out.println("Output:" + logFile); + System.out.println("File:" + inputFile.length()); + BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile); + loader.process(); + + File finalOutputFile = new File(dataDir + "/input/in2.txt.sav"); + + Assert.assertTrue(inputFile.exists() == false); + Assert.assertTrue(finalOutputFile.exists() == true); + + String doneFile = null; + File directory = new File(dataDir + "/log/"); + String[] files = directory.list(); + for(String file: files) { + if ( file.endsWith(".done") ){ + doneFile = dataDir + "/log/" + file; + break; + } + } + + long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile, + cluster, recordType, machine, logFile); + Assert.assertTrue(seqId == size); + + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(); + } + try { + FileUtils.deleteDirectory(new File(dataDir)); + } catch (IOException e) { + e.printStackTrace(); + } + } + + + + public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscapedBigFile() { + String tmpDir = System.getProperty("test.build.data", "/tmp"); + long ts = System.currentTimeMillis(); + String dataDir = tmpDir + "/TestBackfillingLoader_" + ts; + + Configuration conf = new Configuration(); + conf.set("writer.hdfs.filesystem", "file:///"); + conf.set("chukwaCollector.outputDir", dataDir + "/log/"); + conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1)); + + + String cluster = "MyCluster_" + ts; + String machine = "machine_" + ts; + String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped"; + String recordType = "MyRecordType_" + ts; + + try { + FileSystem fs = FileSystem.getLocal(conf); + + File in1Dir = new File(dataDir + "/input"); + in1Dir.mkdirs(); + int lineCount = 1024*1024;//34MB + File inputFile = makeTestFile(dataDir + "/input/in1.txt",lineCount); + long size = inputFile.length(); + + String logFile = inputFile.getAbsolutePath(); + System.out.println("Output:" + logFile); + System.out.println("File:" + inputFile.length()); + BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile); + loader.process(); + + File finalOutputFile = new File(dataDir + "/input/in1.txt.sav"); + + Assert.assertTrue(inputFile.exists() == false); + Assert.assertTrue(finalOutputFile.exists() == true); + + String doneFile = null; + File directory = new File(dataDir + "/log/"); + String[] files = directory.list(); + for(String file: files) { + if ( file.endsWith(".done") ){ + doneFile = dataDir + "/log/" + file; + break; + } + } + + long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile, + cluster, recordType, machine, logFile); + + Assert.assertTrue(seqId == size); + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(); + } + try { + FileUtils.deleteDirectory(new File(dataDir)); + } catch (IOException e) { + e.printStackTrace(); + } + } + + + public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscapedBigFileLocalWriter() { + String tmpDir = System.getProperty("test.build.data", "/tmp"); + long ts = System.currentTimeMillis(); + String dataDir = tmpDir + 
"/TestBackfillingLoader_" + ts; + + Configuration conf = new Configuration(); + conf.set("writer.hdfs.filesystem", "file:///"); + conf.set("chukwaCollector.outputDir", dataDir + "/log/"); + conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1)); + conf.set("chukwaCollector.localOutputDir", dataDir + "/log/"); + conf.set("chukwaCollector.writerClass", "org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter"); + + String cluster = "MyCluster_" + ts; + String machine = "machine_" + ts; + String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped"; + String recordType = "MyRecordType_" + ts; + + try { + FileSystem fs = FileSystem.getLocal(conf); + + File in1Dir = new File(dataDir + "/input"); + in1Dir.mkdirs(); + int lineCount = 1024*1024*2;//64MB + File inputFile = makeTestFile(dataDir + "/input/in1.txt",lineCount); + long size = inputFile.length(); + + String logFile = inputFile.getAbsolutePath(); + System.out.println("Output:" + logFile); + System.out.println("File:" + inputFile.length()); + BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile); + loader.process(); + + File finalOutputFile = new File(dataDir + "/input/in1.txt.sav"); + + Assert.assertTrue(inputFile.exists() == false); + Assert.assertTrue(finalOutputFile.exists() == true); + + String doneFile = null; + File directory = new File(dataDir + "/log/"); + String[] files = directory.list(); + for(String file: files) { + if ( file.endsWith(".done") ){ + doneFile = dataDir + "/log/" + file; + break; + } + } + + long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile, + cluster, recordType, machine, logFile); + + Assert.assertTrue(seqId == size); + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(); + } + try { + FileUtils.deleteDirectory(new File(dataDir)); + } catch (IOException e) { + e.printStackTrace(); + } + } + protected long validateDataSink(FileSystem fs,Configuration conf, String dataSinkFile, File logFile, + String cluster,String dataType, String source, String application) throws Throwable { + SequenceFile.Reader reader = null; + long lastSeqId = -1; + BufferedWriter out = null; + try { + + reader = new SequenceFile.Reader(fs, new Path(dataSinkFile), conf); + ChukwaArchiveKey key = new ChukwaArchiveKey(); + ChunkImpl chunk = ChunkImpl.getBlankChunk(); + + String dataSinkDumpName = dataSinkFile + ".dump"; + out = new BufferedWriter(new FileWriter(dataSinkDumpName)); + + + + while (reader.next(key, chunk)) { + Assert.assertTrue(cluster.equals(RecordUtil.getClusterName(chunk))); + Assert.assertTrue(dataType.equals(chunk.getDataType())); + Assert.assertTrue(source.equals(chunk.getSource())); + + out.write(new String(chunk.getData())); + lastSeqId = chunk.getSeqID() ; + } + + out.close(); + out = null; + reader.close(); + reader = null; + + String dataSinkMD5 = MD5.checksum(new File(dataSinkDumpName)); + String logFileMD5 = MD5.checksum(logFile); + Assert.assertTrue(dataSinkMD5.equals(logFileMD5)); + } + finally { + if (out != null) { + out.close(); + } + + if (reader != null) { + reader.close(); + } + } + + + return lastSeqId; + } + + private File makeTestFile(String name, int size) throws IOException { + File tmpOutput = new File(name); + + FileOutputStream fos = new FileOutputStream(tmpOutput); + + PrintWriter pw = new PrintWriter(fos); + for (int i = 0; i < size; ++i) { + pw.print(i + " "); + pw.println("abcdefghijklmnopqrstuvwxyz"); + } + pw.flush(); + pw.close(); + 
return tmpOutput; + } + +}
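
For readers who want to try the new tool, here is a minimal sketch of driving BackfillingLoader programmatically rather than through its main() entry point. It assumes the Chukwa jars and configuration are on the classpath; the cluster name, host name, record type, and log path are placeholders, and the adaptor class is one of the two exercised by TestBackfillingLoader above.

import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.tools.backfilling.BackfillingLoader;

public class BackfillExample {
  public static void main(String[] args) throws Exception {
    // Same five parameters, in the same order, as BackfillingLoader.main():
    // cluster, machine, adaptorName, recordType, logFile.
    BackfillingLoader loader = new BackfillingLoader(
        new ChukwaConfiguration(),
        "myCluster",                    // placeholder cluster name
        "host01.example.com",           // placeholder source machine
        "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer."
            + "CharFileTailingAdaptorUTF8NewLineEscaped",
        "MyRecordType",                 // placeholder record type
        "/var/log/app/app.log");        // placeholder log file to backfill
    // process() replays the file through the configured ChukwaWriter
    // (SeqFileWriter unless chukwaCollector.writerClass says otherwise)
    // and then renames the input to app.log.sav.
    loader.process();
  }
}

The command-line entry point in main() expects the same five arguments in that order.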
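
The adaptor-side change in this patch replaces the shutdown()/hardStop() pair with a single policy-aware shutdown(AdaptorShutdownPolicy). Below is a minimal sketch of calling an adaptor directly under the new API; it assumes the post-patch Adaptor.start() signature used by BackfillingLoader, and the FileAdaptor class name and log path are illustrative placeholders. Per the FileAdaptor and FileTailingAdaptor hunks above, HARD_STOP stops immediately, GRACEFULLY retries for roughly a minute, and WAIT_TILL_FINISHED blocks until the adaptor has pushed everything.

import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
import org.apache.hadoop.chukwa.datacollection.agent.AdaptorFactory;
import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;

public class AdaptorShutdownPolicyExample {
  public static void main(String[] args) throws Exception {
    // Chunks produced by the adaptor land on the shared event queue.
    ChunkQueue queue = DataFactory.getInstance().getEventQueue();

    // AdaptorManager.NULL (introduced by this patch) stands in for a full
    // ChukwaAgent, which is how BackfillingLoader runs adaptors offline.
    Adaptor adaptor = AdaptorFactory.createAdaptor(
        "org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor");
    adaptor.start(System.currentTimeMillis(),
        "MyRecordType",               // placeholder record type
        "0 /var/log/app/app.log",     // offset 0 plus a placeholder path
        0L, queue, AdaptorManager.NULL);

    // WAIT_TILL_FINISHED blocks until the whole file has been queued;
    // the return value is the offset the adaptor reached.
    long resumeOffset = adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED);
    System.out.println("stopped at offset " + resumeOffset);
  }
}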