Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a new Evt after applying filter/operator #57

Closed
barthuijgen opened this issue Apr 24, 2023 · 14 comments
Closed

Create a new Evt after applying filter/operator #57

barthuijgen opened this issue Apr 24, 2023 · 14 comments

Comments

@barthuijgen
Copy link

I'm using Evt together with Pothos graphql subscriptions, where the subscribe method accepts an async iterator, which Evt instances are, great!

Except now I'm trying to apply a filter or transform for the subscription, so I would have

const pingEvent = new Evt<string | null>(null);
setInterval(() => pingEvent.post(`pong ${Date.now()}`), 1000);

and the schema

const schema = (builder: typeof builderType) => {
  builder.subscriptionField("ping", (t) =>
    t.string({
      resolve: (parent) => parent,
      subscribe: () => pingEvent,
    })
  );
};

This works decently, as soon as you subscribe you get the latest value from the stateful event, which is great. But what if the first state was not published yet? I don't want to return null on the graphql resolver.

I would like to be able to to something like

subscribe: () => pingEvent.attach(event => event !== null),

But I am not sure how to create a new instance of the Evt.

To create my own solution I attempted the helper method

export function applyEvtFilter<
  E extends Evt<any>,
  T extends (data: UnpackEvt<E>) => boolean
>(evt: E, operator: T) {
  const newEvt = new Evt<UnpackEvt<E>>();
  evt.attach(operator, (value) => newEvt.post(value));
  return newEvt;
}

and doing

subscribe: () => applyEvtFilter(pingEvent, event => event !== null))

And while this techincally works, it no longer unsubscribes, leading to eventually print

MaxHandlersExceededWarning: Possible Evt memory leak detected.26 handlers attached.

So, how am I able to apply operators to the event stream and return a new Evt that still properly unsubscribes?

@garronej
Copy link
Owner

Hold on

@barthuijgen
Copy link
Author

barthuijgen commented Apr 24, 2023

I've found .iter() seems to provide the functionality I'm looking for. But after some testing this has the same problem that handlers are not being cleaned up.

subscribe: function () {
    console.log(`handlers: ${pingEvent.getHandlers().length}`)
    const event = pingEvent.state
        ? pingEvent
        : pingEvent.toStateless();
    return event.iter(event => event === 'some filter');
}

the following shows increased handler count every new subscription. so the iterator.return() is not being hanled as exepcted when evt.iter() is used?

Edit: okay it is the. .toStateless() that prevent the handlers from being cleared. So if I avoid using that I can go ahead and filter with .iter().

I was getting the following error

Cannot pipe StatefulEvt because the operator does not match it's current state. You would end up with evt.state === undefined Use evt.toStateless([ctx]).pipe(op).toStatic(initialState) to be sure the StatefulEvt does not have an undefined state

Which is why I implemented the .toStateless() but I'll have to find another way to be able to filter without triggering this error.

@garronej
Copy link
Owner

garronej commented Apr 24, 2023

Hi @barthuijgen,

You are raising valid points. Evt needs something like a bidirectional pipes.

I offer you two solutions:

The naïve one (that work as long as Photos only subscribe at most one handler at the time):

import { Evt } from "evt";

const evtPing = Evt.create<string | null>(null);

const evtStatelessPing = (()=>{

    const ctx= Evt.newCtx();

    const evtStatelessPing = evtPing
        .toStateless(ctx)
        .pipe(ping => ping === null ? null : [ping]);

    evtStatelessPing.evtDetach.attachOnce(()=> ctx.done());

    if( evtPing.state !== null ){
        evtStatelessPing.postAsyncOnceHandled(evtPing.state);
    }

    return evtStatelessPing;

})()

The solution without making any assumption whatsoever.

import { Evt } from "evt";

