Skip to content

Commit

Permalink
FLUME-2151. Windows: Update TestExecSource to use native commands on …
Browse files Browse the repository at this point in the history
…Windows

(Roshan Naik via Hari Shreedharan)
  • Loading branch information
harishreedharan committed Mar 12, 2014
1 parent 96b090b commit 1f21df7
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 49 deletions.
171 changes: 122 additions & 49 deletions flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
Expand Up @@ -34,6 +34,8 @@
import javax.management.ObjectName;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.flume.Channel;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
Expand Down Expand Up @@ -94,20 +96,38 @@ public void tearDown() {
public void testProcess() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {

context.put("command", "cat /etc/passwd");
// Generates a random files for input\output
File inputFile = File.createTempFile("input", null);
File ouputFile = File.createTempFile("ouput", null);
FileUtils.forceDeleteOnExit(inputFile);
FileUtils.forceDeleteOnExit(ouputFile);

// Generates input file with a random data set (10 lines, 200 characters each)
FileOutputStream outputStream1 = new FileOutputStream(inputFile);
for (int i=0; i<10; i++) {
outputStream1.write(
RandomStringUtils.randomAlphanumeric(200).getBytes());
outputStream1.write('\n');
}
outputStream1.close();

String command = SystemUtils.IS_OS_WINDOWS ?
String.format("cmd /c type %s", inputFile.getAbsolutePath()) :
String.format("cat %s", inputFile.getAbsolutePath());
context.put("command", command);
context.put("keep-alive", "1");
context.put("capacity", "1000");
context.put("transactionCapacity", "1000");
Configurables.configure(source, context);

source.start();
Thread.sleep(2000);
Transaction transaction = channel.getTransaction();

transaction.begin();
Event event;

FileOutputStream outputStream = new FileOutputStream(
"/tmp/flume-execsource." + Thread.currentThread().getId());
FileOutputStream outputStream = new FileOutputStream(ouputFile);

while ((event = channel.take()) != null) {
outputStream.write(event.getBody());
Expand All @@ -118,69 +138,101 @@ public void testProcess() throws InterruptedException, LifecycleException,
transaction.commit();
transaction.close();

File file1 = new File("/tmp/flume-execsource."
+ Thread.currentThread().getId());
File file2 = new File("/etc/passwd");
Assert.assertEquals(FileUtils.checksumCRC32(file1),
FileUtils.checksumCRC32(file2));
FileUtils.forceDelete(file1);
Assert.assertEquals(FileUtils.checksumCRC32(inputFile),
FileUtils.checksumCRC32(ouputFile));
}

@Test
public void testShellCommandSimple() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
runTestShellCmdHelper("/bin/bash -c", "seq 5"
, new String[]{"1","2","3","4","5" } );
EventDeliveryException, IOException {
if (SystemUtils.IS_OS_WINDOWS) {
runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command",
"1..5", new String[]{"1", "2", "3", "4", "5"});
} else {
runTestShellCmdHelper("/bin/bash -c", "seq 5",
new String[]{"1", "2", "3", "4", "5"});
}
}

@Test
public void testShellCommandBackTicks() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
public void testShellCommandBackTicks()
throws InterruptedException, LifecycleException, EventDeliveryException,
IOException {
// command with backticks
runTestShellCmdHelper("/bin/bash -c", "echo `seq 5`" , new String[]{"1 2 3 4 5" } );
runTestShellCmdHelper("/bin/bash -c", "echo $(seq 5)" , new String[]{"1 2 3 4 5" } );
if (SystemUtils.IS_OS_WINDOWS) {
runTestShellCmdHelper(
"powershell -ExecutionPolicy Unrestricted -command", "$(1..5)",
new String[]{"1", "2", "3", "4", "5"});
} else {
runTestShellCmdHelper("/bin/bash -c", "echo `seq 5`",
new String[]{"1 2 3 4 5"});
runTestShellCmdHelper("/bin/bash -c", "echo $(seq 5)",
new String[]{"1 2 3 4 5"});
}
}

@Test
public void testShellCommandComplex() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
public void testShellCommandComplex()
throws InterruptedException, LifecycleException, EventDeliveryException,
IOException {
// command with wildcards & pipes
String[] expected = {"1234", "abcd", "ijk", "xyz", "zzz"};

// pipes
runTestShellCmdHelper("/bin/bash -c", "echo zzz 1234 xyz abcd ijk | xargs -n1 echo | sort -f"
, expected );
if (SystemUtils.IS_OS_WINDOWS) {
runTestShellCmdHelper(
"powershell -ExecutionPolicy Unrestricted -command",
"'zzz','1234','xyz','abcd','ijk' | sort", expected);
} else {
runTestShellCmdHelper("/bin/bash -c",
"echo zzz 1234 xyz abcd ijk | xargs -n1 echo | sort -f", expected);
}
}

@Test
public void testShellCommandScript() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
public void testShellCommandScript()
throws InterruptedException, LifecycleException, EventDeliveryException,
IOException {
// mini script
runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done"
, new String[]{"1","2","3","4","5" } );
// shell arithmetic
runTestShellCmdHelper("/bin/bash -c", "if ((2+2>3)); then echo good; else echo not good; fi" , new String[]{"good"} );
if (SystemUtils.IS_OS_WINDOWS) {
runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command",
"foreach ($i in 1..5) { $i }", new String[]{"1", "2", "3", "4", "5"});
// shell arithmetic
runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command",
"if(2+2 -gt 3) { 'good' } else { 'not good' } ", new String[]{"good"});
} else {
runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done"
, new String[]{"1", "2", "3", "4", "5"});
// shell arithmetic
runTestShellCmdHelper("/bin/bash -c", "if ((2+2>3)); " +
"then echo good; else echo not good; fi", new String[]{"good"});
}
}

