Skip to content

Commit

Permalink
Issue #213: Separated bind and accept calls; Added option to setup a …
Browse files Browse the repository at this point in the history
…callback after bind has been called (also usable in EmbeddedDBusDaemon)
  • Loading branch information
hypfvieh committed Oct 20, 2023
1 parent 284de30 commit 6463b16
Show file tree
Hide file tree
Showing 15 changed files with 253 additions and 68 deletions.
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,31 @@ With dbus-java 4.x (and 5.x as well), java.nio is used for all transports and th

### Note for custom transports
If you previously used a custom transport you have to update your code when switching to dbus-java 5.x.
The `AbstractTransport` base class has been changed and now provides two different methods to separate client and listening
The `AbstractTransport` base class has been changed and now provides different methods to separate client and listening
(server) connections.

The `connectImpl()` was previously existing and will now only be called for client side connections.
The new method `listenImpl()` is used for server connections and has been added in dbus-java 5.x.
The new methods `bindImpl()`, `acceptImpl()` and `isBound()` are used for server connections and has been added in dbus-java 5.x.

The reason to provide separate methods was to allow bootstrapping server connections before accepting connections.
In the old implementation `accept()` was usually called in `connectImpl()` and therefore blocked the method until
a client was connected. This blocked setting up the server side before the first client was connecting.

This forced the user to use some random sleep times to wait for the server setup after first client connects.
With the separation no more sleep waits are required.

With the new methods, everything related to setup the server socket should be done in `bindImpl()` including the binding
of the listening socket (calling `bind()` on the server socket). Everything done in this method should not block.
You mustn't call `accept()` on your server socket in `bindImpl()`!

In `acceptImpl()` it is expected that the transport calls `accept()` on its server socket and therefore this method will block.
It must return the `SocketChannel` for each client connected (like `connectImpl()` does).

The `isBound()` method must return the bind status of the server socket. This means for example
server socket is not `null` and server socket is opened.
This method is used by `AbstractTransport` to determine if `bindImpl()` was called before and if the server socket is ready to accept
connections.


### How to use file descriptors?
In DBus-Java version below < 4.3.1 file descriptor usage was not supported out of the box and required a third party libary (see below).
Starting with version 4.3.1 file descriptors are supported when using junixsocket-transport.
Expand Down Expand Up @@ -94,9 +106,9 @@ The library will remain open source and MIT licensed and can still be used, fork
#### Changes