const evtStatelessPing = (() => {

    const evtStatelessPing = Evt.create<string>();

    const ctx = Evt.newCtx();

    evtStatelessPing.evtAttach.attach(handler => {

        if (ctx.getHandlers().length === 0) {
            evtPing.$attach(
                ping => ping === null ? null : [ping],
                ctx,
                ping => evtStatelessPing.post(ping)
            );
            return;
        }

        if (evtPing.state === null) {
            return;
        }

        const op = evtStatelessPing.getInvocableOp(handler.op);

        const result = op(evtPing.state, () => { })

        if (result === null) {
            return;
        }

        handler.callback?.(result[0]);

    });


    evtStatelessPing.evtDetach.attach(
        () => evtStatelessPing.getHandlers().length === 0,
        () => ctx.done()
    );

    return evtStatelessPing;

})();

@garronej
Copy link
Owner

The first solution assumes Pothos will only suscribe at most one handler to the Evt.

The second make no assumption.

The code can be simplified a lot if you don't care about posting the initial ping state when the subscription is made by Pothos.

@garronej
Copy link
Owner

This is the code if you don't need to post the initial state:

const evtStatelessPing = (() => {

    const evtStatelessPing = Evt.create<string>();

    const ctx = Evt.newCtx();

    evtStatelessPing.evtAttach.attach(
        () => ctx.getHandlers().length === 0,
        () => evtPing
            .toStateless(ctx)
            .$attach(
                ping => ping === null ? null : [ping],
                ping => evtStatelessPing.post(ping)
            )
    );

    evtStatelessPing.evtDetach.attach(
        () => evtStatelessPing.getHandlers().length === 0,
        () => ctx.done()
    );

    return evtStatelessPing;

})();

@garronej
Copy link
Owner

Let me know if you need further explanation, I'm happy to answer.

@barthuijgen
Copy link
Author

Wow thank you so much for such a thourough answer! I've been a big fan ever since I came across Evt but some of these concepts a still a bit beyond me, slowly learning more about them.

Pothos can indeed call the subscribe multiple times at once, so I've tried implementing your second solution and it works like a charm, exactly as I expected.

It does seem overly complex for what seems like a simple use case, I hope performance is still fine.

I could allow graphql to return null, that might simplify the solution but when filtering I think you're still back to having all this logic. I'll have to think if it's better to apply the filter earlier and split the events over multiple Evt intances, instead of trying to shove different events over one Evt.

Anyway this gives me options to experiment and find the best fit for me, thank you so much!

@garronej
Copy link
Owner

garronej commented Apr 24, 2023

Thanks for the kind words, they mean a lot!
I appreciate your questions, as they help me understand the needs and experiences of users like you. It's always great to see people using EVT correctly.

The implementation does seem overly complex for what appears to be a simple use case

I completely agree. It's unfortunate that EVT isn't more accommodating for this valid use case. I've added two items to my roadmap to address this:

  • Introduce the concept of optionality for StatefulEvt: We'd like to have a stateful EVT that, when attached, calls the handler with its current value (which is already the case) UNLESS its current value is null (or something similar).
  • Make it the default behavior of pipe to ensure that if there isn't anything attached to the downstream EVT, there isn't anything attached to the source EVT either.

I'll takle this alongside the planned integration with ts-pattern

I hope performance is still fine.

I am quite confident that performance will be satisfactory. There will be a small overhead when attaching and detaching, but I believe this won't be an operation performed intensively. Additionally, the processes involved are not resource-intensive.

One final suggestion: consider abstracting this logic into a utility function to streamline the implementation.

import { Evt, type StatefulReadonlyEvt } from "evt";

/** Return a copy of the source Evt. While there isn't anything attached to the returned copy,
  *  the source evt won't be observed.  
  *  When attaching an handler to the copied Evt it will imediately be invoked with the current value
  *  of the source evt unless this value is null.
 */
function weakPipe<T>(evt: StatefulReadonlyEvt<T | null> ): Evt<T>{

    const evtOut = Evt.create<T>();

    const ctx = Evt.newCtx();

    const noSideEffect = () => { };

    evtOut.evtAttach.attach(handler => {

        if (ctx.getHandlers().length === 0) {
            evt.$attach(
                ping => ping === null ? null : [ping],
                ctx,
                ping => evtOut.post(ping)
            );
            return;
        }

        if (evt.state === null) {
            return;
        }

        const op = evtOut.getInvocableOp(handler.op);

        const result = op(evt.state, noSideEffect)

        if (result === null) {
            return;
        }

        handler.callback?.(result[0]);

    });

    evtOut.evtDetach.attach(
        () => evtOut.getHandlers().length === 0,
        () => ctx.done()
    );

    return evtOut;


}