@Test
public void testShellCommandEmbeddingAndEscaping() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
System.out.println( "######### PWD = " + new java.io.File( "." ).getCanonicalPath() );
public void testShellCommandEmbeddingAndEscaping()
throws InterruptedException, LifecycleException, EventDeliveryException,
IOException {
// mini script
BufferedReader reader = new BufferedReader(new FileReader("src/test/resources/test_command.txt") );
String fileName = SystemUtils.IS_OS_WINDOWS ?
"src\\test\\resources\\test_command.ps1" :
"src/test/resources/test_command.txt";
BufferedReader reader = new BufferedReader(new FileReader(fileName));
try {
String shell = SystemUtils.IS_OS_WINDOWS ?
"powershell -ExecutionPolicy Unrestricted -command" :
"/bin/bash -c";
String command1 = reader.readLine();
Assert.assertNotNull(command1);
String[] output1 = new String[] {"'1'", "\"2\"", "\\3", "\\4"};
runTestShellCmdHelper("/bin/bash -c", command1 , output1);
runTestShellCmdHelper( shell, command1 , output1);
String command2 = reader.readLine();
Assert.assertNotNull(command2);
String[] output2 = new String[]{"1","2","3","4","5" };
runTestShellCmdHelper("/bin/bash -c", command2 , output2);
runTestShellCmdHelper(shell, command2 , output2);
String command3 = reader.readLine();
Assert.assertNotNull(command3);
String[] output3 = new String[]{"2","3","4","5","6" };
runTestShellCmdHelper("/bin/bash -c", command3 , output3);
runTestShellCmdHelper(shell, command3 , output3);
} finally {
reader.close();
}
Expand All @@ -190,8 +242,14 @@ public void testShellCommandEmbeddingAndEscaping() throws InterruptedException,
public void testMonitoredCounterGroup() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
// mini script
runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done"
, new String[]{"1","2","3","4","5" } );
if (SystemUtils.IS_OS_WINDOWS) {
runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command",
"foreach ($i in 1..5) { $i }"
, new String[]{"1", "2", "3", "4", "5"});
} else {
runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done"
, new String[]{"1", "2", "3", "4", "5"});
}

