Skip to content
Permalink
Browse files
FLUME-2613. Add support in FileChannelIntegrityTool to remove invalid events from the channel.

(Ashish Paliwal via Hari)
  • Loading branch information
harishreedharan committed Apr 7, 2015
1 parent cfefda1 commit 91ec5794589bf3711cca2a251a511fa360e5ac30
Showing 5 changed files with 306 additions and 2 deletions.
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flume.channel.file;

import org.apache.flume.Event;

/**
 * Static helpers for working with file-channel transaction records.
 */
public class EventUtils {

  /**
   * Extracts the {@link Event} carried by a transaction record.
   *
   * <p>Only {@code Put} records are unwrapped. Any other record type, and a
   * {@code null} argument, yields {@code null}.
   *
   * @param transactionEventRecord the record to inspect
   * @return the wrapped Event when the record is a {@code Put},
   *         {@code null} otherwise
   */
  public static Event getEventFromTransactionEvent(TransactionEventRecord transactionEventRecord) {
    if (!(transactionEventRecord instanceof Put)) {
      return null;
    }
    Put putRecord = (Put) transactionEventRecord;
    return putRecord.getEvent();
  }
}
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flume.channel.file;

import junit.framework.Assert;
import org.apache.flume.Event;
import org.junit.Test;

/**
 * Unit tests for {@link EventUtils#getEventFromTransactionEvent}.
 */
public class TestEventUtils {

  /** A Put record must yield a non-null event whose body keeps its length. */
  @Test
  public void testPutEvent() {
    byte[] body = new byte[5];
    Put putRecord = new Put(1L, 1L, new FlumeEvent(null, body));

    Event extracted = EventUtils.getEventFromTransactionEvent(putRecord);

    Assert.assertNotNull(extracted);
    Assert.assertEquals(5, extracted.getBody().length);
  }

  /** A record that is not a Put (here a Take) must yield null. */
  @Test
  public void testInvalidEvent() {
    Take takeRecord = new Take(1L, 1L);

    Event extracted = EventUtils.getEventFromTransactionEvent(takeRecord);

    Assert.assertNull(extracted);
  }

}
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flume.tools;

import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;

/**
 * Contract for pluggable, application-specific checks on Flume events.
 */
public interface EventValidator {

  /**
   * Validator that accepts every event it is given.
   */
  EventValidator NOOP_VALIDATOR = new EventValidator() {
    @Override
    public boolean validateEvent(Event event) {
      return true;
    }
  };

  /**
   * Decides whether an event is acceptable according to application rules.
   *
   * @param event the Flume event to check
   * @return {@code true} if the event is valid, {@code false} otherwise
   */
  boolean validateEvent(Event event);

  /**
   * Configurable factory for {@link EventValidator} instances.
   */
  interface Builder extends Configurable {

