Skip to content

Pipeline abstraction#48

Merged
ruccho merged 7 commits intomainfrom
feature/pipeline_abstraction
Oct 15, 2025
Merged

Pipeline abstraction#48
ruccho merged 7 commits intomainfrom
feature/pipeline_abstraction

Conversation

@ruccho
Copy link
Collaborator

@ruccho ruccho commented Oct 6, 2025

New abstraction layer of encoding pipeline to implement pipelines such as #39 and #40, which are slightly different from current one.

Concept

With the pipeline abstraction, a pipeline is represented as a sequence of pipeline elements which implement IAsyncPipelineInput<T> or IPipelineInput<T>. This enhances the reusability of the code implementing the pipeline.

IAsyncPipelineInput<T>

Represents single element of encoding pipeline, which is mainly for data flow where back pressure occurs.

IPipelineInput<T> will receive values, manipulate them and pass them another IPipelineInput<T>.

internal interface IAsyncPipelineInput<in T> : IDisposable
{
    ValueTask PushAsync(T value);
    ValueTask CompleteAsync(Exception exception = null);
}

IPipelineInput<T>

A synchronous version of IAsyncPipelineInput<T>, which is mainly for data flow without back pressure.

internal interface IPipelineInput<in T> : IDisposable
{
    void Push(T value);
    ValueTask CompleteAsync(Exception exception = null);
}

IPipelineTransform<TIn, TOut>

Some pipeline elements may be able to be used in both asynchronous and synchronous flow. IPipelineTransform only requires simple value transform implementation and they can be converted to either IAsyncPipelineInput<T> or IPipelineInput<T> using PipelineExtensions.AsInput() and PipelineExtensions.AsAsyncInput().

internal interface IPipelineTransform<in TIn, TOut> : IDisposable
{
    bool Transform(TIn input, out TOut output);
}

Redesigning RealtimeInstantReplaySession

RealtimeInstantReplaySession is redesigned with the pipeline abstraction. The pipeline is wrote like:

_videoPipeline = new FrameProviderSubscription(frameProvider, disposeFrameProvider,
    new VideoTemporalAdjuster<IFrameProvider.Frame>(_temporalController, fixedFrameInterval).AsInput(
        new FramePreprocessorInput(preprocessor, true).AsInput(
            new AsyncGPUReadbackTransform().AsInput(
                new DroppingChannelInput<LazyVideoFrameData>(
                    options.VideoInputQueueSize,
                    onLazyVideoFrameDataDropped,
                    new VideoEncoderInput(videoEncoder,
                        new BoundedEncodedDataBufferVideoInput(buffer).AsAsync()))))));

_audioPipeline = new AudioSampleProviderSubscription(audioSampleProvider, disposeAudioSampleProvider,
    new AudioTemporalAdjuster(
        _temporalController,
        options.AudioOptions.SampleRate,
        options.AudioOptions.Channels).AsInput(
        new DroppingChannelInput<PcmAudioFrame>(options.AudioInputQueueSize, onPcmAudioFrameDropped,
            new AudioEncoderInput(audioEncoder, options.AudioOptions.SampleRate,
                new BoundedEncodedDataBufferAudioInput(buffer).AsAsync()))));

It has already been confirmed that General Unbounded Recording (#40) can be implemented with a similarly small amount of code with the pipeline abstraction. (PR).

Future works

Currently all pipeline abstraction APIs are private but I want to publish them, enabling users to create their own pipelines easily.

@ruccho ruccho marked this pull request as ready for review October 14, 2025 06:46
@ruccho ruccho requested review from hkmt-mmy and qua-iy October 14, 2025 06:46
{
try
{
try

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ネストが深いのが気になりました。内側のtry-finallyは関数に切り出してもいいかも?

_numChannelsInOption = numChannelsInOption;
}

public bool Transform(InputAudioFrame input, out PcmAudioFrame output)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

かなり長いので (もし良い感じの分割単位があれば) 分割したほうが見通しが良さそうだと思いました


public void Push(EncodedFrame value)
{
if (!_buffer.TryAddAudioFrame(value))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1行の場合に{}付けてないのにここは付いてます

{
Unregister();

if (_disposeProvider)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

こことかUnregisterのところも{}ついてるけど、基本的に付けないルールでやってそうなので気になりました

public void Dispose()
{
if (_array != null)
ArrayPool<short>.Shared.Return(_array);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

借りた当人じゃないのにReturnするのが少し気になりました。コンストラクタ引数でrendとしてるしいいのかな……?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

この配列はタスク間を非同期的にたらい回されたあとに解放されるので、残念ながらこれについては解決策がないです


private async Task TransferAsync(IAsyncPipelineInput<EncodedFrame> next)
{
try

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ネストがめっちゃ深い……。処理自体は少ないけど、private関数なりローカル関数なりに切り出したほうが良さそう

private readonly IRecordingTimeProvider _recordingTimeProvider;
private bool _disposed;
private double _prevFrameTime;
private readonly double? _fixedFrameInterval;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readonlyがついててコンストラクタ初期化されるフィールドがまとまってないのが気になりました。好みの問題かもですが

@ruccho ruccho merged commit 1c157eb into main Oct 15, 2025
@ruccho ruccho deleted the feature/pipeline_abstraction branch October 15, 2025 05:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants