Skip to content

Commit

Permalink
Fixed tests, one pubsub for whole schema
Browse files Browse the repository at this point in the history
  • Loading branch information
freiksenet committed Jan 3, 2018
1 parent 5e3dd1b commit f320d97
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 44 deletions.
14 changes: 7 additions & 7 deletions src/stitching/makeRemoteExecutableSchema.ts
Expand Up @@ -93,10 +93,11 @@ export default function makeRemoteExecutableSchema({
const subscriptionResolvers: IResolverObject = {};
const subscriptionType = schema.getSubscriptionType();
if (subscriptionType) {
const pubSub = createPubSub ? createPubSub() : new PubSub();
const subscriptions = subscriptionType.getFields();
Object.keys(subscriptions).forEach(key => {
subscriptionResolvers[key] = {
subscribe: createSubscriptionResolver(link, createPubSub),
subscribe: createSubscriptionResolver(key, link, pubSub),
};
});
}
Expand Down Expand Up @@ -177,8 +178,9 @@ function createResolver(fetcher: Fetcher): GraphQLFieldResolver<any, any> {
}

function createSubscriptionResolver(
name: string,
link: ApolloLink,
createPubSub?: () => PubSubEngine,
pubSub: PubSubEngine,
): ResolverFn {
return (root, args, context, info) => {
const fragments = Object.keys(info.fragments).map(
Expand All @@ -196,20 +198,18 @@ function createSubscriptionResolver(
};
const observable = execute(link, operation);

// fallback to in-memory PubSub if no PubSub provided
const pubSub = createPubSub ? createPubSub() : new PubSub();
const observer = {
next(value: any) {
pubSub.publish('static', value.data);
pubSub.publish(`remote-schema-${name}`, value.data);
},
error(err: Error) {
pubSub.publish('static', { errors: [err] });
pubSub.publish(`remote-schema-${name}`, { errors: [err] });
},
};

observable.subscribe(observer);

return pubSub.asyncIterator('static');
return pubSub.asyncIterator(`remote-schema-${name}`);
};
}

Expand Down
24 changes: 11 additions & 13 deletions src/test/testMakeRemoteExecutableSchema.ts
Expand Up @@ -16,7 +16,7 @@ describe('remote subscriptions', () => {
schema = await makeSchemaRemoteFromLink(subscriptionSchema);
});

it('should work', async done => {
it('should work', done => {
const mockNotification = {
notifications: {
text: 'Hello world',
Expand All @@ -32,18 +32,16 @@ describe('remote subscriptions', () => {
`);

let notificationCnt = 0;
subscribe(schema, subscription)
.then(results => {
forAwaitEach(
results as AsyncIterable<ExecutionResult>,
(result: ExecutionResult) => {
expect(result).to.have.property('data');
expect(result.data).to.deep.equal(mockNotification);
!notificationCnt++ ? done() : null;
},
).catch(done);
})
.catch(done);
subscribe(schema, subscription).then(results =>
forAwaitEach(
results as AsyncIterable<ExecutionResult>,
(result: ExecutionResult) => {
expect(result).to.have.property('data');
expect(result.data).to.deep.equal(mockNotification);
!notificationCnt++ ? done() : null;
},
),
);

subscriptionPubSub.publish(subscriptionPubSubTrigger, mockNotification);
});
Expand Down
73 changes: 49 additions & 24 deletions src/test/testingSchemas.ts
Expand Up @@ -2,6 +2,7 @@ import {
GraphQLSchema,
graphql,
print,
subscribe,
Kind,
GraphQLScalarType,
ValueNode,
Expand Down Expand Up @@ -560,6 +561,18 @@ export const subscriptionSchema: GraphQLSchema = makeExecutableSchema({
resolvers: subscriptionResolvers,
});

const hasSubscriptionOperation = ({ query }: { query: any }): boolean => {
for (let definition of query.definitions) {
if (definition.kind === 'OperationDefinition') {
const operation = definition.operation;
if (operation === 'subscription') {
return true;
}
}
}
return false;
};

// Pretend this schema is remote
export async function makeSchemaRemoteFromLink(schema: GraphQLSchema) {
const link = new ApolloLink(operation => {
Expand All @@ -568,32 +581,44 @@ export async function makeSchemaRemoteFromLink(schema: GraphQLSchema) {
const { query, operationName, variables } = operation;
const { graphqlContext } = operation.getContext();
try {
const result:
| AsyncIterator<ExecutionResult>
| ExecutionResult = await graphql(
schema,
print(query),
null,
graphqlContext,
variables,
operationName,
);
if (
typeof (<AsyncIterator<ExecutionResult>>result).next === 'function'
) {
while (true) {
const next = await (<AsyncIterator<
ExecutionResult
>>result).next();
observer.next(next.value);
if (next.done) {
observer.complete();
break;
}
}
} else {
if (!hasSubscriptionOperation(operation)) {
const result = await graphql(
schema,
print(query),
null,
graphqlContext,
variables,
operationName,
);
observer.next(result);
observer.complete();
} else {
const result = await subscribe(
schema,
query,
null,
graphqlContext,
variables,
operationName,
);
if (
typeof (<AsyncIterator<ExecutionResult>>result).next ===
'function'
) {
while (true) {
const next = await (<AsyncIterator<
ExecutionResult
>>result).next();
observer.next(next.value);
if (next.done) {
observer.complete();
break;
}
}
} else {
observer.next(result as ExecutionResult);
observer.complete();
}
}
} catch (error) {
observer.error.bind(observer);
Expand Down

0 comments on commit f320d97

Please sign in to comment.