From 16ef5dfe7b970d0c6ee22457ddb9c1e6cd57c472 Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Tue, 6 Sep 2016 17:11:57 +0800 Subject: [PATCH 01/16] =?UTF-8?q?UPDSource=E6=8E=A5=E6=94=B6=E7=9A=84?= =?UTF-8?q?=E5=8D=95=E6=9D=A1=E6=B6=88=E6=81=AF=E6=9C=80=E5=A4=A7=E5=AD=97?= =?UTF-8?q?=E8=8A=82=E6=95=B0=E8=B0=83=E6=95=B4=E4=B8=BA4096?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/org/apache/flume/source/SyslogUDPSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java index 175bebbcc5..7f3c7f9778 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java @@ -63,7 +63,7 @@ public class SyslogUDPSource extends AbstractSource private CounterGroup counterGroup = new CounterGroup(); // Default Min size - public static final int DEFAULT_MIN_SIZE = 2048; + public static final int DEFAULT_MIN_SIZE = 4096; public static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE; public class syslogHandler extends SimpleChannelHandler { From 4525497568f6a225572899ab28648f1c57c83461 Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Tue, 6 Sep 2016 17:17:27 +0800 Subject: [PATCH 02/16] =?UTF-8?q?SysUDPSource=20=E6=8E=A5=E6=94=B6?= =?UTF-8?q?=E9=9D=9E=E6=B3=95=E6=A0=BC=E5=BC=8F=E7=9A=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=EF=BC=8C=E4=B8=8D=E6=89=93=E5=8D=B0warning=E5=92=8Cheader?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/org/apache/flume/source/SyslogUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java index 43a10e1b4d..20ac17c8c8 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java @@ -314,8 +314,8 @@ Event buildEvent() { headers.put("host", hostName); } if (isBadEvent) { - logger.warn("Event created from Invalid Syslog data."); - headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus()); + //logger.warn("Event created from Invalid Syslog data."); + //headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus()); } else if (isIncompleteEvent) { logger.warn("Event size larger than specified event size: {}. You should " + "consider increasing your event size.", maxSize); From 5d6d89f22bcc2f7cbfa300165473683cdc286ee6 Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Wed, 7 Sep 2016 19:08:40 +0800 Subject: [PATCH 03/16] =?UTF-8?q?=E6=B7=BB=E5=8A=A0UDP=20Source?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/flume/source/UDPSource.java | 173 ++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100644 flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java new file mode 100644 index 0000000000..15a5af497f --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.source; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.CounterGroup; +import org.apache.flume.Event; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelHandler; +import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UDPSource extends AbstractSource + implements EventDrivenSource, Configurable { + + private int port; + private int maxsize = 1 << 16; // 64k is max allowable in RFC 5426 + private String host = null; + private Channel nettyChannel; + private Map formaterProp; + private Set keepFields; + + private static final Logger logger = LoggerFactory.getLogger(SyslogUDPSource.class); + + private CounterGroup counterGroup = new CounterGroup(); + + // Default Min size + public static final int DEFAULT_MIN_SIZE = 4096; + public static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE; + + public class syslogHandler extends SimpleChannelHandler { + private SyslogUtils syslogUtils = new SyslogUtils(DEFAULT_INITIAL_SIZE, null, true); + + public void setFormater(Map prop) { + syslogUtils.addFormats(prop); + } + + public void setKeepFields(Set keepFields) { + syslogUtils.setKeepFields(keepFields); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { + try { + byte[] dst = new byte[DEFAULT_MIN_SIZE];; + ChannelBuffer buffer = (ChannelBuffer)mEvent.getMessage(); + buffer.getBytes(0, dst); + Event e = EventBuilder.withBody(dst); + if (e == null) { + return; + } + getChannelProcessor().processEvent(e); + counterGroup.incrementAndGet("events.success"); + } catch (ChannelException ex) { + counterGroup.incrementAndGet("events.dropped"); + logger.error("Error writting to channel", ex); + return; + } catch (RuntimeException ex) { + counterGroup.incrementAndGet("events.dropped"); + logger.error("Error parsing event from syslog stream, event dropped", ex); + return; + } + } + } + + @Override + public void start() { + // setup Netty server + ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap( + new OioDatagramChannelFactory(Executors.newCachedThreadPool())); + final syslogHandler handler = new syslogHandler(); + handler.setFormater(formaterProp); + handler.setKeepFields(keepFields); + serverBootstrap.setOption("receiveBufferSizePredictorFactory", + new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE, + DEFAULT_INITIAL_SIZE, maxsize)); + serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() { + return Channels.pipeline(handler); + } + }); + + if (host == null) { + nettyChannel = serverBootstrap.bind(new InetSocketAddress(port)); + } else { + nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port)); + } + + super.start(); + } + + @Override + public void stop() { + logger.info("Syslog UDP Source stopping..."); + logger.info("Metrics:{}", counterGroup); + if (nettyChannel != null) { + nettyChannel.close(); + try { + nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("netty server stop interrupted", e); + } finally { + nettyChannel = null; + } + } + + super.stop(); + } + + @Override + public void configure(Context context) { + Configurables.ensureRequiredNonNull( + context, SyslogSourceConfigurationConstants.CONFIG_PORT); + port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT); + host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST); + formaterProp = context.getSubProperties( + SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX); + keepFields = SyslogUtils.chooseFieldsToKeep( + context.getString( + SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, + SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS)); + } + + @VisibleForTesting + public int getSourcePort() { + SocketAddress localAddress = nettyChannel.getLocalAddress(); + if (localAddress instanceof InetSocketAddress) { + InetSocketAddress addr = (InetSocketAddress) localAddress; + return addr.getPort(); + } + return 0; + } +} From 5add1c1135815b763928b6f140ce84c4b56f3b08 Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Wed, 7 Sep 2016 19:16:18 +0800 Subject: [PATCH 04/16] =?UTF-8?q?=E8=BF=98=E5=8E=9F=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/apache/flume/source/SyslogUDPSource.java | 2 +- .../src/main/java/org/apache/flume/source/SyslogUtils.java | 4 ++-- .../src/main/java/org/apache/flume/source/UDPSource.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java index 7f3c7f9778..175bebbcc5 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java @@ -63,7 +63,7 @@ public class SyslogUDPSource extends AbstractSource private CounterGroup counterGroup = new CounterGroup(); // Default Min size - public static final int DEFAULT_MIN_SIZE = 4096; + public static final int DEFAULT_MIN_SIZE = 2048; public static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE; public class syslogHandler extends SimpleChannelHandler { diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java index 20ac17c8c8..43a10e1b4d 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java @@ -314,8 +314,8 @@ Event buildEvent() { headers.put("host", hostName); } if (isBadEvent) { - //logger.warn("Event created from Invalid Syslog data."); - //headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus()); + logger.warn("Event created from Invalid Syslog data."); + headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus()); } else if (isIncompleteEvent) { logger.warn("Event size larger than specified event size: {}. You should " + "consider increasing your event size.", maxSize); diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index 15a5af497f..dbdf4e6fdf 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -81,7 +81,7 @@ public void setKeepFields(Set keepFields) { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { try { - byte[] dst = new byte[DEFAULT_MIN_SIZE];; + byte[] dst = new byte[DEFAULT_MIN_SIZE]; ChannelBuffer buffer = (ChannelBuffer)mEvent.getMessage(); buffer.getBytes(0, dst); Event e = EventBuilder.withBody(dst); From 52aee14b16bb9fbd0ce05b494bd503f8c5b57a80 Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Wed, 7 Sep 2016 20:06:34 +0800 Subject: [PATCH 05/16] 1 --- .../java/org/apache/flume/source/UDPSource.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index dbdf4e6fdf..ec2bc39e48 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -67,21 +67,21 @@ public class UDPSource extends AbstractSource public static final int DEFAULT_MIN_SIZE = 4096; public static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE; - public class syslogHandler extends SimpleChannelHandler { - private SyslogUtils syslogUtils = new SyslogUtils(DEFAULT_INITIAL_SIZE, null, true); + public class udpHandler extends SimpleChannelHandler { + //private SyslogUtils syslogUtils = new SyslogUtils(DEFAULT_INITIAL_SIZE, null, true); public void setFormater(Map prop) { - syslogUtils.addFormats(prop); + //syslogUtils.addFormats(prop); } public void setKeepFields(Set keepFields) { - syslogUtils.setKeepFields(keepFields); + //syslogUtils.setKeepFields(keepFields); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { try { - byte[] dst = new byte[DEFAULT_MIN_SIZE]; + byte[] dst = new byte[DEFAULT_INITIAL_SIZE]; ChannelBuffer buffer = (ChannelBuffer)mEvent.getMessage(); buffer.getBytes(0, dst); Event e = EventBuilder.withBody(dst); @@ -107,7 +107,7 @@ public void start() { // setup Netty server ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap( new OioDatagramChannelFactory(Executors.newCachedThreadPool())); - final syslogHandler handler = new syslogHandler(); + final udpHandler handler = new udpHandler(); handler.setFormater(formaterProp); handler.setKeepFields(keepFields); serverBootstrap.setOption("receiveBufferSizePredictorFactory", @@ -155,10 +155,12 @@ public void configure(Context context) { host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST); formaterProp = context.getSubProperties( SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX); + /* keepFields = SyslogUtils.chooseFieldsToKeep( context.getString( SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS)); + */ } @VisibleForTesting From 83bb67afefb2dbd8a56cf164e3e5ca7dbe72d0a9 Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Wed, 7 Sep 2016 20:14:15 +0800 Subject: [PATCH 06/16] 1 --- .../src/main/java/org/apache/flume/source/UDPSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index ec2bc39e48..0804e1c340 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -67,7 +67,7 @@ public class UDPSource extends AbstractSource public static final int DEFAULT_MIN_SIZE = 4096; public static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE; - public class udpHandler extends SimpleChannelHandler { + public class UdpHandler extends SimpleChannelHandler { //private SyslogUtils syslogUtils = new SyslogUtils(DEFAULT_INITIAL_SIZE, null, true); public void setFormater(Map prop) { @@ -107,7 +107,7 @@ public void start() { // setup Netty server ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap( new OioDatagramChannelFactory(Executors.newCachedThreadPool())); - final udpHandler handler = new udpHandler(); + final UdpHandler handler = new UdpHandler(); handler.setFormater(formaterProp); handler.setKeepFields(keepFields); serverBootstrap.setOption("receiveBufferSizePredictorFactory", From bd84d470313f451123b30bf99b5c2a39f948f7c2 Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Wed, 7 Sep 2016 20:37:58 +0800 Subject: [PATCH 07/16] 1 --- .../src/main/java/org/apache/flume/source/UDPSource.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index 0804e1c340..ec528ba49b 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -83,6 +83,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { try { byte[] dst = new byte[DEFAULT_INITIAL_SIZE]; ChannelBuffer buffer = (ChannelBuffer)mEvent.getMessage(); + logger.warn("writting data: ", buffer.toString()); buffer.getBytes(0, dst); Event e = EventBuilder.withBody(dst); if (e == null) { From 550bf010c57fb9ef667c05fcb7de1aaa910db0cb Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Wed, 7 Sep 2016 20:43:18 +0800 Subject: [PATCH 08/16] 1 --- .../src/main/java/org/apache/flume/source/UDPSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index ec528ba49b..4c91483c5f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -81,7 +81,7 @@ public void setKeepFields(Set keepFields) { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { try { - byte[] dst = new byte[DEFAULT_INITIAL_SIZE]; + byte[] dst = null; ChannelBuffer buffer = (ChannelBuffer)mEvent.getMessage(); logger.warn("writting data: ", buffer.toString()); buffer.getBytes(0, dst); From aa3fa0e52a6f0b4236b3ce2a2051623e07f54916 Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Wed, 7 Sep 2016 20:52:41 +0800 Subject: [PATCH 09/16] 1 --- .../src/main/java/org/apache/flume/source/UDPSource.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index 4c91483c5f..fe150b738f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -81,9 +81,8 @@ public void setKeepFields(Set keepFields) { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { try { - byte[] dst = null; - ChannelBuffer buffer = (ChannelBuffer)mEvent.getMessage(); - logger.warn("writting data: ", buffer.toString()); + ChannelBuffer buffer = mEvent.getMessage(); + byte[] dst = new byte[buffer.capacity()]; buffer.getBytes(0, dst); Event e = EventBuilder.withBody(dst); if (e == null) { From d9f66970c0a15547f319f066287b3af144d57890 Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Wed, 7 Sep 2016 20:56:36 +0800 Subject: [PATCH 10/16] 1 --- .../src/main/java/org/apache/flume/source/UDPSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index fe150b738f..73a6b30c21 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -81,7 +81,7 @@ public void setKeepFields(Set keepFields) { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { try { - ChannelBuffer buffer = mEvent.getMessage(); + ChannelBuffer buffer = (ChannelBuffer)mEvent.getMessage(); byte[] dst = new byte[buffer.capacity()]; buffer.getBytes(0, dst); Event e = EventBuilder.withBody(dst); From aecb265622dd1c5104f25f908685ea4baae59589 Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Wed, 7 Sep 2016 21:24:04 +0800 Subject: [PATCH 11/16] 1 --- .../org/apache/flume/source/UDPSource.java | 42 ++++++++++++++++++- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index 73a6b30c21..188edb27a4 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; @@ -34,6 +35,7 @@ import org.apache.flume.conf.Configurable; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; +import org.apache.flume.instrumentation.SourceCounter; import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory; @@ -58,6 +60,8 @@ public class UDPSource extends AbstractSource private Channel nettyChannel; private Map formaterProp; private Set keepFields; + private SourceCounter sourceCounter; + private ScheduledExecutorService connectionCountUpdater; private static final Logger logger = LoggerFactory.getLogger(SyslogUDPSource.class); @@ -80,7 +84,11 @@ public void setKeepFields(Set keepFields) { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { + sourceCounter.incrementAppendReceivedCount(); + sourceCounter.incrementEventReceivedCount(); try { + + ChannelBuffer buffer = (ChannelBuffer)mEvent.getMessage(); byte[] dst = new byte[buffer.capacity()]; buffer.getBytes(0, dst); @@ -99,6 +107,10 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { logger.error("Error parsing event from syslog stream, event dropped", ex); return; } + + + sourceCounter.incrementAppendAcceptedCount(); + sourceCounter.incrementEventAcceptedCount(); } } @@ -125,13 +137,22 @@ public ChannelPipeline getPipeline() { } else { nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port)); } - + connectionCountUpdater = Executors.newSingleThreadScheduledExecutor(); + sourceCounter.start(); super.start(); + connectionCountUpdater.scheduleWithFixedDelay(new Runnable() { + + @Override + public void run() { + sourceCounter.setOpenConnectionCount( + Long.valueOf(srv.getNumActiveConnections())); + } + }, 0, 60, TimeUnit.SECONDS); } @Override public void stop() { - logger.info("Syslog UDP Source stopping..."); + logger.info("UDP Source stopping..."); logger.info("Metrics:{}", counterGroup); if (nettyChannel != null) { nettyChannel.close(); @@ -144,7 +165,21 @@ public void stop() { } } + sourceCounter.stop(); + connectionCountUpdater.shutdown(); + while (!connectionCountUpdater.isTerminated()) { + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + logger.error("Interrupted while waiting for connection count executor " + + "to terminate", ex); + Throwables.propagate(ex); + } + } + super.stop(); + logger.info("UDP source {} stopped. Metrics: {}", getName(), + sourceCounter); } @Override @@ -161,6 +196,9 @@ public void configure(Context context) { SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS)); */ + if (sourceCounter == null) { + sourceCounter = new SourceCounter(getName()); + } } @VisibleForTesting From b6f7a0b3cb676f3faac030c35105cc21cce7a66c Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Wed, 7 Sep 2016 21:28:11 +0800 Subject: [PATCH 12/16] 1 --- .../src/main/java/org/apache/flume/source/UDPSource.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index 188edb27a4..d4c126447a 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -36,6 +36,7 @@ import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SourceCounter; +import org.apache.avro.ipc.NettyServer; import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory; @@ -140,6 +141,7 @@ public ChannelPipeline getPipeline() { connectionCountUpdater = Executors.newSingleThreadScheduledExecutor(); sourceCounter.start(); super.start(); + final NettyServer srv = (NettyServer)server; connectionCountUpdater.scheduleWithFixedDelay(new Runnable() { @Override From 44c2ea0f0f87c7c0fc0a15c837eb59296f2575ba Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Wed, 7 Sep 2016 21:30:11 +0800 Subject: [PATCH 13/16] 1 --- .../org/apache/flume/source/UDPSource.java | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index d4c126447a..f66190b5fa 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -36,7 +36,6 @@ import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SourceCounter; -import org.apache.avro.ipc.NettyServer; import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory; @@ -62,7 +61,6 @@ public class UDPSource extends AbstractSource private Map formaterProp; private Set keepFields; private SourceCounter sourceCounter; - private ScheduledExecutorService connectionCountUpdater; private static final Logger logger = LoggerFactory.getLogger(SyslogUDPSource.class); @@ -138,18 +136,8 @@ public ChannelPipeline getPipeline() { } else { nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port)); } - connectionCountUpdater = Executors.newSingleThreadScheduledExecutor(); sourceCounter.start(); super.start(); - final NettyServer srv = (NettyServer)server; - connectionCountUpdater.scheduleWithFixedDelay(new Runnable() { - - @Override - public void run() { - sourceCounter.setOpenConnectionCount( - Long.valueOf(srv.getNumActiveConnections())); - } - }, 0, 60, TimeUnit.SECONDS); } @Override @@ -168,20 +156,8 @@ public void stop() { } sourceCounter.stop(); - connectionCountUpdater.shutdown(); - while (!connectionCountUpdater.isTerminated()) { - try { - Thread.sleep(100); - } catch (InterruptedException ex) { - logger.error("Interrupted while waiting for connection count executor " - + "to terminate", ex); - Throwables.propagate(ex); - } - } super.stop(); - logger.info("UDP source {} stopped. Metrics: {}", getName(), - sourceCounter); } @Override From e52335e90fe162b190956b78d77f2271f75494ea Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Fri, 9 Sep 2016 15:44:45 +0800 Subject: [PATCH 14/16] 1 --- .../src/main/java/org/apache/flume/source/UDPSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index f66190b5fa..52209886ac 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -55,7 +55,7 @@ public class UDPSource extends AbstractSource implements EventDrivenSource, Configurable { private int port; - private int maxsize = 1 << 16; // 64k is max allowable in RFC 5426 + private int maxsize = 1 << 100; // 64k is max allowable in RFC 5426 private String host = null; private Channel nettyChannel; private Map formaterProp; From e12c64bb0b3177060f054b91c437b9ea5f258d1d Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Fri, 9 Sep 2016 16:01:34 +0800 Subject: [PATCH 15/16] 1 --- .../src/main/java/org/apache/flume/source/UDPSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index 52209886ac..b74ebb1e99 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -55,7 +55,7 @@ public class UDPSource extends AbstractSource implements EventDrivenSource, Configurable { private int port; - private int maxsize = 1 << 100; // 64k is max allowable in RFC 5426 + private int maxsize = 1048576; // 64k is max allowable in RFC 5426 private String host = null; private Channel nettyChannel; private Map formaterProp; From 3a7d09d2ccf381b5f170cee0039ddc8b15d63101 Mon Sep 17 00:00:00 2001 From: chenwei5 Date: Fri, 9 Sep 2016 16:10:36 +0800 Subject: [PATCH 16/16] 1 --- .../src/main/java/org/apache/flume/source/UDPSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java index b74ebb1e99..33114772c6 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/UDPSource.java @@ -68,7 +68,7 @@ public class UDPSource extends AbstractSource // Default Min size public static final int DEFAULT_MIN_SIZE = 4096; - public static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE; + public static final int DEFAULT_INITIAL_SIZE = maxsize; public class UdpHandler extends SimpleChannelHandler { //private SyslogUtils syslogUtils = new SyslogUtils(DEFAULT_INITIAL_SIZE, null, true);