Skip to content

Commit

Permalink
Merge pull request #2 from Col-E/socketIPC
Browse files Browse the repository at this point in the history
[Surefire-1516] Improve IPC performace with Sockets
  • Loading branch information
jon-bell committed Oct 27, 2019
2 parents f7d4310 + 965c46e commit 26ba4d8
Show file tree
Hide file tree
Showing 4 changed files with 371 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import org.apache.maven.plugin.surefire.report.DefaultReporterFactory;
import org.apache.maven.shared.utils.cli.CommandLineCallable;
import org.apache.maven.shared.utils.cli.CommandLineException;
import org.apache.maven.shared.utils.cli.CommandLineUtils;
import org.apache.maven.shared.utils.cli.Commandline;
import org.apache.maven.shared.utils.cli.StreamConsumer;
import org.apache.maven.surefire.booter.AbstractPathConfiguration;
import org.apache.maven.surefire.booter.KeyValueSource;
import org.apache.maven.surefire.booter.PropertiesWrapper;
Expand All @@ -51,9 +54,15 @@
import org.apache.maven.surefire.util.DefaultScanResult;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -86,16 +95,14 @@
import static org.apache.maven.plugin.surefire.SurefireHelper.replaceForkThreadsInPath;
import static org.apache.maven.plugin.surefire.booterclient.ForkNumberBucket.drawNumber;
import static org.apache.maven.plugin.surefire.booterclient.ForkNumberBucket.returnNumber;
import static org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStream
.TestLessInputStreamBuilder;
import static org.apache.maven.shared.utils.cli.CommandLineUtils.executeCommandLineAsCallable;
import static org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStream.TestLessInputStreamBuilder;
import static org.apache.maven.shared.utils.cli.ShutdownHookUtils.addShutDownHook;
import static org.apache.maven.shared.utils.cli.ShutdownHookUtils.removeShutdownHook;
import static org.apache.maven.surefire.booter.SystemPropertyManager.writePropertiesFile;
import static org.apache.maven.surefire.cli.CommandLineOption.SHOW_ERRORS;
import static org.apache.maven.surefire.suite.RunResult.SUCCESS;
import static org.apache.maven.surefire.suite.RunResult.failure;
import static org.apache.maven.surefire.suite.RunResult.timeout;
import static org.apache.maven.surefire.suite.RunResult.failure;
import static org.apache.maven.surefire.util.internal.ConcurrencyUtils.countDownToZero;
import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
Expand Down Expand Up @@ -417,7 +424,7 @@ private RunResult runSuitesForkPerTestSet( final SurefireProperties effectiveSys
int failFastCount = providerConfiguration.getSkipAfterFailureCount();
final AtomicInteger notifyStreamsToSkipTestsJustNow = new AtomicInteger( failFastCount );
final AtomicBoolean printedErrorStream = new AtomicBoolean();
for ( final Object testSet : getSuitesIterator() )
for ( final Class<?> testSet : getSuitesIterator() )
{
Callable<RunResult> pf = new Callable<RunResult>()
{
Expand Down Expand Up @@ -540,7 +547,7 @@ private void closeExecutor( ExecutorService executorService )
}
}

private RunResult fork( Object testSet, KeyValueSource providerProperties, ForkClient forkClient,
private RunResult fork( Class<?> testSet, KeyValueSource providerProperties, ForkClient forkClient,
SurefireProperties effectiveSystemProperties, int forkNumber,
AbstractForkInputStream testProvidingInputStream, boolean readTestsFromInStream )
throws SurefireBooterForkException
Expand Down Expand Up @@ -585,13 +592,33 @@ private RunResult fork( Object testSet, KeyValueSource providerProperties, ForkC
{
testProvidingInputStream.setFlushReceiverProvider( cli );
}

// setup server
ServerSocket serverSocket;
try
{
// auto-assign port randomly
serverSocket = new ServerSocket( 0 );
}
catch ( IOException e )
{
throw new IllegalStateException( e );
}
//
// pass arguments to booter
//
// index-sensitive arguments
cli.createArg().setValue( tempDir );
cli.createArg().setValue( DUMP_FILE_PREFIX + forkNumber );
cli.createArg().setValue( surefireProperties.getName() );
cli.createArg().setValue( String.valueOf( serverSocket.getLocalPort() ) );
// optional arguments
if ( systPropsFile != null )
{
cli.createArg().setValue( systPropsFile.getName() );
cli.createArg().setValue( "-props:" + systPropsFile.getName() );
}
if ( testSet != null && !forkConfiguration.isReuseForks() && forkConfiguration.getForkCount() == 1 )
{
cli.createArg().setValue( "-testClass:" + testSet.getName() );
}

final ThreadedStreamConsumer threadedStreamConsumer = new ThreadedStreamConsumer( forkClient );
Expand All @@ -609,8 +636,8 @@ private RunResult fork( Object testSet, KeyValueSource providerProperties, ForkC
new NativeStdErrStreamConsumer( forkClient.getDefaultReporterFactory() );

CommandLineCallable future =
executeCommandLineAsCallable( cli, testProvidingInputStream, threadedStreamConsumer,
stdErrConsumer, 0, closer, ISO_8859_1 );
executeCommandLineAsCallableWithSocketWrapping( cli, testProvidingInputStream,
threadedStreamConsumer, stdErrConsumer, 0, serverSocket, closer, ISO_8859_1 );

currentForkClients.add( forkClient );

Expand Down Expand Up @@ -819,4 +846,126 @@ public void run()
}
}, 0, TIMEOUT_CHECK_PERIOD_MILLIS, MILLISECONDS );
}

