Does the code-gen support stream service? #31

Closed
boan-anbo opened this issue Mar 28, 2023 · 4 comments

Comments

boan-anbo commented Mar 28, 2023

I have a service definition like this:

service SampleService {
  rpc WatchServerEvents (WatchServerEventsRequest) returns (stream WatchServerEventsResponse) {}
}

But the generated SampleService_connectquery.ts is empty.

The code-gen works for unary calls.

Is streaming supported? Thanks!

Contributor

dimitropoulos commented Apr 4, 2023

Streaming is not supported today, but only because we have yet to find a situation where it'd be useful with react-query's API.

I'd be very interested in working with you to better understand your use case, so we can see whether there's a react-query API (perhaps useInfiniteQuery) that could be a good fit for it.

If you'd like to do that, could you possibly provide an example (perhaps similar to https://github.com/bufbuild/connect-query/blob/main/examples/react/basic/src/example.tsx) of how you might envision something like this working?

seamusv commented Apr 17, 2023

I wanted to incorporate connect/disconnect/reconnect into a single React hook, so I wrote the following:

import React, {useMemo} from "react";
import {Code} from "@bufbuild/connect";
import {useTransport} from "@bufbuild/connect-query";
import {createCallbackClient} from "@bufbuild/connect-web";
import {MethodInfoServerStreaming, ServiceType} from "@bufbuild/protobuf";

type UseStreamingClientOptions<O, R> = {
    reconnectDelay?: number;
    selectFromResult?: (result: O | undefined) => R
}

type HookResult<T extends ServiceType, M extends keyof T["methods"], O, R> = R extends undefined
    ? { data: O | undefined; error: Error | null; isError: boolean }
    : R & { error: Error | null; isError: boolean };

export const useStreamingClient = <
    T extends ServiceType,
    M extends keyof T["methods"],
    I = T["methods"][M] extends MethodInfoServerStreaming<infer Input, any> ? Input : never,
    O = T["methods"][M] extends MethodInfoServerStreaming<any, infer Output> ? Output : never,
    R = any,
>(
    service: T,
    method: M,
    input: I,
    options?: UseStreamingClientOptions<O, R>,
): HookResult<T, M, O, R> => {
    const cancelFnRef = React.useRef<() => void>();
    const reconnectTimerRef = React.useRef<ReturnType<typeof setTimeout>>();
    const unmountedRef = React.useRef(false);

    const [dataState, setDataState] = React.useState<R extends undefined ? { data: O | undefined } : R>({} as any);
    const [errorState, setErrorState] = React.useState<{ error: Error | null; isError: boolean }>({
        error: null,
        isError: false
    });
    const transport = useTransport();
    // Recreate the client when either the service or the transport changes.
    const client = useMemo(() => createCallbackClient(service, transport), [service, transport]);

    const reconnectDelay = options?.reconnectDelay ?? 2000;

    const processData = React.useCallback((response: O | Error | undefined) => {
        const isError = response instanceof Error;
        if (isError) {
            setErrorState({error: response, isError});
        } else {
            const newState = options?.selectFromResult ? {...options.selectFromResult(response)} : {data: response};
            setDataState((prevState: any) => {
                if (!shallowEqual(newState, prevState)) {
                    return newState;
                } else {
                    return prevState;
                }
            });
            setErrorState(state => state.isError ? {error: null, isError: false} : state);
        }
    }, [options]);

    const connect = () => {
        console.log("connect");
        cancelFnRef.current?.();

        cancelFnRef.current = client[method](
            input as any,
            processData as any,
            err => {
                if (unmountedRef.current) {
                    return;
                } else if (err?.code === Code.DeadlineExceeded) {
                    connect();
                } else if (err === undefined) {
                    return;
                } else {
                    processData(err);
                    reconnect();
                }
            });
    }

    const disconnect = () => {
        console.log("disconnect");
        clearTimeout(reconnectTimerRef.current);
        cancelFnRef.current?.();
    }

    const reconnect = () => {
        if (reconnectTimerRef.current) {
            clearTimeout(reconnectTimerRef.current);
        }

        reconnectTimerRef.current = setTimeout(() => {
            if (unmountedRef.current) {
                return;
            }
            connect();
        }, reconnectDelay);
    }

    React.useEffect(() => {
        unmountedRef.current = false;
        connect();
        return () => {
            unmountedRef.current = true;
            disconnect();
        }
    }, []);

    return React.useMemo(() => ({...dataState, ...errorState}) as HookResult<T, M, O, R>, [dataState, errorState]);
}

function shallowEqual(obj1: any, obj2: any) {
    if (obj1 === obj2) return true;

    const keys1 = Object.keys(obj1);
    const keys2 = Object.keys(obj2);

    if (keys1.length !== keys2.length) return false;

    for (const key of keys1) {
        if (obj1[key] !== obj2[key]) return false;
    }

    return true;
}

And it would be used as:

export const ServerTime = () => {
    const {time} = useStreamingClient(GreetService, "serverTime", {}, {
        selectFromResult: (data) => ({
            time: formatDate(new Date(Number.parseInt(data?.time.toString() || "0"))),
        }),
    });

    return (
        <div>Server Time: {time}</div>
    );
}

seamusv commented Apr 17, 2023

I just noticed that my previous post missed my opening comments to the issue.

You could treat a stream much like how Tk demonstrates wrapping a websocket connection, but you still need to handle reconnects.
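To make the reconnect concern concrete outside of React, here is a minimal sketch of a retry loop around any async-iterable stream. It is not part of connect-es or connect-query: `consumeWithReconnect`, `ReconnectOptions`, and their fields are hypothetical names chosen for illustration, and `openStream` stands in for whatever call opens your server stream.

```typescript
// Hypothetical helper (an assumption, not a library API).
type ReconnectOptions = {
  reconnectDelayMs: number; // pause between reconnect attempts
  maxAttempts: number;      // consecutive failures tolerated before giving up
};

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function consumeWithReconnect<T>(
  openStream: () => AsyncIterable<T>,
  onMessage: (message: T) => void,
  options: ReconnectOptions,
): Promise<void> {
  let failures = 0;
  for (;;) {
    try {
      for await (const message of openStream()) {
        failures = 0; // a delivered message means the connection is healthy
        onMessage(message);
      }
      return; // the stream ended cleanly, so there is nothing to reconnect to
    } catch (error) {
      failures++;
      if (failures >= options.maxAttempts) throw error;
      await sleep(options.reconnectDelayMs); // back off, then open a new stream
    }
  }
}
```

A fixed delay mirrors the `reconnectDelay` option in the hook above; exponential backoff would be a reasonable alternative if the server may be down for longer periods.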

I am taking a different approach and merging Connect's support for React Query and ES by creating a React hook that takes advantage of the useTransport provider. I have also written a useClient hook that is just the two lines of code in this hook to use the transport and create a client. Everything is typesafe including the string for the method name.

Author

boan-anbo commented Apr 19, 2023

@seamusv Thanks very much for sharing! I'll try it out next time I need to handle streaming endpoints.

I felt @dimitropoulos's point was valid, and I ended up simply handling the generated connect-es streaming endpoint like this, dispatching each message to the global store as it arrives.

const launchWatcher = async () => {
    console.log(`Starting watcher ${watcherId}...`);

    try {
        const stream = myapp.watchServerEvents({});

        let counter = 0;
        for await (const response of stream) {
            counter++;
            console.log(`Message ${counter} from watcher ${watcherId}:`, response.message);
            handleWatchServerEventMessages(response.message);
        }
    } catch (error) {
        console.error(`Error occurred in launch watcher ${watcherId}:`, error);
    }
};
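One thing the loop above does not show is how to stop the watcher, for example when a component unmounts. A minimal sketch of one way to do that with an AbortController follows. Everything here is an assumption for illustration: `fakeWatchServerEvents` is a stand-in for the generated streaming method, and `launchCancellableWatcher` and `dispatch` are hypothetical names. Whether your client version accepts an abort signal in its call options is worth checking in its documentation.

```typescript
type ServerEvent = { message: string };

// Hypothetical stand-in for a generated server-streaming method.
async function* fakeWatchServerEvents(signal: AbortSignal): AsyncGenerator<ServerEvent> {
  for (let i = 1; !signal.aborted; i++) {
    await new Promise<void>((resolve) => setTimeout(resolve, 5));
    yield { message: `event ${i}` };
  }
}

// Starts consuming the stream and returns a handle to stop it.
function launchCancellableWatcher(
  openStream: (signal: AbortSignal) => AsyncIterable<ServerEvent>,
  dispatch: (message: string) => void,
): { stop: () => void; done: Promise<void> } {
  const controller = new AbortController();
  const done = (async () => {
    try {
      for await (const response of openStream(controller.signal)) {
        if (controller.signal.aborted) break; // stop dispatching after cancellation
        dispatch(response.message);
      }
    } catch (error) {
      // Errors caused by our own cancellation are expected; rethrow anything else.
      if (!controller.signal.aborted) throw error;
    }
  })();
  return { stop: () => controller.abort(), done };
}
```

In a React component, `stop` would typically be called from a `useEffect` cleanup function so the stream does not outlive the component.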


3 participants