    /**
     * @return the validator produced by this builder
     */
    EventValidator build();
  }
}
@@ -22,15 +22,21 @@
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.file.CorruptEventException;
import org.apache.flume.channel.file.EventUtils;
import org.apache.flume.channel.file.Log;
import org.apache.flume.channel.file.LogFile;
import org.apache.flume.channel.file.LogFileV3;
import org.apache.flume.channel.file.LogRecord;
import org.apache.flume.channel.file.Serialization;
import org.apache.flume.channel.file.TransactionEventRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -39,13 +45,24 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class FileChannelIntegrityTool implements FlumeTool {
public static final Logger LOG = LoggerFactory.getLogger
(FileChannelIntegrityTool.class);

private final List<File> dataDirs = new ArrayList<File>();

private EventValidator eventValidator = EventValidator.NOOP_VALIDATOR;

private long totalPutEvents;
private long invalidEvents;
private long eventsWithException;
private long corruptEvents;
private long validEvents;
private long totalChannelEvents;

@Override
public void run(String[] args) throws IOException, ParseException {
boolean shouldContinue = parseCommandLineOpts(args);
@@ -85,12 +102,39 @@ public boolean accept(File dir, String name) {
// this will throw a CorruptEventException - so the real logic
// is in the catch block below.
LogRecord record = reader.next();
if (record != null) {
  // Count only records that were actually read. The null returned at the
  // end of a data file is not an event and must not inflate the total.
  totalChannelEvents++;
  TransactionEventRecord recordEvent = record.getEvent();
  Event event = EventUtils.getEventFromTransactionEvent(recordEvent);
  if (event != null) {
    totalPutEvents++;
    boolean valid = false;
    boolean validatorFailed = false;
    // Only the validator call is guarded: a validator that throws is
    // handled like one that rejected the event. I/O failures from the
    // backup or the rewrite below are deliberately NOT swallowed, so a
    // record is never rewritten after a failed backup.
    try {
      valid = eventValidator.validateEvent(event);
    } catch (Exception e) {
      validatorFailed = true;
      System.err.println("Encountered Exception while validating event, marking as invalid");
      LOG.warn("Validator threw while checking event in " + dataFile.toString()
          + " at " + eventPosition, e);
    }
    if (valid) {
      validEvents++;
    } else {
      // Keep an untouched copy of the data file before the first
      // modification, whether the event was rejected or the validator
      // failed.
      if (!fileBackedup) {
        Serialization.copyFile(dataFile, new File(dataFile.getParent(),
            dataFile.getName() + ".bak"));
        fileBackedup = true;
      }
      updater.markRecordAsNoop(eventPosition);
      if (validatorFailed) {
        eventsWithException++;
      } else {
        invalidEvents++;
      }
    }
  }
} else {
  fileDone = true;
}
} catch (CorruptEventException e) {
corruptEvents++;
totalChannelEvents++;
LOG.warn("Corruption found in " + dataFile.toString() + " at "
+ eventPosition);
if (!fileBackedup) {
@@ -106,14 +150,25 @@ public boolean accept(File dir, String name) {
}
}
}
printSummary();
}

private boolean parseCommandLineOpts(String[] args) throws ParseException {
Options options = new Options();
options
.addOption("l", "dataDirs", true, "Comma-separated list of data " +
"directories which the tool must verify. This option is mandatory")
.addOption("h", "help", false, "Display help");
.addOption("h", "help", false, "Display help")
.addOption("e", "eventValidator", true, "Fully Qualified Name of Event Validator Implementation");;


Option property = OptionBuilder.withArgName("property=value")
.hasArgs(2)
.withValueSeparator()
.withDescription( "custom properties" )
.create( "D" );

options.addOption(property);

CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
@@ -137,6 +192,46 @@ private boolean parseCommandLineOpts(String[] args) throws ParseException {
dataDirs.add(f);
}
}

if(commandLine.hasOption("eventValidator")) {
try {
Class<? extends EventValidator.Builder> eventValidatorClassName =
(Class<? extends EventValidator.Builder>)Class.forName(
commandLine.getOptionValue("eventValidator"));
EventValidator.Builder eventValidatorBuilder = eventValidatorClassName.newInstance();

// Pass on the configuration parameter
Properties systemProperties = commandLine.getOptionProperties("D");
Context context = new Context();

Set<String> keys = systemProperties.stringPropertyNames();
for (String key : keys) {
context.put(key, systemProperties.getProperty(key));
}
eventValidatorBuilder.configure(context);
eventValidator = eventValidatorBuilder.build();
} catch (Exception e) {
System.err.println(String.format("Could find class %s in lib folder",
commandLine.getOptionValue("eventValidator")));
e.printStackTrace();
return false;
}
}
return true;
}

/**
 * Prints the counters gathered during the run to standard output, one per
 * line: events counted in the channel, Put events processed, valid Put
 * events, invalid Put events, Put events whose validation threw an
 * exception, and corrupt events.
 *
 * <p>Reporting is read-only: no counter is modified by this method.
 */