const evtPing = Evt.create<string | null>(null);

const evtPingCopy= weakPipe(evtPing);

@barthuijgen
Copy link
Author

That's great, good luck on working on those improvements, looking forward to any progress. That ts-pattern lib looks pretty neat as well, will give a try sometime.

I think there is a slight issue with the weakPipe unless I'm misunderstanding it's implementation. Currently I need to call weakPipe(evtPing) every time in the graphql subscription for it to work as expected. If I re-use the evtPingCopy then it only works the first time it is used, on a subsequent request it will not pass along any data.

It works fine just re-doing the weakPipe for every sub, which might be what you intended anyway. But from reading the code it felt like after ctx.done() it should be able to attach a new handler.

@garronej
Copy link
Owner

@barthuijgen I'm verry sorry.
I should have tested the code before submiting it to you.
I forget about:
image

Here is the updated code:

import { Evt, type StatefulReadonlyEvt } from "evt";

/** Return a copy of the source Evt. While there isn't anything attached to the returned copy,
  *  the source evt won't be observed.  
  *  When attaching an handler to the copied Evt it will imediately be invoked with the current value
  *  of the source evt unless this value is null.
 */
function weakPipe<T>(evt: StatefulReadonlyEvt<T | null> ): Evt<T>{

    const evtOut = Evt.create<T>();

    let ctx = Evt.newCtx();

    const noSideEffect = () => { };

    evtOut.evtAttach.attach(handler => {

        if (ctx.getHandlers().length === 0) {
            evt.$attach(
                ping => ping === null ? null : [ping],
                ctx,
                ping => evtOut.post(ping)
            );
            return;
        }

        if (evt.state === null) {
            return;
        }

        const op = evtOut.getInvocableOp(handler.op);

        const result = op(evt.state, noSideEffect)

        if (result === null) {
            return;
        }

        handler.callback?.(result[0]);

    });

    evtOut.evtDetach.attach(
        () => evtOut.getHandlers().length === 0,
        () => {
            ctx.done();
            ctx = Evt.newCtx();
        }
    );

    return evtOut;

}

const evtPing = Evt.create<string | null>(null);

const evtPingCopy= weakPipe(evtPing);

@barthuijgen
Copy link
Author

Don't worry about it! I'm testing further and notice that it's not posting the initial state, not on first use or subsequent usage.

I tried to re-create Pothos's use of async iterators in a stackblitz but it's not exactly the same, I was able to reproduce the issue.

https://stackblitz.com/edit/evt-playground-syzowb?file=index.ts

As you can see in the last one I also expected iterator copy:pong 2 to appear in the end. sorry that it's a bit messy.

This problem seems to be unique to using async iterators, because if I swap those out with regular .attach() it works as expected again.

I've updated my project with a different workaround for now, so I dont need this method. just wanted to let you know my findings :)

@garronej
Copy link
Owner

I'm sorry I wasn't able to produce a working solution.

Here is a fix:

-handler.callback?.(result[0])
+Promise.resolve().then(()=>handler.callback?.(result[0]));

https://stackblitz.com/edit/evt-playground-7wpacg?file=index.ts

@barthuijgen
Copy link
Author

Cheers! so for .attach() the handler.callback() does work immediately, but for some reason for async iterators it needs to wait a tick. Is that a bug in Evt, or a drawback of how async iterators work?

I replaced Promise.resolve().then() with process.nextTick(), although that's less portable, it seemed more fitting.

Thanks for all the effort helping me 👍 It's been fun learning more about this.

@barthuijgen
Copy link
Author

The initial attach needed the same treatment

if (ctx.getHandlers().length === 0) {
  evt.$attach(
    (value) => (value === null ? null : [value]),
    ctx,
    (value) => Promise.resolve().then(()=> evtOut.post(value))
  );
  return;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants