Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
CHUKWA-714 Make ChunkQueue configurable
git-svn-id: https://svn.apache.org/repos/asf/chukwa/trunk@1607199 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Shreyas committed Jul 1, 2014
1 parent 5001f85 commit e8ab727dbfea21070694e374f7ca9fe183db0d50
Showing 6 changed files with 401 additions and 5 deletions.
@@ -81,6 +81,17 @@
<value>org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter</value>
</property>

<property>
<name>chukwaAgent.chunk.queue</name>
<value>org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue</value>
</property>

<property>
<name>chukwaAgent.chunk.queue.limit</name>
<value>10485760</value>
</property>


<property>
<name>syslog.adaptor.port.9095.facility.LOCAL1</name>
<value>HADOOP</value>
@@ -21,17 +21,20 @@

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Iterator;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue;
import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
import org.apache.log4j.Logger;

public class DataFactory {
static Logger log = Logger.getLogger(DataFactory.class);
static final int QUEUE_SIZE_KB = 10 * 1024;
static final String COLLECTORS_FILENAME = "collectors";
static final String CHUNK_QUEUE = "chukwaAgent.chunk.queue";

private static DataFactory dataFactory = null;
private ChunkQueue chunkQueue = null;

@@ -50,10 +53,45 @@ public static DataFactory getInstance() {

/**
 * Returns this factory's chunk queue, creating it on first use.
 *
 * The queue is built by {@link #createEventQueue()} and cached in
 * {@code chunkQueue}; later calls return the same instance. The method is
 * synchronized so that concurrent first calls create only one queue.
 *
 * @return the cached chunk queue
 */
public synchronized ChunkQueue getEventQueue() {
  if (chunkQueue == null) {
    // Creation is delegated entirely to createEventQueue(); the former
    // hard-coded MemLimitQueue(int) assignment was dead (immediately
    // overwritten) and has been removed.
    chunkQueue = createEventQueue();
  }
  return chunkQueue;
}

/**
 * Creates a new chunk queue according to the agent configuration.
 *
 * The implementation class name is read from the {@code CHUNK_QUEUE}
 * property. The class must implement {@link ChunkQueue}; a public
 * constructor taking a {@link Configuration} is preferred, with a no-arg
 * constructor as the fallback. If the property is unset or blank, or the
 * class cannot be loaded or instantiated, a {@link MemLimitQueue} is
 * returned instead.
 *
 * @return a newly constructed queue, never {@code null}
 */
public synchronized ChunkQueue createEventQueue() {
  Configuration conf = ChukwaAgent.getStaticConfiguration();
  if (conf == null) {
    // Must be a unit test, use default queue with default configuration
    return new MemLimitQueue(null);
  }

  String receiver = conf.get(CHUNK_QUEUE);
  if (receiver != null) {
    // Hand-edited XML values frequently carry stray whitespace or newlines,
    // which would otherwise make Class.forName fail.
    receiver = receiver.trim();
  }
  if (receiver == null || receiver.isEmpty()) {
    log.warn("Empty configuration for " + CHUNK_QUEUE + ". Defaulting to MemLimitQueue");
    return new MemLimitQueue(conf);
  }

  try {
    Class<?> clazz = Class.forName(receiver);
    log.info(clazz);
    if (!ChunkQueue.class.isAssignableFrom(clazz)) {
      throw new IllegalArgumentException(receiver + " is not an instance of ChunkQueue");
    }
    try {
      Constructor<?> ctor = clazz.getConstructor(Configuration.class);
      return (ChunkQueue) ctor.newInstance(conf);
    } catch (NoSuchMethodException nsme) {
      // Queue implementations which take no configuration parameter
      return (ChunkQueue) clazz.getDeclaredConstructor().newInstance();
    }
  } catch (Exception e) {
    // Boundary catch: any reflection failure falls back to the default queue.
    // Pass the exception as the throwable so the stack trace is logged too.
    log.error("Could not instantiate configured ChunkQueue due to: " + e, e);
    log.error("Defaulting to MemLimitQueue");
    return new MemLimitQueue(conf);
  }
}

public String getDefaultTags() {
return defaultTags;
@@ -716,6 +716,10 @@ public Configuration getConfiguration() {
return conf;
}

/**
 * Static accessor for the agent configuration, for callers that have no
 * agent instance at hand.
 *
 * @return the value of the static {@code conf} field; presumably
 *         {@code null} until an agent has set it - TODO confirm
 */
public static Configuration getStaticConfiguration() {
  final Configuration current = conf;
  return current;
}

@Override
public Adaptor getAdaptor(String name) {
synchronized(adaptorsByName) {
@@ -25,6 +25,7 @@
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.agent.metrics.ChunkQueueMetrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
@@ -39,10 +40,12 @@ public class MemLimitQueue implements ChunkQueue {
static final ChunkQueueMetrics metrics = new ChunkQueueMetrics("chukwaAgent", "chunkQueue");;
private Queue<Chunk> queue = new LinkedList<Chunk>();
private long dataSize = 0;
private final long MAX_MEM_USAGE;
private long MAX_MEM_USAGE;
static final String CHUNK_QUEUE_LIMIT = "chukwaAgent.chunk.queue.limit";
static final int QUEUE_SIZE = 10 * 1024 * 1024;

public MemLimitQueue(int limit) {
MAX_MEM_USAGE = limit;
public MemLimitQueue(Configuration conf) {
configure(conf);
}

/**
@@ -111,4 +114,21 @@ public void collect(List<Chunk> events, int maxSize)
/**
 * Reports how many chunks are currently held in the queue.
 *
 * @return the number of queued chunks
 */
public int size() {
  final int queued = queue.size();
  return queued;
}

/**
 * Sets the memory limit of this queue from configuration.
 *
 * The limit starts at {@code QUEUE_SIZE} and is replaced by the value of the
 * {@code CHUNK_QUEUE_LIMIT} property when that property holds a positive
 * number. Unparseable or non-positive values are logged and the default is
 * kept.
 *
 * @param conf configuration to read from; {@code null} keeps the default
 */
private void configure(Configuration conf) {
  MAX_MEM_USAGE = QUEUE_SIZE;
  if (conf == null) {
    return;
  }
  String limit = conf.get(CHUNK_QUEUE_LIMIT);
  if (limit != null) {
    try {
      // The field is a long, so parse as long; trim because XML values
      // often carry surrounding whitespace.
      long parsed = Long.parseLong(limit.trim());
      if (parsed > 0) {
        MAX_MEM_USAGE = parsed;
      } else {
        // A non-positive limit would make every chunk look oversized.
        log.error("Non-positive value for property " + CHUNK_QUEUE_LIMIT
            + ". Defaulting internal queue size to " + QUEUE_SIZE);
      }
    } catch (NumberFormatException nfe) {
      log.error("Exception reading property " + CHUNK_QUEUE_LIMIT
          + ". Defaulting internal queue size to " + QUEUE_SIZE);
    }
  }
  log.info("Using MemLimitQueue limit of " + MAX_MEM_USAGE);
}
}
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.chukwa.datacollection.agent;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.agent.metrics.ChunkQueueMetrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
 * An event queue that discards incoming chunks once a fixed upper limit of data
 * is enqueued. The method calling add will not block.
 *
 * All access to the underlying list and the byte counter happens while holding
 * this object's monitor.
 *
 * For now, uses the size of the data field. Should really use
 * estimatedSerializedSize()?
 */
public class NonBlockingMemLimitQueue implements ChunkQueue {
  static Logger log = Logger.getLogger(NonBlockingMemLimitQueue.class);
  static final ChunkQueueMetrics metrics = new ChunkQueueMetrics("chukwaAgent",
      "chunkQueue");
  static final String CHUNK_QUEUE_LIMIT = "chukwaAgent.chunk.queue.limit";
  static final int QUEUE_SIZE = 10 * 1024 * 1024;

  private final Queue<Chunk> queue = new LinkedList<Chunk>();
  /** Sum of the data lengths of all queued chunks, in bytes. */
  private long dataSize = 0;
  /** Upper bound for {@link #dataSize}; fixed at construction time. */
  private final long maxMemUsage;

  /**
   * Creates a queue whose limit is read from {@code CHUNK_QUEUE_LIMIT}.
   *
   * @param conf configuration to read the limit from; {@code null} selects
   *        the default of {@code QUEUE_SIZE} bytes
   */
  public NonBlockingMemLimitQueue(Configuration conf) {
    maxMemUsage = resolveLimit(conf);
  }

  /**
   * Enqueues a chunk if it fits under the memory limit; otherwise the chunk
   * is dropped and the call returns immediately.
   *
   * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk)
   */
  public void add(Chunk chunk) throws InterruptedException {
    assert chunk != null : "can't enqueue null chunks";
    int chunkSize = chunk.getData().length;
    synchronized (this) {
      if (chunkSize + dataSize > maxMemUsage) {
        if (dataSize == 0) {
          // Queue is empty, but this single chunk alone exceeds the limit;
          // it can never be accepted, so it is dropped.
          log.error("JUMBO CHUNK SPOTTED: type= " + chunk.getDataType()
              + " and source =" + chunk.getStreamName());
        } else {
          metrics.fullQueue.set(1);
          log.warn("Discarding chunk due to NonBlockingMemLimitQueue full [" + dataSize
              + "]");
        }
        return;
      }
      metrics.fullQueue.set(0);
      dataSize += chunkSize;
      queue.add(chunk);
      metrics.addedChunk.inc();
      metrics.queueSize.set(queue.size());
      metrics.dataSize.set(dataSize);
      // Wake any thread blocked in collect() waiting for data.
      this.notifyAll();
    }
  }

  /**
   * Blocks until at least one chunk is queued, then moves chunks into
   * {@code events} until the queue is empty or the moved data reaches
   * {@code maxSize} bytes.
   *
   * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List,
   *      int)
   */
  public void collect(List<Chunk> events, int maxSize)
      throws InterruptedException {
    int remaining;
    synchronized (this) {
      // we can't just say queue.take() here, since we're holding a lock.
      while (queue.isEmpty()) {
        this.wait();
      }

      int size = 0;
      while (!queue.isEmpty() && (size < maxSize)) {
        Chunk e = this.queue.remove();
        metrics.removedChunk.inc();
        int chunkSize = e.getData().length;
        size += chunkSize;
        dataSize -= chunkSize;
        metrics.dataSize.set(dataSize);
        events.add(e);
      }
      // Snapshot the size under the lock so the debug line below does not
      // read the LinkedList concurrently with add().
      remaining = queue.size();
      metrics.queueSize.set(remaining);
      this.notifyAll();
    }

    if (log.isDebugEnabled()) {
      log.debug("WaitingQueue.inQueueCount:" + remaining
          + "\tWaitingQueue.collectCount:" + events.size());
    }
  }

  /**
   * Returns the number of chunks currently queued. Synchronized because the
   * backing LinkedList is mutated under this object's monitor by add() and
   * collect().
   */
  public synchronized int size() {
    return queue.size();
  }

  /**
   * Determines the memory limit: the positive value of
   * {@code CHUNK_QUEUE_LIMIT} if present and parseable, else
   * {@code QUEUE_SIZE}.
   */
  private static long resolveLimit(Configuration conf) {
    if (conf == null) {
      return QUEUE_SIZE;
    }
    long resolved = QUEUE_SIZE;
    String limit = conf.get(CHUNK_QUEUE_LIMIT);
    if (limit != null) {
      try {
        // The limit is held in a long, so parse as long; trim because XML
        // values often carry surrounding whitespace.
        long parsed = Long.parseLong(limit.trim());
        if (parsed > 0) {
          resolved = parsed;
        } else {
          // A non-positive limit would cause every chunk to be dropped.
          log.error("Non-positive value for property " + CHUNK_QUEUE_LIMIT
              + ". Defaulting internal queue size to " + QUEUE_SIZE);
        }
      } catch (NumberFormatException nfe) {
        log.error("Exception reading property " + CHUNK_QUEUE_LIMIT
            + ". Defaulting internal queue size to " + QUEUE_SIZE);
      }
    }
    log.info("Using NonBlockingMemLimitQueue limit of " + resolved);
    return resolved;
  }
}

0 comments on commit e8ab727

Please sign in to comment.