CHUKWA-565. Added HBaseWriter support for TsProcessor. (Bill Graham via Eric Yang)

git-svn-id: https://svn.apache.org/repos/asf/incubator/chukwa/trunk@1424228 13f79535-47bb-0310-9956-ffa450edef68
macroadster committed Dec 19, 2012
1 parent 87e86df commit fdc92fe56b487ffe4c9d8faf9880f108a0944baf
Showing 4 changed files with 63 additions and 34 deletions.
CHANGES.txt
@@ -30,6 +30,8 @@ Trunk (unreleased changes)
 
   BUGS
 
+    CHUKWA-565. Added HBaseWriter support for TsProcessor. (Bill Graham via Eric Yang)
+
     CHUKWA-677. Added pid file check before starting processes. (Sreepathi Prasanna via Eric Yang)
 
     CHUKWA-676. Exclude temporary data from release audit tools. (Eric Yang)
HBaseWriter.java
@@ -29,9 +29,10 @@
 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
 import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
-import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.UnknownRecordTypeException;
+import org.apache.hadoop.chukwa.extraction.demux.Demux;
 import org.apache.hadoop.chukwa.util.ClassUtils;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -52,10 +53,8 @@ public class HBaseWriter extends PipelineableWriter {
   final Timer statTimer;
   private OutputCollector output;
   private Reporter reporter;
-  private ChukwaConfiguration conf = new ChukwaConfiguration();
-  String defaultProcessor = conf.get(
-      "chukwa.demux.mapper.default.processor",
-      "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+  private ChukwaConfiguration conf;
+  String defaultProcessor;
   private HTablePool pool;
   private Configuration hconf;
@@ -83,27 +82,36 @@ public HBaseWriter() {
   }
 
   public HBaseWriter(boolean reportStats) {
-    this.reportStats = reportStats;
-    statTimer = new Timer();
-    /* HBase Version 0.20.x */
-    //hconf = new HBaseConfiguration();
-
-    /* HBase Version 0.89.x */
-    hconf = HBaseConfiguration.create();
+    /* HBase Version >= 0.89.x */
+    this(reportStats, new ChukwaConfiguration(), HBaseConfiguration.create());
   }
 
   public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) {
-    this(true);
+    this(true, conf, hconf);
   }
 
+  private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) {
+    this.reportStats = reportStats;
+    this.conf = conf;
+    this.hconf = hconf;
+    this.statTimer = new Timer();
+    this.defaultProcessor = conf.get(
+        "chukwa.demux.mapper.default.processor",
+        "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+    Demux.jobConf = conf;
+    log.info("hbase.zookeeper.quorum: " + hconf.get("hbase.zookeeper.quorum"));
+  }
+
   public void close() {
-    statTimer.cancel();
+    if (reportStats) {
+      statTimer.cancel();
+    }
   }
 
   public void init(Configuration conf) throws WriterException {
-    statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
+    if (reportStats) {
+      statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
+    }
     output = new OutputCollector();
     reporter = new Reporter();
     if(conf.getBoolean("hbase.writer.verify.schema", false)) {
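With this change every constructor funnels through the new private constructor, so the stats timer, the ChukwaConfiguration, the defaultProcessor lookup, and the Demux.jobConf hand-off are initialized in exactly one place. A minimal sketch of how a caller such as a test might inject its own configurations through the public two-argument constructor; the data-type mapping and quorum value are illustrative assumptions, not part of this commit:

    ChukwaConfiguration chukwaConf = new ChukwaConfiguration();
    // Hypothetical mapping for illustration: add() resolves processors via
    // conf.get(dataType, defaultProcessor), so this routes "MyLog" chunks to TsProcessor.
    chukwaConf.set("MyLog",
        "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor");

    Configuration hbaseConf = HBaseConfiguration.create();
    hbaseConf.set("hbase.zookeeper.quorum", "localhost");  // assumed local quorum

    // Delegates to the private constructor, which also publishes chukwaConf
    // through Demux.jobConf so processors such as TsProcessor can read it.
    HBaseWriter writer = new HBaseWriter(chukwaConf, hbaseConf);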
@@ -175,23 +183,15 @@ public CommitStatus add(List<Chunk> chunks) throws WriterException {
     CommitStatus rv = ChukwaWriter.COMMIT_OK;
     try {
       for(Chunk chunk : chunks) {
-        String processorClass = conf.get(chunk.getDataType(),
-            defaultProcessor);
         synchronized (this) {
-          MapProcessor processor = MapProcessorFactory.getProcessor(processorClass);
           try {
-            Table table = null;
-            if(processor.getClass().isAnnotationPresent(Table.class)) {
-              table = processor.getClass().getAnnotation(Table.class);
-            } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
-              Tables tables = processor.getClass().getAnnotation(Tables.class);
-              for(Table t : tables.annotations()) {
-                table = t;
-              }
-            }
+            Table table = findHBaseTable(chunk.getDataType());
+
             if(table!=null) {
-              HTableInterface hbase = pool.getTable(table.name().getBytes());
+              HTableInterface hbase = pool.getTable(table.name().getBytes());
+              MapProcessor processor = getProcessor(chunk.getDataType());
               processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
+
               hbase.put(output.getKeyValues());
               pool.putTable(hbase);
             }
@@ -214,4 +214,30 @@ public CommitStatus add(List<Chunk> chunks) throws WriterException {
     return rv;
   }
 
+  public Table findHBaseTable(String dataType) throws UnknownRecordTypeException {
+    MapProcessor processor = getProcessor(dataType);
+
+    Table table = null;
+    if(processor.getClass().isAnnotationPresent(Table.class)) {
+      return processor.getClass().getAnnotation(Table.class);
+    } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
+      Tables tables = processor.getClass().getAnnotation(Tables.class);
+      for(Table t : tables.annotations()) {
+        table = t;
+      }
+    }
+
+    return table;
+  }
+
+  public String findHBaseColumnFamilyName(String dataType)
+      throws UnknownRecordTypeException {
+    Table table = findHBaseTable(dataType);
+    return table.columnFamily();
+  }
+
+  private MapProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
+    String processorClass = conf.get(dataType, defaultProcessor);
+    return MapProcessorFactory.getProcessor(processorClass);
+  }
 }
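The extracted findHBaseTable resolves the destination table by reading the @Table or @Tables annotation on the processor mapped to the data type; when @Tables lists several tables, the loop keeps only the last entry, and findHBaseColumnFamilyName will raise a NullPointerException for a processor with no table annotation, since findHBaseTable then returns null. A minimal sketch of an annotated processor and the two lookups against it; the processor, table, and column-family names are assumptions for illustration:

    @Tables(annotations = { @Table(name = "SystemMetrics", columnFamily = "cpu") })
    public class MyMetricsProcessor extends AbstractProcessor {
      // process(...) omitted
    }

    // Assuming ChukwaConfiguration maps "MyMetrics" to MyMetricsProcessor:
    Table table = writer.findHBaseTable("MyMetrics");              // name() == "SystemMetrics"
    String family = writer.findHBaseColumnFamilyName("MyMetrics"); // "cpu"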
Demux.java
@@ -61,7 +61,7 @@
 public class Demux extends Configured implements Tool {
   static Logger log = Logger.getLogger(Demux.class);
   static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
-  public static JobConf jobConf = null;
+  public static Configuration jobConf = null;
 
   public static class MapClass extends MapReduceBase implements
       Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
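Widening jobConf from JobConf to Configuration is what lets the HBaseWriter constructor above publish its configuration with Demux.jobConf = conf: ChukwaConfiguration extends Configuration but not JobConf. Existing MapReduce callers still compile because JobConf is itself a Configuration subclass. A two-line sketch of both assignment paths:

    Demux.jobConf = new JobConf();              // MapReduce path: JobConf is-a Configuration
    Demux.jobConf = new ChukwaConfiguration();  // HBaseWriter path, enabled by this commit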
TsProcessor.java
@@ -34,7 +34,7 @@
 import org.apache.hadoop.chukwa.util.RegexUtil;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
 /**
@@ -81,10 +81,10 @@ public TsProcessor() {
   protected void parse(String recordEntry,
       OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
       throws Throwable {
+    String dStr = null;
     try {
       SimpleDateFormat sdf = fetchDateFormat(chunk.getDataType());
       Pattern datePattern = fetchDateLocationPattern(chunk.getDataType());
-      String dStr = null;
 
       // fetch the part of the record that contains the date.
       if(datePattern != null) {
@@ -108,7 +108,7 @@ protected void parse(String recordEntry,
       output.collect(key, record);
     } catch (ParseException e) {
       log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
-          + "]", e);
+          + "], date string='" + dStr + "'", e);
       e.printStackTrace();
       throw e;
     } catch (IOException e) {
@@ -130,7 +130,7 @@ private SimpleDateFormat fetchDateFormat(String dataType) {
       return dateFormatMap.get(dataType);
     }
 
-    JobConf jobConf = Demux.jobConf;
+    Configuration jobConf = Demux.jobConf;
     String dateFormat = DEFAULT_DATE_FORMAT;
 
     if (jobConf != null) {
@@ -139,6 +139,7 @@ private SimpleDateFormat fetchDateFormat(String dataType) {
           dateFormat);
     }
 
+    log.info("dataType: " + chunk.getDataType() + ", dateFormat="+ dateFormat);
     SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
     dateFormatMap.put(dataType, sdf);
 
@@ -156,7 +157,7 @@ private Pattern fetchDateLocationPattern(String dataType) {
       return datePatternMap.get(dataType);
     }
 
-    JobConf jobConf = Demux.jobConf;
+    Configuration jobConf = Demux.jobConf;
     String datePattern = null;
     Pattern pattern = null;