diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java index 54f71a1516..289c2d2ce3 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java @@ -34,6 +34,8 @@ import javax.management.ObjectName; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang.SystemUtils; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; @@ -94,20 +96,38 @@ public void tearDown() { public void testProcess() throws InterruptedException, LifecycleException, EventDeliveryException, IOException { - context.put("command", "cat /etc/passwd"); + // Generates a random files for input\output + File inputFile = File.createTempFile("input", null); + File ouputFile = File.createTempFile("ouput", null); + FileUtils.forceDeleteOnExit(inputFile); + FileUtils.forceDeleteOnExit(ouputFile); + + // Generates input file with a random data set (10 lines, 200 characters each) + FileOutputStream outputStream1 = new FileOutputStream(inputFile); + for (int i=0; i<10; i++) { + outputStream1.write( + RandomStringUtils.randomAlphanumeric(200).getBytes()); + outputStream1.write('\n'); + } + outputStream1.close(); + + String command = SystemUtils.IS_OS_WINDOWS ? + String.format("cmd /c type %s", inputFile.getAbsolutePath()) : + String.format("cat %s", inputFile.getAbsolutePath()); + context.put("command", command); context.put("keep-alive", "1"); context.put("capacity", "1000"); context.put("transactionCapacity", "1000"); Configurables.configure(source, context); source.start(); + Thread.sleep(2000); Transaction transaction = channel.getTransaction(); transaction.begin(); Event event; - FileOutputStream outputStream = new FileOutputStream( - "/tmp/flume-execsource." + Thread.currentThread().getId()); + FileOutputStream outputStream = new FileOutputStream(ouputFile); while ((event = channel.take()) != null) { outputStream.write(event.getBody()); @@ -118,69 +138,101 @@ public void testProcess() throws InterruptedException, LifecycleException, transaction.commit(); transaction.close(); - File file1 = new File("/tmp/flume-execsource." - + Thread.currentThread().getId()); - File file2 = new File("/etc/passwd"); - Assert.assertEquals(FileUtils.checksumCRC32(file1), - FileUtils.checksumCRC32(file2)); - FileUtils.forceDelete(file1); + Assert.assertEquals(FileUtils.checksumCRC32(inputFile), + FileUtils.checksumCRC32(ouputFile)); } @Test public void testShellCommandSimple() throws InterruptedException, LifecycleException, - EventDeliveryException, IOException { - runTestShellCmdHelper("/bin/bash -c", "seq 5" - , new String[]{"1","2","3","4","5" } ); + EventDeliveryException, IOException { + if (SystemUtils.IS_OS_WINDOWS) { + runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command", + "1..5", new String[]{"1", "2", "3", "4", "5"}); + } else { + runTestShellCmdHelper("/bin/bash -c", "seq 5", + new String[]{"1", "2", "3", "4", "5"}); + } } @Test - public void testShellCommandBackTicks() throws InterruptedException, LifecycleException, - EventDeliveryException, IOException { + public void testShellCommandBackTicks() + throws InterruptedException, LifecycleException, EventDeliveryException, + IOException { // command with backticks - runTestShellCmdHelper("/bin/bash -c", "echo `seq 5`" , new String[]{"1 2 3 4 5" } ); - runTestShellCmdHelper("/bin/bash -c", "echo $(seq 5)" , new String[]{"1 2 3 4 5" } ); + if (SystemUtils.IS_OS_WINDOWS) { + runTestShellCmdHelper( + "powershell -ExecutionPolicy Unrestricted -command", "$(1..5)", + new String[]{"1", "2", "3", "4", "5"}); + } else { + runTestShellCmdHelper("/bin/bash -c", "echo `seq 5`", + new String[]{"1 2 3 4 5"}); + runTestShellCmdHelper("/bin/bash -c", "echo $(seq 5)", + new String[]{"1 2 3 4 5"}); + } } @Test - public void testShellCommandComplex() throws InterruptedException, LifecycleException, - EventDeliveryException, IOException { + public void testShellCommandComplex() + throws InterruptedException, LifecycleException, EventDeliveryException, + IOException { // command with wildcards & pipes String[] expected = {"1234", "abcd", "ijk", "xyz", "zzz"}; - // pipes - runTestShellCmdHelper("/bin/bash -c", "echo zzz 1234 xyz abcd ijk | xargs -n1 echo | sort -f" - , expected ); + if (SystemUtils.IS_OS_WINDOWS) { + runTestShellCmdHelper( + "powershell -ExecutionPolicy Unrestricted -command", + "'zzz','1234','xyz','abcd','ijk' | sort", expected); + } else { + runTestShellCmdHelper("/bin/bash -c", + "echo zzz 1234 xyz abcd ijk | xargs -n1 echo | sort -f", expected); + } } @Test - public void testShellCommandScript() throws InterruptedException, LifecycleException, - EventDeliveryException, IOException { + public void testShellCommandScript() + throws InterruptedException, LifecycleException, EventDeliveryException, + IOException { // mini script - runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done" - , new String[]{"1","2","3","4","5" } ); - // shell arithmetic - runTestShellCmdHelper("/bin/bash -c", "if ((2+2>3)); then echo good; else echo not good; fi" , new String[]{"good"} ); + if (SystemUtils.IS_OS_WINDOWS) { + runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command", + "foreach ($i in 1..5) { $i }", new String[]{"1", "2", "3", "4", "5"}); + // shell arithmetic + runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command", + "if(2+2 -gt 3) { 'good' } else { 'not good' } ", new String[]{"good"}); + } else { + runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done" + , new String[]{"1", "2", "3", "4", "5"}); + // shell arithmetic + runTestShellCmdHelper("/bin/bash -c", "if ((2+2>3)); " + + "then echo good; else echo not good; fi", new String[]{"good"}); + } } @Test - public void testShellCommandEmbeddingAndEscaping() throws InterruptedException, LifecycleException, - EventDeliveryException, IOException { - System.out.println( "######### PWD = " + new java.io.File( "." ).getCanonicalPath() ); + public void testShellCommandEmbeddingAndEscaping() + throws InterruptedException, LifecycleException, EventDeliveryException, + IOException { // mini script - BufferedReader reader = new BufferedReader(new FileReader("src/test/resources/test_command.txt") ); + String fileName = SystemUtils.IS_OS_WINDOWS ? + "src\\test\\resources\\test_command.ps1" : + "src/test/resources/test_command.txt"; + BufferedReader reader = new BufferedReader(new FileReader(fileName)); try { + String shell = SystemUtils.IS_OS_WINDOWS ? + "powershell -ExecutionPolicy Unrestricted -command" : + "/bin/bash -c"; String command1 = reader.readLine(); Assert.assertNotNull(command1); String[] output1 = new String[] {"'1'", "\"2\"", "\\3", "\\4"}; - runTestShellCmdHelper("/bin/bash -c", command1 , output1); + runTestShellCmdHelper( shell, command1 , output1); String command2 = reader.readLine(); Assert.assertNotNull(command2); String[] output2 = new String[]{"1","2","3","4","5" }; - runTestShellCmdHelper("/bin/bash -c", command2 , output2); + runTestShellCmdHelper(shell, command2 , output2); String command3 = reader.readLine(); Assert.assertNotNull(command3); String[] output3 = new String[]{"2","3","4","5","6" }; - runTestShellCmdHelper("/bin/bash -c", command3 , output3); + runTestShellCmdHelper(shell, command3 , output3); } finally { reader.close(); } @@ -190,8 +242,14 @@ public void testShellCommandEmbeddingAndEscaping() throws InterruptedException, public void testMonitoredCounterGroup() throws InterruptedException, LifecycleException, EventDeliveryException, IOException { // mini script - runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done" - , new String[]{"1","2","3","4","5" } ); + if (SystemUtils.IS_OS_WINDOWS) { + runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command", + "foreach ($i in 1..5) { $i }" + , new String[]{"1", "2", "3", "4", "5"}); + } else { + runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done" + , new String[]{"1", "2", "3", "4", "5"}); + } ObjectName objName = null; @@ -237,8 +295,13 @@ public void testBatchTimeout() throws InterruptedException, LifecycleException, context.put(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE, "50000"); context.put(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, "750"); - context.put("shell", "/bin/bash -c"); - context.put("command", "tail -f " + filePath); + context.put("shell", SystemUtils.IS_OS_WINDOWS ? + "powershell -ExecutionPolicy Unrestricted -command" : + "/bin/bash -c"); + context.put("command", SystemUtils.IS_OS_WINDOWS ? + "Get-Content " + filePath + + " | Select-Object -Last 10" : + ("tail -f " + filePath)); Configurables.configure(source, context); source.start(); @@ -277,6 +340,8 @@ private void runTestShellCmdHelper(String shell, String command, String[] expect source.start(); File outputFile = File.createTempFile("flumeExecSourceTest_", ""); FileOutputStream outputStream = new FileOutputStream(outputFile); + if(SystemUtils.IS_OS_WINDOWS) + Thread.sleep(2500); Transaction transaction = channel.getTransaction(); transaction.begin(); try { @@ -288,10 +353,10 @@ private void runTestShellCmdHelper(String shell, String command, String[] expect outputStream.close(); transaction.commit(); List output = Files.readLines(outputFile, Charset.defaultCharset()); - System.out.println("command : " + command); - System.out.println("output : "); - for( String line : output ) - System.out.println(); +// System.out.println("command : " + command); +// System.out.println("output : "); +// for( String line : output ) +// System.out.println(line); Assert.assertArrayEquals(expectedOutput, output.toArray(new String[]{})); } finally { FileUtils.forceDelete(outputFile); @@ -308,7 +373,8 @@ public void testRestart() throws InterruptedException, LifecycleException, context.put(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, "10"); context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "true"); - context.put("command", "echo flume"); + context.put("command", + SystemUtils.IS_OS_WINDOWS ? "cmd /c echo flume" : "echo flume"); Configurables.configure(source, context); source.start(); @@ -348,12 +414,17 @@ public void testShutdown() throws Exception { // now find one that is not in use boolean searchForCommand = true; - while(searchForCommand) { + while (searchForCommand) { searchForCommand = false; - String command = "sleep " + seconds; - Pattern pattern = Pattern.compile("\b" + command + "\b"); - for(String line : exec("ps -ef")) { - if(pattern.matcher(line).find()) { + String command = SystemUtils.IS_OS_WINDOWS ? ("cmd /c sleep " + seconds) : + ("sleep " + seconds); + String searchTxt = SystemUtils.IS_OS_WINDOWS ? ("sleep.exe") : + ("\b" + command + "\b"); + Pattern pattern = Pattern.compile(searchTxt); + for (String line : exec(SystemUtils.IS_OS_WINDOWS ? + "cmd /c tasklist /FI \"SESSIONNAME eq Console\"" : + "ps -ef")) { + if (pattern.matcher(line).find()) { seconds++; searchForCommand = true; break; @@ -376,7 +447,9 @@ public void testShutdown() throws Exception { Thread.sleep(1000L); source.stop(); Thread.sleep(1000L); - for(String line : exec("ps -ef")) { + for (String line : exec(SystemUtils.IS_OS_WINDOWS ? + "cmd /c tasklist /FI \"SESSIONNAME eq Console\"" : + "ps -ef")) { if(pattern.matcher(line).find()) { Assert.fail("Found [" + line + "]"); } diff --git a/flume-ng-core/src/test/resources/test_command.ps1 b/flume-ng-core/src/test/resources/test_command.ps1 new file mode 100644 index 0000000000..4fbd820f70 --- /dev/null +++ b/flume-ng-core/src/test/resources/test_command.ps1 @@ -0,0 +1,3 @@ +" \"'1'\", \"\"\"2\"\"\",\"`\3\", \"\4\" " +foreach ($i in 1..5) { $i } +1..5 | %{$_ +1 } \ No newline at end of file