Skip to content

Commit

Permalink
Feature: context support (#5)
Browse files Browse the repository at this point in the history
* refactor(test): extract stack creation to external factory, recreate one for each test for better isolation

* refactor(test): wrap tests in `withFactory` wrapper, cleans up after itself in case of exceptions

* feat: add `createContext` support & associated test
  • Loading branch information
edorgeville committed May 24, 2024
1 parent 8c12465 commit a02a567
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 68 deletions.
12 changes: 7 additions & 5 deletions src/adapter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ export type CreateMQTTHandlerOptions<TRouter extends AnyRouter> = {
router: TRouter;
onError?: OnErrorFunction<TRouter, ConsumeMessage>;
verbose?: boolean;
createContext?: () => Promise<inferRouterContext<TRouter>>;
};

export const createMQTTHandler = <TRouter extends AnyRouter>(
opts: CreateMQTTHandlerOptions<TRouter>
) => {
const { client, requestTopic: requestTopic, router, onError, verbose } = opts;
const { client, requestTopic: requestTopic, router, onError, verbose, createContext } = opts;

const protocolVersion = client.options.protocolVersion ?? 4;
client.subscribe(requestTopic);
Expand All @@ -43,7 +44,7 @@ export const createMQTTHandler = <TRouter extends AnyRouter>(
const correlationId = packet.properties?.correlationData?.toString();
const responseTopic = packet.properties?.responseTopic?.toString();
if (!correlationId || !responseTopic) return;
const res = await handleMessage(router, msg, onError);
const res = await handleMessage(router, msg, onError, createContext);
if (!res) return;
client.publish(responseTopic, Buffer.from(JSON.stringify({ trpc: res })), {
properties: {
Expand All @@ -62,7 +63,7 @@ export const createMQTTHandler = <TRouter extends AnyRouter>(
return;
}
if (!correlationId || !responseTopic) return;
const res = await handleMessage(router, msg, onError);
const res = await handleMessage(router, msg, onError, createContext);
if (!res) return;
client.publish(responseTopic, Buffer.from(JSON.stringify({ trpc: res, correlationId })));
}
Expand All @@ -72,7 +73,8 @@ export const createMQTTHandler = <TRouter extends AnyRouter>(
async function handleMessage<TRouter extends AnyRouter>(
router: TRouter,
msg: ConsumeMessage,
onError?: OnErrorFunction<TRouter, ConsumeMessage>
onError?: OnErrorFunction<TRouter, ConsumeMessage>,
createContext?: () => Promise<inferRouterContext<TRouter>>
) {
const { transformer } = router._def._config;

Expand All @@ -85,7 +87,7 @@ async function handleMessage<TRouter extends AnyRouter>(

const { id, params } = trpc;
const type = MQTT_METHOD_PROCEDURE_TYPE_MAP[trpc.method] ?? ('query' as const);
const ctx: inferRouterContext<TRouter> | undefined = undefined;
const ctx: inferRouterContext<TRouter> | undefined = await createContext?.();

try {
const path = params.path;
Expand Down
11 changes: 10 additions & 1 deletion test/appRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import { initTRPC } from '@trpc/server';

export type AppRouter = typeof appRouter;

const t = initTRPC.create();
export async function createContext() {
return { hello: 'world' };
}

export type Context = Awaited<ReturnType<typeof createContext>>;

const t = initTRPC.context<Context>().create();

const publicProcedure = t.procedure;
const router = t.router;
Expand All @@ -28,5 +34,8 @@ export const appRouter = router({
slow: publicProcedure.query(async () => {
await new Promise(resolve => setTimeout(resolve, 10 * 1000));
return 'done';
}),
getContext: publicProcedure.query(({ ctx }) => {
return ctx;
})
});
60 changes: 60 additions & 0 deletions test/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { createTRPCProxyClient } from '@trpc/client';
import Aedes from 'aedes';
import { once } from 'events';
import mqtt from 'mqtt';
import { createServer } from 'net';

import { createMQTTHandler } from '../src/adapter';
import { mqttLink } from '../src/link';
import { type AppRouter, appRouter, createContext } from './appRouter';

export function factory() {
const requestTopic = 'rpc/request';

const aedes = new Aedes();
// aedes.on('publish', (packet, client) => console.log(packet.topic, packet.payload.toString()));
const broker = createServer(aedes.handle);
broker.listen(1883);
const mqttClient = mqtt.connect('mqtt://localhost');

createMQTTHandler({
client: mqttClient,
requestTopic,
router: appRouter,
createContext
});

const client = createTRPCProxyClient<AppRouter>({
links: [
mqttLink({
client: mqttClient,
requestTopic
})
]
});

return {
client,
broker,
mqttClient,
async ready() {
await once(broker, 'listening');
await once(mqttClient, 'connect');
},
close() {
mqttClient.end();
broker.close();
aedes.close();
}
};
}

export async function withFactory(fn: (f: ReturnType<typeof factory>) => Promise<void>) {
const f = factory();
await f.ready();
try {
await fn(f);
} finally {
f.close();
}
}
109 changes: 47 additions & 62 deletions test/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,74 +1,59 @@
import { createTRPCProxyClient } from '@trpc/client';
import Aedes from 'aedes';
import { once } from 'events';
import mqtt from 'mqtt';
import { createServer } from 'net';
import { withFactory } from './factory';

import { createMQTTHandler } from '../src/adapter';
import { mqttLink } from '../src/link';
import { AppRouter, appRouter } from './appRouter';

const requestTopic = 'rpc/request';

const aedes = new Aedes();
// aedes.on('publish', (packet, client) => console.log(packet.topic, packet.payload.toString()));
const broker = createServer(aedes.handle);
broker.listen(1883);
const mqttClient = mqtt.connect('mqtt://localhost');

createMQTTHandler({
client: mqttClient,
requestTopic,
router: appRouter
});

const client = createTRPCProxyClient<AppRouter>({
links: [
mqttLink({
client: mqttClient,
requestTopic
})
]
});

beforeAll(async () => {
await once(broker, 'listening');
await once(mqttClient, 'connect');
});

test('broker is listening', () => {
expect(broker.listening).toBe(true);
});

test('mqtt client is connected', () => {
expect(mqttClient.connected).toBe(true);
describe('broker', () => {
test('is listening', async () => {
await withFactory(async ({ broker }) => {
expect(broker.listening).toBe(true);
});
});
});

test('greet query', async () => {
const greeting = await client.greet.query('world');
expect(greeting).toEqual({ greeting: 'hello, world!' });
describe('mqtt client', () => {
test('is connected', async () => {
await withFactory(async ({ mqttClient }) => {
expect(mqttClient.connected).toBe(true);
});
});
});

test('countUp mutation', async () => {
const addOne = await client.countUp.mutate(1);
expect(addOne).toBe(1);
describe('procedures', () => {
test('greet query', async () => {
await withFactory(async ({ client }) => {
const greeting = await client.greet.query('world');
expect(greeting).toEqual({ greeting: 'hello, world!' });
});
});

const addTwo = await client.countUp.mutate(2);
expect(addTwo).toBe(3);
});
test('countUp mutation', async () => {
await withFactory(async ({ client }) => {
const addOne = await client.countUp.mutate(1);
expect(addOne).toBe(1);

test('abortSignal is handled', async () => {
const controller = new AbortController();
const promise = client.slow.query(undefined, {
signal: controller.signal
const addTwo = await client.countUp.mutate(2);
expect(addTwo).toBe(3);
});
});

controller.abort();
await expect(promise).rejects.toThrow('aborted');
describe('abort signal', () => {
test('is handled', async () => {
await withFactory(async ({ client }) => {
const controller = new AbortController();
const promise = client.slow.query(undefined, {
signal: controller.signal
});

controller.abort();
await expect(promise).rejects.toThrow('aborted');
});
});
});
});

afterAll(async () => {
mqttClient.end();
broker.close();
aedes.close();
describe('context', () => {
test('getContext query', async () => {
await withFactory(async ({ client }) => {
const ctx = await client.getContext.query();
expect(ctx).toEqual({ hello: 'world' });
});
});
});

0 comments on commit a02a567

Please sign in to comment.