Skip to content

Commit

Permalink
feat: updated jobqueue to hand disconnects
Browse files Browse the repository at this point in the history
- the loopback attribute now lives on the Job object
- the add job logic was updted to provide an identifier
- handleDisconenct now removes job corresponding to disconnected client

This commit also includes the following fixes -
fix: exported modules for react and fixed subscribe parameters
  • Loading branch information
Martin-Ting committed Dec 22, 2016
1 parent 8f40cd4 commit 8c667f6
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 10 deletions.
10 changes: 6 additions & 4 deletions src/client-sockets.js
Expand Up @@ -16,10 +16,12 @@ function unsubscribe() {
socket.emit('unsubscribe', { socketid });
}

function graphql(uri, query, variables, callback){
function graphql(query){
socket.emit('mutation', { query });
}

subscribe('http://localhost:4000', '{ getMessage(id: 0) { content, author} }', null, function (data) {
console.log(data);
});
// subscribe('http://localhost:4000', '{ getMessage(id: 0) { content, author} }', null, function (data) {
// console.log(data);
// });

module.exports = {subscribe, unsubscribe, graphql};
17 changes: 14 additions & 3 deletions src/jobqueue.js
Expand Up @@ -15,6 +15,15 @@ class JobQueue {
return this.jobQueue.splice(0,1)[0];
}

removeJob(attribute, value){
for(let i = this.jobQueue.length-1; i >= 0; --i){
if(this.jobQueue[i][attribute] === value){
this.jobQueue.splice(i, 1);
}
}
return;
}

getJobs() {
return this.jobQueue;
}
Expand All @@ -23,13 +32,13 @@ class JobQueue {
return this.jobQueue.length;
}

addObservable(name, callback, errCallback, completeCallback, loopback=true, interval = 100) {
addObservable(name, callback, errCallback, completeCallback, interval = 100) {
if(!this.watchdogs[name]) {
let subscribeCallback = (intervalTime) => {
if(this.jobQueue.length > 0) {
let currJob = this.takeJob();
callback(currJob);
if(loopback) {
if(currJob.loopback) {
this.addJob(currJob);
}
}
Expand All @@ -50,7 +59,7 @@ class JobQueue {
}

class Job {
constructor(pName, pTask, pCallback) {
constructor(pName, pTask, pCallback, pIdentifier, pLoopback) {
this.name = pName;
this.storedTask = pTask;
this.task = (...args) => {
Expand All @@ -64,6 +73,8 @@ class Job {
};
this.callback = pCallback;
this.numPolls = 0;
this.identifier = pIdentifier;
this.loopback = (pLoopback === undefined ? true : false);
this.lastResult;
}

Expand Down
6 changes: 5 additions & 1 deletion src/sockets.js
Expand Up @@ -20,11 +20,15 @@ function setup(server) {
debug(`socket.on(disconnect) :: [${socket.id}] disconnected`);
});
socket.on('mutation', (data) => {
console.log(`socket event[mutation] :: recieved query from ${socket.id}\n${data}`);
graphql.graphql(
graphql.buildSchema(getSchema()),
data.query,
getRoot()
);
).then((result) => {
console.log(`socket event[mutation] :: result for ${socket.id}\n${result}`);
console.log(result);
});
});
});
}
Expand Down
9 changes: 7 additions & 2 deletions src/subql.js
Expand Up @@ -9,6 +9,9 @@ const operations = {};
var storedSchema = '';
var jobQueue = new JobQueue();
jobQueue.addObservable("observable1", (job) => job.runTask(), (err) => console.log(err), () => console.log('complete'));
jobQueue.addObservable("observable2", (job) => job.runTask(), (err) => console.log(err), () => console.log('complete'));
jobQueue.addObservable("observable3", (job) => job.runTask(), (err) => console.log(err), () => console.log('complete'));
jobQueue.addObservable("observable4", (job) => job.runTask(), (err) => console.log(err), () => console.log('complete'));

function parseSchema(schema) {
if (!schema) {
Expand Down Expand Up @@ -110,13 +113,14 @@ function handleSubscribe(query, socketid) {
jobQueue.addJob(new Job(
resolverName + JSON.stringify(inputs),
() => root[resolverName](inputs),
(result) => connected[socketid].socket.emit(socketid, result)
(result) => connected[socketid] !== undefined ? connected[socketid].socket.emit(socketid, result) : console.log(`[Job] :: client has disconnected`),
socketid
));
} else if(operations[resolverName].type === 'Query') {
let oldResolver = root[resolverName];
root[resolverName] = function (...args) {
let ret = oldResolver(...args);
let uniqIdentifier = generateUniqueIdentifier(operations[resolverName].value, ret);
let uniqIdentifier = genetmuxrateUniqueIdentifier(operations[resolverName].value, ret);
db[uniqIdentifier] = !db[uniqIdentifier] ? [socketid] : [...db[uniqIdentifier], socketid];
return ret;
}
Expand Down Expand Up @@ -172,6 +176,7 @@ function findFields(parsedQuery, store) {
}

function handleDisconnect(socketid) {
jobQueue.removeJob('identifier', socketid);
Object.keys(db).forEach((uniqIdentifier) => {
let socketIndex = db[uniqIdentifier].indexOf(socketid);
if(socketIndex >= 0) {
Expand Down

0 comments on commit 8c667f6

Please sign in to comment.