Skip to content

Commit

Permalink
Split GlTextureProcessor.Listener into input/output/error listeners.
Browse files Browse the repository at this point in the history
This simplifies ChainingGlTextureProcessor as it now only connects a
consuming and a producing GlTextureProcessor rather than a previous,
current, and next GlTextureProcessor.

Also use default no-op implementations of the listeners in
SingleFrameGlTextureProcessor and MediaPipeProcessor to avoid
null-checks.

PiperOrigin-RevId: 466301642
  • Loading branch information
Googler authored and marcbaechinger committed Oct 19, 2022
1 parent e444eaa commit a2166a4
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 168 deletions.
Expand Up @@ -31,7 +31,6 @@
import com.google.mediapipe.framework.TextureFrame;
import com.google.mediapipe.glutil.EglManager;
import java.util.concurrent.ConcurrentHashMap;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

/** Runs a MediaPipe graph on input frames. */
/* package */ final class MediaPipeProcessor implements GlTextureProcessor {
Expand All @@ -55,10 +54,13 @@ protected void loadLibrary(String name) {
}

private final FrameProcessor frameProcessor;
private volatile GlTextureProcessor.@MonotonicNonNull Listener listener;
private volatile boolean acceptedFrame;
private final ConcurrentHashMap<TextureInfo, TextureFrame> outputFrames;

private InputListener inputListener;
private OutputListener outputListener;
private ErrorListener errorListener;
private boolean acceptedFrame;

/**
* Creates a new texture processor that wraps a MediaPipe graph.
*
Expand All @@ -78,11 +80,27 @@ public MediaPipeProcessor(
checkState(LOADER.isAvailable());
// TODO(b/227624622): Confirm whether MediaPipeProcessor could support HDR colors.
checkArgument(!useHdr, "MediaPipeProcessor does not support HDR colors.");
inputListener = new InputListener() {};
outputListener = new OutputListener() {};
errorListener = (frameProcessingException) -> {};
EglManager eglManager = new EglManager(EGL14.eglGetCurrentContext());
frameProcessor =
new FrameProcessor(
context, eglManager.getNativeContext(), graphName, inputStreamName, outputStreamName);
outputFrames = new ConcurrentHashMap<>();
// OnWillAddFrameListener is called on the same thread as frameProcessor.onNewFrame(...), so no
// synchronization is needed for acceptedFrame.
frameProcessor.setOnWillAddFrameListener((long timestamp) -> acceptedFrame = true);
}

/**
 * Sets the {@link InputListener} used to report input-frame events.
 *
 * <p>The listener's {@code onInputFrameProcessed} is invoked from {@code maybeQueueInputFrame}
 * once this processor is done with the input texture. A no-op listener is installed in the
 * constructor, so this setter only replaces it — no null-check is needed at the call sites.
 */
@Override
public void setInputListener(InputListener inputListener) {
this.inputListener = inputListener;
}

@Override
public void setOutputListener(OutputListener outputListener) {
this.outputListener = outputListener;
frameProcessor.setConsumer(
frame -> {
TextureInfo texture =
Expand All @@ -92,22 +110,15 @@ public MediaPipeProcessor(
frame.getWidth(),
frame.getHeight());
outputFrames.put(texture, frame);
if (listener != null) {
listener.onOutputFrameAvailable(texture, frame.getTimestamp());
}
});
frameProcessor.setAsynchronousErrorListener(
error -> {
if (listener != null) {
listener.onFrameProcessingError(new FrameProcessingException(error));
}
outputListener.onOutputFrameAvailable(texture, frame.getTimestamp());
});
frameProcessor.setOnWillAddFrameListener((long timestamp) -> acceptedFrame = true);
}

@Override
public void setListener(GlTextureProcessor.Listener listener) {
this.listener = listener;
public void setErrorListener(ErrorListener errorListener) {
this.errorListener = errorListener;
frameProcessor.setAsynchronousErrorListener(
error -> errorListener.onFrameProcessingError(new FrameProcessingException(error)));
}

@Override
Expand All @@ -123,13 +134,9 @@ public boolean maybeQueueInputFrame(TextureInfo inputTexture, long presentationT
appTextureFrame.waitUntilReleasedWithGpuSync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (listener != null) {
listener.onFrameProcessingError(new FrameProcessingException(e));
}
}
if (listener != null) {
listener.onInputFrameProcessed(inputTexture);
errorListener.onFrameProcessingError(new FrameProcessingException(e));
}
inputListener.onInputFrameProcessed(inputTexture);
return acceptedFrame;
}

Expand All @@ -146,8 +153,6 @@ public void release() {
@Override
public final void signalEndOfCurrentInputStream() {
frameProcessor.waitUntilIdle();
if (listener != null) {
listener.onCurrentOutputStreamEnded();
}
outputListener.onCurrentOutputStreamEnded();
}
}
Expand Up @@ -16,92 +16,75 @@
package androidx.media3.effect;

import android.util.Pair;
import androidx.annotation.Nullable;
import androidx.media3.common.FrameProcessingException;
import androidx.media3.common.FrameProcessor;
import androidx.media3.effect.GlTextureProcessor.InputListener;
import androidx.media3.effect.GlTextureProcessor.OutputListener;
import java.util.ArrayDeque;
import java.util.Queue;

/**
* A {@link GlTextureProcessor.Listener} that connects the {@link GlTextureProcessor} it is
* {@linkplain GlTextureProcessor#setListener(GlTextureProcessor.Listener) set} on to a previous and
* next {@link GlTextureProcessor}.
* Connects a producing and a consuming {@link GlTextureProcessor} instance.
*
* <p>This listener should be set as {@link InputListener} on the consuming {@link
* GlTextureProcessor} and as {@link OutputListener} on the producing {@link GlTextureProcessor}.
*/
/* package */ final class ChainingGlTextureProcessorListener
implements GlTextureProcessor.Listener {
implements GlTextureProcessor.InputListener, GlTextureProcessor.OutputListener {

@Nullable private final GlTextureProcessor previousGlTextureProcessor;
@Nullable private final GlTextureProcessor nextGlTextureProcessor;
private final GlTextureProcessor producingGlTextureProcessor;
private final GlTextureProcessor consumingGlTextureProcessor;
private final FrameProcessingTaskExecutor frameProcessingTaskExecutor;
private final FrameProcessor.Listener frameProcessorListener;
private final Queue<Pair<TextureInfo, Long>> pendingFrames;
private final Queue<Pair<TextureInfo, Long>> availableFrames;

/**
* Creates a new instance.
*
* @param previousGlTextureProcessor The {@link GlTextureProcessor} that comes before the {@link
* GlTextureProcessor} this listener is set on or {@code null} if not applicable.
* @param nextGlTextureProcessor The {@link GlTextureProcessor} that comes after the {@link
* GlTextureProcessor} this listener is set on or {@code null} if not applicable.
* @param producingGlTextureProcessor The {@link GlTextureProcessor} for which this listener will
* be set as {@link OutputListener}.
* @param consumingGlTextureProcessor The {@link GlTextureProcessor} for which this listener will
* be set as {@link InputListener}.
* @param frameProcessingTaskExecutor The {@link FrameProcessingTaskExecutor} that is used for
* OpenGL calls. All calls to the previous/next {@link GlTextureProcessor} will be executed by
* the {@link FrameProcessingTaskExecutor}. The caller is responsible for releasing the {@link
* FrameProcessingTaskExecutor}.
* @param frameProcessorListener The {@link FrameProcessor.Listener} to forward exceptions to.
* OpenGL calls. All calls to the producing/consuming {@link GlTextureProcessor} will be
* executed by the {@link FrameProcessingTaskExecutor}. The caller is responsible for
* releasing the {@link FrameProcessingTaskExecutor}.
*/
public ChainingGlTextureProcessorListener(
@Nullable GlTextureProcessor previousGlTextureProcessor,
@Nullable GlTextureProcessor nextGlTextureProcessor,
FrameProcessingTaskExecutor frameProcessingTaskExecutor,
FrameProcessor.Listener frameProcessorListener) {
this.previousGlTextureProcessor = previousGlTextureProcessor;
this.nextGlTextureProcessor = nextGlTextureProcessor;
GlTextureProcessor producingGlTextureProcessor,
GlTextureProcessor consumingGlTextureProcessor,
FrameProcessingTaskExecutor frameProcessingTaskExecutor) {
this.producingGlTextureProcessor = producingGlTextureProcessor;
this.consumingGlTextureProcessor = consumingGlTextureProcessor;
this.frameProcessingTaskExecutor = frameProcessingTaskExecutor;
this.frameProcessorListener = frameProcessorListener;
pendingFrames = new ArrayDeque<>();
availableFrames = new ArrayDeque<>();
}

@Override
public void onInputFrameProcessed(TextureInfo inputTexture) {
if (previousGlTextureProcessor != null) {
GlTextureProcessor nonNullPreviousGlTextureProcessor = previousGlTextureProcessor;
frameProcessingTaskExecutor.submit(
() -> nonNullPreviousGlTextureProcessor.releaseOutputFrame(inputTexture));
}
frameProcessingTaskExecutor.submit(
() -> producingGlTextureProcessor.releaseOutputFrame(inputTexture));
}

@Override
public void onOutputFrameAvailable(TextureInfo outputTexture, long presentationTimeUs) {
if (nextGlTextureProcessor != null) {
GlTextureProcessor nonNullNextGlTextureProcessor = nextGlTextureProcessor;
frameProcessingTaskExecutor.submit(
() -> {
pendingFrames.add(new Pair<>(outputTexture, presentationTimeUs));
processFrameNowOrLater(nonNullNextGlTextureProcessor);
});
}
frameProcessingTaskExecutor.submit(
() -> {
availableFrames.add(new Pair<>(outputTexture, presentationTimeUs));
processFrameNowOrLater();
});
}

private void processFrameNowOrLater(GlTextureProcessor nextGlTextureProcessor) {
Pair<TextureInfo, Long> pendingFrame = pendingFrames.element();
private void processFrameNowOrLater() {
Pair<TextureInfo, Long> pendingFrame = availableFrames.element();
TextureInfo outputTexture = pendingFrame.first;
long presentationTimeUs = pendingFrame.second;
if (nextGlTextureProcessor.maybeQueueInputFrame(outputTexture, presentationTimeUs)) {
pendingFrames.remove();
if (consumingGlTextureProcessor.maybeQueueInputFrame(outputTexture, presentationTimeUs)) {
availableFrames.remove();
} else {
frameProcessingTaskExecutor.submit(() -> processFrameNowOrLater(nextGlTextureProcessor));
frameProcessingTaskExecutor.submit(this::processFrameNowOrLater);
}
}

@Override
public void onCurrentOutputStreamEnded() {
if (nextGlTextureProcessor != null) {
frameProcessingTaskExecutor.submit(nextGlTextureProcessor::signalEndOfCurrentInputStream);
}
}

@Override
public void onFrameProcessingError(FrameProcessingException e) {
frameProcessorListener.onFrameProcessingError(e);
frameProcessingTaskExecutor.submit(consumingGlTextureProcessor::signalEndOfCurrentInputStream);
}
}
Expand Up @@ -78,7 +78,7 @@
private int inputHeight;
@Nullable private MatrixTransformationProcessor matrixTransformationProcessor;
@Nullable private SurfaceViewWrapper debugSurfaceViewWrapper;
private @MonotonicNonNull Listener listener;
private InputListener inputListener;
private @MonotonicNonNull Pair<Integer, Integer> outputSizeBeforeSurfaceTransformation;
@Nullable private SurfaceView debugSurfaceView;

Expand Down Expand Up @@ -113,18 +113,24 @@ public FinalMatrixTransformationProcessorWrapper(
textureTransformMatrix = new float[16];
Matrix.setIdentityM(textureTransformMatrix, /* smOffset= */ 0);
streamOffsetUsQueue = new ArrayDeque<>();
inputListener = new InputListener() {};
}

/**
 * Sets the {@link InputListener} used to report input-frame events.
 *
 * <p>This wrapper calls {@code onInputFrameProcessed} after rendering an input frame in {@code
 * maybeQueueInputFrame}. A no-op listener is installed in the constructor, so callers of the
 * listener never need a null-check.
 */
@Override
public void setInputListener(InputListener inputListener) {
this.inputListener = inputListener;
}

/**
* {@inheritDoc}
*
* <p>The {@code FinalMatrixTransformationProcessorWrapper} will only call {@link
* Listener#onInputFrameProcessed(TextureInfo)}. Other events are handled via the {@link
* FrameProcessor.Listener} passed to the constructor.
*/
@Override
public void setListener(Listener listener) {
this.listener = listener;
public void setOutputListener(OutputListener outputListener) {
// The FrameProcessor.Listener passed to the constructor is used for output-related events.
throw new UnsupportedOperationException();
}

/**
 * Unsupported: this processor does not accept a per-processor {@link ErrorListener}.
 *
 * <p>Error reporting for this wrapper goes through a different channel (per the inline comment,
 * the {@code FrameProcessor.Listener} supplied at construction — TODO confirm against the
 * constructor, which is outside this view), so the {@code errorListener} argument is ignored.
 *
 * @throws UnsupportedOperationException always.
 */
@Override
public void setErrorListener(ErrorListener errorListener) {
// The FrameProcessor.Listener passed to the constructor is used for errors.
throw new UnsupportedOperationException();
}

@Override
Expand Down Expand Up @@ -174,9 +180,7 @@ public boolean maybeQueueInputFrame(TextureInfo inputTexture, long presentationT
Log.d(TAG, "Error rendering to debug preview", e);
}
}
if (listener != null) {
listener.onInputFrameProcessed(inputTexture);
}
inputListener.onInputFrameProcessed(inputTexture);
return true;
}

Expand Down Expand Up @@ -278,8 +282,8 @@ private MatrixTransformationProcessor createMatrixTransformationProcessorForOutp

@Override
public void releaseOutputFrame(TextureInfo outputTexture) {
throw new UnsupportedOperationException(
"The final texture processor writes to a surface so there is no texture to release");
// The final texture processor writes to a surface so there is no texture to release.
throw new UnsupportedOperationException();
}

@Override
Expand Down
Expand Up @@ -217,26 +217,21 @@ private static ImmutableList<GlTextureProcessor> getGlTextureProcessorsForGlEffe
private static void chainTextureProcessorsWithListeners(
ImmutableList<GlTextureProcessor> textureProcessors,
FrameProcessingTaskExecutor frameProcessingTaskExecutor,
FrameProcessor.Listener listener) {
for (int i = 0; i < textureProcessors.size(); i++) {
@Nullable
GlTextureProcessor previousGlTextureProcessor =
i - 1 >= 0 ? textureProcessors.get(i - 1) : null;
@Nullable
GlTextureProcessor nextGlTextureProcessor =
i + 1 < textureProcessors.size() ? textureProcessors.get(i + 1) : null;
textureProcessors
.get(i)
.setListener(
new ChainingGlTextureProcessorListener(
previousGlTextureProcessor,
nextGlTextureProcessor,
frameProcessingTaskExecutor,
listener));
FrameProcessor.Listener frameProcessorListener) {
for (int i = 0; i < textureProcessors.size() - 1; i++) {
GlTextureProcessor producingGlTextureProcessor = textureProcessors.get(i);
GlTextureProcessor consumingGlTextureProcessor = textureProcessors.get(i + 1);
ChainingGlTextureProcessorListener chainingGlTextureProcessorListener =
new ChainingGlTextureProcessorListener(
producingGlTextureProcessor,
consumingGlTextureProcessor,
frameProcessingTaskExecutor);
producingGlTextureProcessor.setOutputListener(chainingGlTextureProcessorListener);
producingGlTextureProcessor.setErrorListener(frameProcessorListener::onFrameProcessingError);
consumingGlTextureProcessor.setInputListener(chainingGlTextureProcessorListener);
}
}

private static final String TAG = "GlEffectsFrameProcessor";
private static final String THREAD_NAME = "Transformer:GlEffectsFrameProcessor";
private static final long RELEASE_WAIT_TIME_MS = 100;

Expand Down

0 comments on commit a2166a4

Please sign in to comment.