Skip to content

Commit

Permalink
[pulseaudio] Small bugfixes and rewrite (openhab#12581)
Browse files Browse the repository at this point in the history
* [pulseaudio] small fixes and rewrite

- All classes are now @NonNullByDefault
- all build warnings cleared
- no more need for a watchdog scheduled thread for every pulseaudio device : the bridge now handles sending information to child
- fix bug : exception at startup when child handler try to get information from the bridge too soon is now handled by waiting 2 seconds if necessary
- fix bug : playing MP3 with high bitrate is now OK with the replacement of the ResetableInputStream by a standard BufferedInputStream that handle mark/reset method better
- fix bug : ghost device listener no longer receive event after dispose
- fix bug : discovery doesn't show already added thing anymore
- Updating the status bridge to ONLINE only AFTER the update method is done.
- Use the bridgeStatusChanged method in the childhandler to get opportunity to test if the child could go ONLINE, (and by the way initialize the audiosink and audiosource, has they also need information from the bridge)

Signed-off-by: Gwendal Roulleau <gwendal.roulleau@gmail.com>
Co-authored-by: Laurent Garnier <lg.hc@free.fr>
  • Loading branch information
dalgwen and lolodomo authored Apr 9, 2022
1 parent f65ee6c commit 2be9a65
Show file tree
Hide file tree
Showing 19 changed files with 344 additions and 449 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.openhab.binding.pulseaudio.internal;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
Expand Down Expand Up @@ -55,14 +56,13 @@ public class ConvertedInputStream extends InputStream {

public ConvertedInputStream(AudioStream innerInputStream)
throws UnsupportedAudioFormatException, UnsupportedAudioFileException, IOException {

this.audioFormat = innerInputStream.getFormat();

if (innerInputStream instanceof FixedLengthAudioStream) {
length = ((FixedLengthAudioStream) innerInputStream).length();
}

pcmNormalizedInputStream = getPCMStreamNormalized(getPCMStream(new ResetableInputStream(innerInputStream)));
pcmNormalizedInputStream = getPCMStreamNormalized(getPCMStream(new BufferedInputStream(innerInputStream)));
}

@Override
Expand Down Expand Up @@ -108,7 +108,6 @@ public void close() throws IOException {
* @return A PCM normalized stream (2 channel, 44100hz, 16 bit signed)
*/
private AudioInputStream getPCMStreamNormalized(AudioInputStream pcmInputStream) {

javax.sound.sampled.AudioFormat format = pcmInputStream.getFormat();
if (format.getChannels() != 2
|| !format.getEncoding().equals(javax.sound.sampled.AudioFormat.Encoding.PCM_SIGNED)
Expand Down Expand Up @@ -138,7 +137,6 @@ public long getDuration() {
*/
private AudioInputStream getPCMStream(InputStream resetableInnerInputStream)
throws UnsupportedAudioFileException, IOException, UnsupportedAudioFormatException {

if (AudioFormat.MP3.isCompatible(audioFormat)) {
MpegAudioFileReader mpegAudioFileReader = new MpegAudioFileReader();

Expand Down Expand Up @@ -170,7 +168,6 @@ private AudioInputStream getPCMStream(InputStream resetableInnerInputStream)
sourceFormat.getChannels(), sourceFormat.getChannels() * 2, sourceFormat.getSampleRate(), false);

return mpegconverter.getAudioInputStream(convertFormat, sourceAIS);

} else if (AudioFormat.WAV.isCompatible(audioFormat)) {
// return the same input stream, but try to compute the duration first
AudioInputStream audioInputStream = AudioSystem.getAudioInputStream(resetableInnerInputStream);
Expand All @@ -187,71 +184,4 @@ private AudioInputStream getPCMStream(InputStream resetableInnerInputStream)
audioFormat);
}
}

/**
* This class add reset capability (on the first bytes only)
* to an AudioStream. This is necessary for the parsing / format detection.
*
*/
public static class ResetableInputStream extends InputStream {

private static final int BUFFER_LENGTH = 10000;

private final InputStream originalInputStream;

private int position = -1;
private int markPosition = -1;
private int maxPreviousPosition = -2;

private byte[] startingBuffer = new byte[BUFFER_LENGTH + 1];

public ResetableInputStream(InputStream originalInputStream) {
this.originalInputStream = originalInputStream;
}

@Override
public void close() throws IOException {
originalInputStream.close();
}

@Override
public int read() throws IOException {
if (position >= BUFFER_LENGTH || originalInputStream.markSupported()) {
return originalInputStream.read();
} else {
position++;
if (position <= maxPreviousPosition) {
return Byte.toUnsignedInt(startingBuffer[position]);
} else {
int currentByte = originalInputStream.read();
startingBuffer[position] = (byte) currentByte;
maxPreviousPosition = position;
return currentByte;
}
}
}

@Override
public synchronized void mark(int readlimit) {
if (originalInputStream.markSupported()) {
originalInputStream.mark(readlimit);
}
markPosition = position;
}

@Override
public boolean markSupported() {
return true;
}

@Override
public synchronized void reset() throws IOException {
if (originalInputStream.markSupported()) {
originalInputStream.reset();
} else if (position >= BUFFER_LENGTH) {
throw new IOException("mark/reset not supported above " + BUFFER_LENGTH + " bytes");
}
position = markPosition;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ public void process(@Nullable AudioStream audioStream)
} catch (IOException e) {
disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown
if (countAttempt == 2) { // we won't retry : log and quit
String port = clientSocket != null ? Integer.toString(clientSocket.getPort()) : "unknown";
final Socket clientSocketLocal = clientSocket;
String port = clientSocketLocal != null ? Integer.toString(clientSocketLocal.getPort())
: "unknown";
logger.warn(
"Error while trying to send audio to pulseaudio audio sink. Cannot connect to {}:{}, error: {}",
pulseaudioHandler.getHost(), port, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ public void close() throws IOException {
} catch (IOException e) {
disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
if (countAttempt == 2) { // we won't retry : log and quit
String port = clientSocket != null ? Integer.toString(clientSocket.getPort()) : "unknown";
final Socket clientSocketLocal = clientSocket;
String port = clientSocketLocal != null ? Integer.toString(clientSocketLocal.getPort())
: "unknown";
logger.warn(
"Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
pulseaudioHandler.getHost(), port, e.getMessage());
Expand Down Expand Up @@ -153,11 +155,14 @@ private synchronized void startPipeWrite() {
if (pipeOutputs.contains(output)) {
output.flush();
}
} catch (IOException e) {
if (e instanceof InterruptedIOException && pipeOutputs.isEmpty()) {
} catch (InterruptedIOException e) {
if (pipeOutputs.isEmpty()) {
// task has been ended while writing
return;
}
logger.warn("InterruptedIOException while writing to from pulse source pipe: {}",
getExceptionMessage(e));
} catch (IOException e) {
logger.warn("IOException while writing to from pulse source pipe: {}",
getExceptionMessage(e));
} catch (RuntimeException e) {
Expand Down Expand Up @@ -221,7 +226,8 @@ private synchronized void stopPipeWriteTask() {
} catch (IOException | InterruptedException ignored) {
}
try {
return (clientSocket != null) ? clientSocket.getInputStream() : null;
var clientSocketFinal = clientSocket;
return (clientSocketFinal != null) ? clientSocketFinal.getInputStream() : null;
} catch (IOException ignored) {
return null;
}
Expand Down Expand Up @@ -264,11 +270,14 @@ public int read() throws IOException {

@Override
public int read(byte @Nullable [] b) throws IOException {
return read(b, 0, b.length);
return read(b, 0, b == null ? 0 : b.length);
}

@Override
public int read(byte @Nullable [] b, int off, int len) throws IOException {
if (b == null) {
throw new IOException("Buffer is null");
}
logger.trace("reading from pulseaudio stream");
if (closed) {
throw new IOException("Stream is closed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Optional;
import java.util.Random;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.pulseaudio.internal.cli.Parser;
Expand Down Expand Up @@ -150,7 +149,6 @@ public synchronized void update() {
modules = new ArrayList<Module>(Parser.parseModules(listModules()));

List<AbstractAudioDeviceConfig> newItems = new ArrayList<>(); // prepare new list before assigning it
newItems.clear();
if (configuration.sink) {
logger.debug("reading sinks");
newItems.addAll(Parser.parseSinks(listSinks(), this));
Expand Down Expand Up @@ -245,48 +243,6 @@ public void sendCommand(String command) {
return null;
}

/**
* retrieves a {@link SinkInput} by its name
*
* @return the corresponding {@link SinkInput} to the given <code>name</code>
*/
public @Nullable SinkInput getSinkInput(String name) {
for (AbstractAudioDeviceConfig item : items) {
if (item.getPaName().equalsIgnoreCase(name) && item instanceof SinkInput) {
return (SinkInput) item;
}
}
return null;
}

/**
* retrieves a {@link SinkInput} by its id
*
* @return the corresponding {@link SinkInput} to the given <code>id</code>
*/
public @Nullable SinkInput getSinkInput(int id) {
for (AbstractAudioDeviceConfig item : items) {
if (item.getId() == id && item instanceof SinkInput) {
return (SinkInput) item;
}
}
return null;
}

/**
* retrieves a {@link Source} by its name
*
* @return the corresponding {@link Source} to the given <code>name</code>
*/
public @Nullable Source getSource(String name) {
for (AbstractAudioDeviceConfig item : items) {
if (item.getPaName().equalsIgnoreCase(name) && item instanceof Source) {
return (Source) item;
}
}
return null;
}

/**
* retrieves a {@link Source} by its id
*
Expand All @@ -301,34 +257,6 @@ public void sendCommand(String command) {
return null;
}

/**
* retrieves a {@link SourceOutput} by its name
*
* @return the corresponding {@link SourceOutput} to the given <code>name</code>
*/
public @Nullable SourceOutput getSourceOutput(String name) {
for (AbstractAudioDeviceConfig item : items) {
if (item.getPaName().equalsIgnoreCase(name) && item instanceof SourceOutput) {
return (SourceOutput) item;
}
}
return null;
}

/**
* retrieves a {@link SourceOutput} by its id
*
* @return the corresponding {@link SourceOutput} to the given <code>id</code>
*/
public @Nullable SourceOutput getSourceOutput(int id) {
for (AbstractAudioDeviceConfig item : items) {
if (item.getId() == id && item instanceof SourceOutput) {
return (SourceOutput) item;
}
}
return null;
}

/**
* retrieves a {@link AbstractAudioDeviceConfig} by its name
*
Expand All @@ -343,6 +271,11 @@ public void sendCommand(String command) {
return null;
}

/**
* Get all items previously parsed from the pulseaudio server.
*
* @return All items parsed from the pulseaudio server
*/
public List<AbstractAudioDeviceConfig> getItems() {
return items;
}
Expand Down Expand Up @@ -479,16 +412,18 @@ private Optional<Integer> findSimpleProtocolTcpModule(AbstractAudioDeviceConfig
.map(portS -> Integer.parseInt(portS));
}

private Optional<@NonNull String> extractArgumentFromLine(String argumentWanted, String argumentLine) {
private Optional<String> extractArgumentFromLine(String argumentWanted, @Nullable String argumentLine) {
String argument = null;
int startPortIndex = argumentLine.indexOf(argumentWanted + "=");
if (startPortIndex != -1) {
startPortIndex = startPortIndex + argumentWanted.length() + 1;
int endPortIndex = argumentLine.indexOf(" ", startPortIndex);
if (endPortIndex == -1) {
endPortIndex = argumentLine.length();
if (argumentLine != null) {
int startPortIndex = argumentLine.indexOf(argumentWanted + "=");
if (startPortIndex != -1) {
startPortIndex = startPortIndex + argumentWanted.length() + 1;
int endPortIndex = argumentLine.indexOf(" ", startPortIndex);
if (endPortIndex == -1) {
endPortIndex = argumentLine.length();
}
argument = argumentLine.substring(startPortIndex, endPortIndex);
}
argument = argumentLine.substring(startPortIndex, endPortIndex);
}
return Optional.ofNullable(argument);
}
Expand Down Expand Up @@ -552,7 +487,10 @@ public void setCombinedSinkSlaves(@Nullable Sink combinedSink, List<Sink> sinks)
slaves.add(sink.getPaName());
}
// 1. delete old combined-sink
sendRawCommand(CMD_UNLOAD_MODULE + " " + combinedSink.getModule().getId());
Module lastModule = combinedSink.getModule();
if (lastModule != null) {
sendRawCommand(CMD_UNLOAD_MODULE + " " + lastModule.getId());
}
// 2. add new combined-sink with same name and all slaves
sendRawCommand(CMD_LOAD_MODULE + " " + MODULE_COMBINE_SINK + " sink_name=" + combinedSink.getPaName()
+ " slaves=" + String.join(",", slaves));
Expand Down Expand Up @@ -731,8 +669,9 @@ public void connect() throws IOException {
if (clientSocket == null || clientSocket.isClosed() || !clientSocket.isConnected()) {
logger.trace("Try to connect...");
try {
client = new Socket(host, port);
client.setSoTimeout(500);
var clientFinal = new Socket(host, port);
clientFinal.setSoTimeout(500);
client = clientFinal;
logger.trace("connected");
} catch (UnknownHostException e) {
client = null;
Expand Down
Loading

0 comments on commit 2be9a65

Please sign in to comment.