##### Changes in 5.0.0 (not released yet):
- **Updated minimum required Java version to 17**
- Removed all classes and methods marked as deprecated in 4.x
- Updated dependencies and maven plugins
- Updated minimum required Java version to 17
- Improved handling of listening connections to allow proper bootstrapping the connection before actually starting accepting new connections (thanks to [brett-smith](https://github.com/brett-smith) ([#213](https://github.com/hypfvieh/dbus-java/issues/213)))
- Updated export-object documentation ([#236](https://github.com/hypfvieh/dbus-java/issues/236))
- Fixed issues with autoConnect option, added method to register to bus by 'Hello' message manually, thanks to [brett-smith](https://github.com/brett-smith) ([#238](https://github.com/hypfvieh/dbus-java/issues/238))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@
import org.freedesktop.dbus.exceptions.AuthenticationException;
import org.freedesktop.dbus.exceptions.DBusException;
import org.freedesktop.dbus.exceptions.SocketClosedException;
import org.freedesktop.dbus.utils.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.attribute.PosixFilePermission;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
* Simple DBusDaemon implementation to use if no DBusDaemon is running on the OS level.
Expand All @@ -30,7 +32,7 @@ public class EmbeddedDBusDaemon implements Closeable {
private DBusDaemon daemon;

private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean connectionReady = new AtomicBoolean(false);
// private final AtomicBoolean connectionReady = new AtomicBoolean(false);

private SaslAuthMode saslAuthMode;

Expand All @@ -40,6 +42,11 @@ public class EmbeddedDBusDaemon implements Closeable {

private PosixFilePermission[] unixSocketFilePermissions;

private Consumer<AbstractTransport> connectCallback;
private Consumer<AbstractTransport> bindCallback;

private CountDownLatch startupLatch = new CountDownLatch(1);

public EmbeddedDBusDaemon(BusAddress _address) {
// create copy of address so manipulation happens later does not interfere with our instance
address = BusAddress.of(Objects.requireNonNull(_address, "Address required"));
Expand All @@ -55,7 +62,7 @@ public EmbeddedDBusDaemon(String _address) throws DBusException {
@Override
public synchronized void close() throws IOException {
closed.set(true);
connectionReady.set(false);
startupLatch = new CountDownLatch(1);
if (daemon != null) {
daemon.close();
try {
Expand Down Expand Up @@ -102,13 +109,37 @@ public void startInBackground() {
* Starts the DBusDaemon in background.
* <p>
* Will wait up to the given period of milliseconds for the background thread to get ready.
*
* @param _maxWaitMillis maximum wait time in milliseconds
* @throws IllegalStateException when interrupted or wait for daemon timed out
*/
public void startInBackgroundAndWait(long _maxWaitMillis) throws IllegalStateException {
startInBackground();
try {
startupLatch.await(_maxWaitMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException _ex) {
throw new IllegalStateException("Daemon not started after " + _maxWaitMillis + " milliseconds");
}
}

/**
* Starts the DBusDaemon in background.
* <p>
* Waits until the daemon is ready before return indefinitely.
* If given wait time exceeded, a {@link RuntimeException} is thrown.
*
* @param _maxWaitMillis maximum wait time in milliseconds
* @throws IllegalStateException when interrupted while waiting for daemon to start
*
* @since 5.0.0 - 2023-10-20
*/
public void startInBackgroundAndWait(long _maxWaitMillis) {
public void startInBackgroundAndWait() throws IllegalStateException {
startInBackground();
Util.waitFor("EmbeddedDbusDaemon", this::isRunning, _maxWaitMillis, 100);
try {
startupLatch.await();
} catch (InterruptedException _ex) {
throw new IllegalStateException("Interrupted while waiting for daemon to start");
}
}

/**
Expand All @@ -117,7 +148,7 @@ public void startInBackgroundAndWait(long _maxWaitMillis) {
* @return true if running, false otherwise
*/
public synchronized boolean isRunning() {
return connectionReady.get() && daemon != null && daemon.isRunning();
return startupLatch.getCount() == 0 && daemon != null && daemon.isRunning();
}

/**
Expand Down Expand Up @@ -179,6 +210,41 @@ public void setUnixSocketPermissions(PosixFilePermission... _permissions) {
unixSocketFilePermissions = _permissions;
}

/**
* Configured pre-connect callback.
* @return Consumer or null
*/
public Consumer<AbstractTransport> getConnectCallback() {
return connectCallback;
}

/**
* Callback which will be called by transport right before the socket is bound and connections will be accepted.
*
* @param _connectCallback callback or null to disable
*/
public void setConnectCallback(Consumer<AbstractTransport> _connectCallback) {
connectCallback = _connectCallback;
}

/**
* Configured bind callback.
* @return Consumer or null
*/
public Consumer<AbstractTransport> getBindCallback() {
return bindCallback;
}

/**
* Callback which will be called by transport right after the server socket was bound.<br>
* Server will not yet accept connections at this point, but it started listening on the configured address.
*
* @param _callback
*/
public void setBindCallback(Consumer<AbstractTransport> _callback) {
bindCallback = _callback;
}

private synchronized void setDaemonAndStart(AbstractTransport _transport) {
daemon = new DBusDaemon(_transport);
daemon.start();
Expand All @@ -202,6 +268,13 @@ private void startListening() throws IOException, DBusException {
.withUnixSocketFileOwner(unixSocketFileOwner)
.withUnixSocketFileGroup(unixSocketFileGroup)
.withUnixSocketFilePermissions(unixSocketFilePermissions)
.withPreConnectCallback(connectCallback)
.withAfterBindCallback(x -> {
if (bindCallback != null) {
bindCallback.accept(x);
}
startupLatch.countDown();
})
.withAutoConnect(false)
.configureSasl().withAuthMode(getSaslAuthMode()).back()
.back()
Expand All @@ -213,7 +286,6 @@ private void startListening() throws IOException, DBusException {
do {
try {
LOGGER.debug("Begin listening to: {}", transport);
connectionReady.set(true);
TransportConnection s = transport.listen();
daemon.addSock(s);
} catch (AuthenticationException _ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public final class TransportConfig {
private BusAddress busAddress;

private Consumer<AbstractTransport> preConnectCallback;
private Consumer<AbstractTransport> afterBindCallback;

private int timeout = 10000;
private boolean autoConnect = true;
Expand Down Expand Up @@ -77,6 +78,14 @@ public void setPreConnectCallback(Consumer<AbstractTransport> _preConnectCallbac
preConnectCallback = _preConnectCallback;
}

public Consumer<AbstractTransport> getAfterBindCallback() {
return afterBindCallback;
}

public void setAfterBindCallback(Consumer<AbstractTransport> _afterBindCallback) {
afterBindCallback = _afterBindCallback;
}

public boolean isAutoConnect() {
return autoConnect;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ public X withPreConnectCallback(Consumer<AbstractTransport> _callback) {
return self();
}

/**
* Set a callback which will be called after {@code bindImpl()} on a server connection was called.<br>
* This method is only called if the transport is configured as server connection.
* <p>
* The given consumer will receive the created {@link AbstractTransport} object which is not yet
* accepting connections. A callback should <b>NEVER</b> call accept on the transport, but is allowed to do further
* configuration if needed.
* </p>
*
* @param _callback consumer to call, null to remove any callback
*
* @return this
* @since 5.0.0 - 2023-10-20
*/
public X withAfterBindCallback(Consumer<AbstractTransport> _callback) {
config.setPreConnectCallback(_callback);
return self();
}

/**
* Instantly connect to DBus when {@link #build()} is called.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -110,22 +111,53 @@ public synchronized boolean isConnected() {
protected abstract boolean hasFileDescriptorSupport();

/**
* Abstract method implemented by concrete sub classes to establish a connection using whatever transport type (e.g.
* TCP/Unix socket).
* Abstract method implemented by concrete sub classes to establish a connection.
* @return socket channel connected to DBus server
*
* @throws IOException when connection fails
*/
protected abstract SocketChannel connectImpl() throws IOException;

/**
* Abstract method implemented by concrete sub classes to listen for a new connection using whatever transport type
* (e.g. TCP/Unix socket).
* Method to accept new incoming listening connections.<br>
* This is the place where {@code accept()} is called on the server socket created by {@link #bindImpl()}.<br>
* Therefore this method will block until a client is connected.
*
* @return newly connected client socket
*
* @throws IOException when connection fails
*
* @since 5.0.0 - 2023-10-20
*/
protected abstract SocketChannel acceptImpl() throws IOException;

/**
* Method called to prepare listening for connections.<br>
* This is usually the place where the {@code ServerSocketChannel} is created and {@code bind()} is called.
*
* @throws IOException when connection fails
*
* @since 5.0.0 - 2023-05-18
* @since 5.0.0 - 2023-10-20
*/
protected abstract SocketChannel listenImpl() throws IOException;
protected abstract void bindImpl() throws IOException;

/**
* Method which is called to close a transport.<br>
* Should be used to close all sockets and/or serversockets.
*
* @throws IOException when something fails while closing transport
* @since 5.0.0 - 2023-10-20
*/
protected abstract void closeTransport() throws IOException;

/**
* Status of the server socket if this transport is configured to be a server connection.<br>
* Must be false if {@link #bindImpl()} was not called.
*
* @return boolean
* @since 5.0.0 - 2023-10-20
*/
protected abstract boolean isBound();

/**
* Establish connection on created transport.<br>
Expand Down Expand Up @@ -173,7 +205,13 @@ public final TransportConnection listen() throws IOException {
if (!getAddress().isListeningSocket()) {
throw new InvalidBusAddressException("Cannot listen on client connection address (try use connect() instead)");
}
transportConnection = internalConnect(() -> listenImpl());

if (!isBound()) {
bindImpl();
runCallback(config.getAfterBindCallback());
}

transportConnection = internalConnect(() -> acceptImpl());
return transportConnection;
}

Expand All @@ -185,9 +223,7 @@ public final TransportConnection listen() throws IOException {
* @throws IOException when channel provider could not create a SocketChannel
*/
private TransportConnection internalConnect(IThrowingSupplier<SocketChannel, IOException> _channelProvider) throws IOException {
if (config.getPreConnectCallback() != null) {
config.getPreConnectCallback().accept(this);
}
runCallback(config.getPreConnectCallback());
SocketChannel channel = _channelProvider.get();

authenticate(channel);
Expand Down Expand Up @@ -266,6 +302,14 @@ private TransportConnection createInputOutput(SocketChannel _socket) {
return new TransportConnection(messageFactory, _socket, providerImpl, writer, reader);
}

/**
* Runs a callback if not null.
* @param _callback callback to execute
*/
private void runCallback(Consumer<AbstractTransport> _callback) {
Optional.ofNullable(_callback).ifPresent(c -> c.accept(this));
}

/**
* Returns the {@link BusAddress} used for this transport.
*
Expand Down Expand Up @@ -340,11 +384,14 @@ public String toString() {
}

@Override
public void close() throws IOException {
public final void close() throws IOException {
if (transportConnection != null) {
transportConnection.close();
transportConnection = null;
}

getLogger().debug("Disconnecting Transport: {}", this);
closeTransport();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ public void testStartAndConnectEmbeddedDBusDaemon() throws DBusException {

logger.debug("Starting embedded bus on address {})", listenBusAddress);
try (EmbeddedDBusDaemon daemon = new EmbeddedDBusDaemon(listenBusAddress)) {
daemon.startInBackground();
logger.debug("Started embedded bus on address {}", listenBusAddress);

waitForDaemon(daemon);
daemon.startInBackgroundAndWait(MAX_WAIT);

// connect to started daemon process
logger.info("Connecting to embedded DBus {}", busAddress);
Expand Down
Loading

0 comments on commit 6463b16

Please sign in to comment.