private void printSummary() {
  System.out.println("---------- Summary --------------------");
  // Print the value as-is; a post-increment here would silently change the
  // counter as a side effect of reporting it.
  System.out.println("Number of Events in the Channel = " + totalChannelEvents);
  System.out.println("Number of Put Events Processed = " + totalPutEvents);
  System.out.println("Number of Valid Put Events = " + validEvents);
  System.out.println("Number of Invalid Put Events = " + invalidEvents);
  System.out.println("Number of Put Events that threw Exception during validation = "
      + eventsWithException);
  System.out.println("Number of Corrupt Events = " + corruptEvents);
  System.out.println("---------------------------------------");
}
}
@@ -58,6 +58,7 @@ public class TestFileChannelIntegrityTool {
private File checkpointDir;
private File dataDir;

private static int invalidEvent = 0;

@BeforeClass
public static void setUpClass() throws Exception{
@@ -96,6 +97,45 @@ public static void tearDownClass() throws Exception {
FileUtils.deleteDirectory(origDataDir);
}

/** Repairs invalid events and verifies the channel using a recreated checkpoint directory. */
@Test
public void testFixInvalidRecords() throws Exception {
  String validatorBuilderName = DummyEventVerifier.Builder.class.getName();
  doTestFixInvalidEvents(false, validatorBuilderName);
}
/** Repairs invalid events and verifies the channel using the original checkpoint directory. */
@Test
public void testFixInvalidRecordsWithCheckpoint() throws Exception {
  String validatorBuilderName = DummyEventVerifier.Builder.class.getName();
  doTestFixInvalidEvents(true, validatorBuilderName);
}

/**
 * Runs the integrity tool over the data directory with the given validator
 * builder, then reopens the channel and checks that the number of events
 * that can still be taken equals 25 minus the number of invalid events
 * written by the fixture.
 *
 * @param withCheckpoint if {@code true} the channel is started against
 *        {@code origCheckpointDir}; otherwise {@code checkpointDir} is
 *        deleted and recreated empty before the channel starts
 * @param eventHandler class name handed to the tool's {@code -e} option
 * @throws Exception if the tool or the channel fails
 */
public void doTestFixInvalidEvents(boolean withCheckpoint, String eventHandler) throws Exception {
  String[] toolArgs = {"-l", dataDir.toString(), "-e", eventHandler, "-DvalidatorValue=0"};
  FileChannelIntegrityTool integrityTool = new FileChannelIntegrityTool();
  integrityTool.run(toolArgs);

  FileChannel channel = new FileChannel();
  channel.setName("channel");

  // Without a checkpoint the channel starts from an empty checkpoint directory.
  if (!withCheckpoint) {
    FileUtils.deleteDirectory(checkpointDir);
    Assert.assertTrue(checkpointDir.mkdirs());
  }
  String checkpointPath = withCheckpoint
      ? origCheckpointDir.toString()
      : checkpointDir.toString();

  ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointPath);
  ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString());
  channel.configure(ctx);
  channel.start();

  // Drain the channel inside one transaction, counting what is left.
  Transaction tx = channel.getTransaction();
  tx.begin();
  int remainingEvents = 0;
  while (channel.take() != null) {
    remainingEvents++;
  }
  tx.commit();
  tx.close();
  channel.stop();

  Assert.assertTrue(invalidEvent != 0);
  Assert.assertEquals(25 - invalidEvent, remainingEvents);
}

@Test
public void testFixCorruptRecords() throws Exception {
doTestFixCorruptEvents(false);
@@ -226,6 +266,12 @@ private static void createDataFiles() throws Exception {
Transaction tx = channel.getTransaction();
tx.begin();
for (int i = 0; i < 5; i++) {
if(i % 3 == 0) {
event.getBody()[0] = 0;
invalidEvent++;
} else {
event.getBody()[0] = 1;
}
channel.put(event);
}
tx.commit();
@@ -244,4 +290,33 @@ private static void createDataFiles() throws Exception {
.invoke(true));
channel.stop();
}

/**
 * Test validator that rejects every event whose first body byte equals a
 * configured value and accepts all others.
 */
public static class DummyEventVerifier implements EventValidator {

  /** Value of the first body byte that marks an event as invalid. */
  private final int value;

  private DummyEventVerifier(int val) {
    value = val;
  }

  /**
   * @param event event to check; its body is read at index 0
   * @return {@code true} when the first body byte differs from the
   *         configured value
   */
  @Override
  public boolean validateEvent(Event event) {
    return event.getBody()[0] != value;
  }

  /**
   * Builder that reads the rejected byte value from the
   * {@code validatorValue} configuration key.
   */
  public static class Builder implements EventValidator.Builder {

    private int binaryValidator = 0;

    @Override
    public EventValidator build() {
      return new DummyEventVerifier(binaryValidator);
    }

    /**
     * Reads {@code validatorValue} from the context. A missing key falls
     * back to 0 instead of failing with a NullPointerException when the
     * absent value is unboxed.
     */
    @Override
    public void configure(Context context) {
      binaryValidator = context.getInteger("validatorValue", 0);
    }
  }
}
}

0 comments on commit 91ec579

Please sign in to comment.