ObjectName objName = null;

Expand Down Expand Up @@ -237,8 +295,13 @@ public void testBatchTimeout() throws InterruptedException, LifecycleException,

context.put(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE, "50000");
context.put(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, "750");
context.put("shell", "/bin/bash -c");
context.put("command", "tail -f " + filePath);
context.put("shell", SystemUtils.IS_OS_WINDOWS ?
"powershell -ExecutionPolicy Unrestricted -command" :
"/bin/bash -c");
context.put("command", SystemUtils.IS_OS_WINDOWS ?
"Get-Content " + filePath +
" | Select-Object -Last 10" :
("tail -f " + filePath));

Configurables.configure(source, context);
source.start();
Expand Down Expand Up @@ -277,6 +340,8 @@ private void runTestShellCmdHelper(String shell, String command, String[] expect
source.start();
File outputFile = File.createTempFile("flumeExecSourceTest_", "");
FileOutputStream outputStream = new FileOutputStream(outputFile);
if(SystemUtils.IS_OS_WINDOWS)
Thread.sleep(2500);
Transaction transaction = channel.getTransaction();
transaction.begin();
try {
Expand All @@ -288,10 +353,10 @@ private void runTestShellCmdHelper(String shell, String command, String[] expect
outputStream.close();
transaction.commit();
List<String> output = Files.readLines(outputFile, Charset.defaultCharset());
System.out.println("command : " + command);
System.out.println("output : ");
for( String line : output )
System.out.println();
// System.out.println("command : " + command);
// System.out.println("output : ");
// for( String line : output )
// System.out.println(line);
Assert.assertArrayEquals(expectedOutput, output.toArray(new String[]{}));
} finally {
FileUtils.forceDelete(outputFile);
Expand All @@ -308,7 +373,8 @@ public void testRestart() throws InterruptedException, LifecycleException,
context.put(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, "10");
context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "true");

context.put("command", "echo flume");
context.put("command",
SystemUtils.IS_OS_WINDOWS ? "cmd /c echo flume" : "echo flume");
Configurables.configure(source, context);

source.start();
Expand Down Expand Up @@ -348,12 +414,17 @@ public void testShutdown() throws Exception {

// now find one that is not in use
boolean searchForCommand = true;
while(searchForCommand) {
while (searchForCommand) {
searchForCommand = false;
String command = "sleep " + seconds;
Pattern pattern = Pattern.compile("\b" + command + "\b");
for(String line : exec("ps -ef")) {
if(pattern.matcher(line).find()) {
String command = SystemUtils.IS_OS_WINDOWS ? ("cmd /c sleep " + seconds) :
("sleep " + seconds);
String searchTxt = SystemUtils.IS_OS_WINDOWS ? ("sleep.exe") :
("\b" + command + "\b");
Pattern pattern = Pattern.compile(searchTxt);
for (String line : exec(SystemUtils.IS_OS_WINDOWS ?
"cmd /c tasklist /FI \"SESSIONNAME eq Console\"" :
"ps -ef")) {
if (pattern.matcher(line).find()) {
seconds++;
searchForCommand = true;
break;
Expand All @@ -376,7 +447,9 @@ public void testShutdown() throws Exception {
Thread.sleep(1000L);
source.stop();
Thread.sleep(1000L);
for(String line : exec("ps -ef")) {
for (String line : exec(SystemUtils.IS_OS_WINDOWS ?
"cmd /c tasklist /FI \"SESSIONNAME eq Console\"" :
"ps -ef")) {
if(pattern.matcher(line).find()) {
Assert.fail("Found [" + line + "]");
}
Expand Down
3 changes: 3 additions & 0 deletions flume-ng-core/src/test/resources/test_command.ps1
@@ -0,0 +1,3 @@
" \"'1'\", \"\"\"2\"\"\",\"`\3\", \"\4\" "
foreach ($i in 1..5) { $i }
1..5 | %{$_ +1 }

0 comments on commit 1f21df7

Please sign in to comment.