/**
* Wrapper call to allow redirection of the ForkedBooter's process's streams to the socket ICP streams.<br>
*
* <b>NOTE:</b> A more proper approach would be allowing "CommandLineUtils.executeCommandLineAsCallable" to pass
* the desired input/output streams.
*/
public CommandLineCallable executeCommandLineAsCallableWithSocketWrapping( @Nonnull final Commandline cl,
@Nullable final InputStream systemIn,
final StreamConsumer systemOut,
final StreamConsumer systemErr,
final int timeoutInSeconds,
final ServerSocket serverSocket,
@Nullable final Runnable runAfterProcessTermination,
@Nullable final Charset streamCharset )
throws CommandLineException
{
Commandline wrappingCommandLine = new Commandline()
{
@Override
public Process execute() throws CommandLineException
{
try
{
return new ProcessSocketHook( cl.execute(), serverSocket );
}
catch ( IOException e )
{
throw new IllegalStateException( "Failed to hook process for ICP" );
}
}
};
CommandLineCallable clc = CommandLineUtils.executeCommandLineAsCallable( wrappingCommandLine, systemIn,
systemOut, systemErr, timeoutInSeconds, runAfterProcessTermination, streamCharset );
try
{
serverSocket.close();
}
catch ( Exception e )
{
// Well, we tried!
}
return clc;
}

/**
* Process wrapper to allow redirection of the streams to a socket's streams.
*/
private static class ProcessSocketHook extends Process
{
private final Process wrapped;
private final Socket clientSocket;

ProcessSocketHook( Process wrapped, ServerSocket serverSocket ) throws IOException
{
this.wrapped = wrapped;
this.clientSocket = serverSocket.accept();
clientSocket.setTcpNoDelay( true );
}

@Override
public OutputStream getOutputStream()
{
try
{
// Hooked stream for ICP communication
return clientSocket.getOutputStream();
}
catch ( IOException e )
{
throw new IllegalStateException( "Failed to hook OutputStream:out" );
}
}

@Override
public InputStream getInputStream()
{
try
{
// Hooked stream for ICP communication
return clientSocket.getInputStream();
}
catch ( IOException e )
{
throw new IllegalStateException( "Failed to hook InputStream" );
}
}

@Override
public InputStream getErrorStream()
{
// Cannot share socket with output/input.
// However, due to the infrequently of usage this doesn't warrant conversion to sockets.
return wrapped.getErrorStream();
}

@Override
public int waitFor() throws InterruptedException
{
return wrapped.waitFor();
}

@Override
public int exitValue()
{
return wrapped.exitValue();
}

@Override
public void destroy()
{
wrapped.destroy();
try
{
clientSocket.close();
}
catch ( IOException e )
{
throw new IllegalStateException( "Failed to close process ICP socket" );
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
Expand Down Expand Up @@ -62,7 +64,7 @@ public final class CommandReader
{
private static final String LAST_TEST_SYMBOL = "";

private static final CommandReader READER = new CommandReader();
private static CommandReader reader;

private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();

Expand All @@ -82,13 +84,34 @@ public final class CommandReader

private volatile ConsoleLogger logger = new NullConsoleLogger();

private CommandReader()
private InputStream inputStream;

public CommandReader( InputStream inputStream )
{
this.inputStream = inputStream;
reader = this;
}

public static CommandReader getReader( Socket socket ) throws IOException
{
// initialize if needed
if ( reader == null )
{
reader = new CommandReader( socket.getInputStream() );
}
return getReader();
}


public static CommandReader getReader()
{
final CommandReader reader = READER;
// initialize if needed
if ( reader == null )
{
reader = new CommandReader( System.in );
}
// get and start
CommandReader reader = CommandReader.reader;
if ( reader.state.compareAndSet( NEW, RUNNABLE ) )
{
reader.commandThread.start();
Expand Down Expand Up @@ -374,10 +397,10 @@ private final class CommandRunnable
public void run()
{
CommandReader.this.startMonitor.countDown();
DataInputStream stdIn = new DataInputStream( System.in );
boolean isTestSetFinished = false;
try
{
DataInputStream stdIn = new DataInputStream( inputStream );
while ( CommandReader.this.state.get() == RUNNABLE )
{
Command command = decode( stdIn );
Expand Down
Loading

0 comments on commit 26ba4d8

Please sign in to comment.