Skip to content

Commit

Permalink
Merge c008678 into b3a6c8d
Browse files Browse the repository at this point in the history
  • Loading branch information
ezsper committed Jan 14, 2017
2 parents b3a6c8d + c008678 commit bb5d041
Showing 1 changed file with 61 additions and 53 deletions.
114 changes: 61 additions & 53 deletions src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export interface SubscriptionOptions {
export interface TriggerConfig {
channelOptions?: Object;
filter?: Function;
resolve? :Function;
}

export interface TriggerMap {
Expand Down Expand Up @@ -156,66 +157,73 @@ export class SubscriptionManager {
}
});

let triggerMap: TriggerMap;
let subscribe;

if (this.setupFunctions[subscriptionName]) {
triggerMap = this.setupFunctions[subscriptionName](options, args, subscriptionName);
subscribe = Promise.resolve(this.setupFunctions[subscriptionName](options, args, subscriptionName));
} else {
// if not provided, the triggerName will be the subscriptionName, The trigger will not have any
// options and rely on defaults that are set later.
triggerMap = {[subscriptionName]: {}};
subscribe = Promise.resolve({[subscriptionName]: {}});
}

const externalSubscriptionId = this.maxSubscriptionId++;
this.subscriptions[externalSubscriptionId] = [];
const subscriptionPromises = [];
Object.keys(triggerMap).forEach( triggerName => {
// Deconstruct the trigger options and set any defaults
const {
channelOptions = {},
filter = () => true, // Let all messages through by default.
} = triggerMap[triggerName];

// 2. generate the handler function
//
// rootValue is the payload sent by the event emitter / trigger by
// convention this is the value returned from the mutation
// resolver
const onMessage = (rootValue) => {
let contextPromise;
if (typeof options.context === 'function') {
contextPromise = new Promise((resolve) => {
resolve(options.context());
});
} else {
contextPromise = Promise.resolve(options.context);
}
contextPromise.then((context) => {
if (!filter(rootValue, context)) {
return;
}
execute(
this.schema,
parsedQuery,
rootValue,
context,
options.variables,
options.operationName
).then( data => options.callback(null, data) )
}).catch((error) => {
options.callback(error);
});
}

// 3. subscribe and keep the subscription id
const subsPromise = this.pubsub.subscribe(triggerName, onMessage, channelOptions);
subsPromise.then(id => this.subscriptions[externalSubscriptionId].push(id));

subscriptionPromises.push(subsPromise);
});

// Resolve the promise with external sub id only after all subscriptions completed
return Promise.all(subscriptionPromises).then(() => externalSubscriptionId);
return subscribe.then((triggerMap: TriggerMap) => {
const externalSubscriptionId = this.maxSubscriptionId++;
this.subscriptions[externalSubscriptionId] = [];
const subscriptionPromises = [];
Object.keys(triggerMap).forEach( triggerName => {
// Deconstruct the trigger options and set any defaults
const {
channelOptions = {},
filter = () => true, // Let all messages through by default.
resolve = (result) => result, // Resolve the message before submiting
} = triggerMap[triggerName];

// 2. generate the handler function
//
// rootValue is the payload sent by the event emitter / trigger by
// convention this is the value returned from the mutation
// resolver
const onMessage = (rootValue) => {
let contextPromise;
if (typeof options.context === 'function') {
contextPromise = new Promise((resolve) => {
resolve(options.context());
});
} else {
contextPromise = Promise.resolve(options.context);
}
contextPromise.then((context) => {
if (!filter(rootValue, context)) {
return;
}

return Promise.resolve(
resolve(rootValue, context, triggerName)
).then(rootValue =>
execute(
this.schema,
parsedQuery,
rootValue,
context,
options.variables,
options.operationName
)
).then(data => options.callback(null, data))
})
.catch((error) => options.callback(error));
}

// 3. subscribe and keep the subscription id
const subsPromise = this.pubsub.subscribe(triggerName, onMessage, channelOptions);
subsPromise.then(id => this.subscriptions[externalSubscriptionId].push(id));

subscriptionPromises.push(subsPromise);
});

// Resolve the promise with external sub id only after all subscriptions completed
return Promise.all(subscriptionPromises).then(() => externalSubscriptionId);
}).catch(error => options.callback(error));
}

public unsubscribe(subId){
Expand Down

0 comments on commit bb5d041

Please sign in to comment.