Skip to content

Commit

Permalink
Push-based sync triggering
Browse files Browse the repository at this point in the history
Immediate sync triggering on remote library change using WebSocket API.
Currently kicks off a normal sync process for the modified library --
actual object data isn't pushed.

(This might not stay enabled for 5.0 Final.)
  • Loading branch information
dstillman committed Dec 30, 2016
1 parent 7fd3a8c commit 2beb2c5
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 4 deletions.
2 changes: 1 addition & 1 deletion chrome/content/zotero/xpcom/notifier.js
Expand Up @@ -30,7 +30,7 @@ Zotero.Notifier = new function(){
var _types = [
'collection', 'search', 'share', 'share-items', 'item', 'file',
'collection-item', 'item-tag', 'tag', 'setting', 'group', 'trash', 'publications',
'bucket', 'relation', 'feed', 'feedItem', 'sync'
'bucket', 'relation', 'feed', 'feedItem', 'sync', 'api-key'
];
var _inTransaction;
var _queue = {};
Expand Down
2 changes: 2 additions & 0 deletions chrome/content/zotero/xpcom/sync/syncEventListeners.js
Expand Up @@ -111,6 +111,7 @@ Zotero.Sync.EventListeners.AutoSyncListener = {

register: function () {
this._observerID = Zotero.Notifier.registerObserver(this, false, 'autosync');
Zotero.Sync.Streamer.init();
},

notify: function (event, type, ids, extraData) {
Expand Down Expand Up @@ -163,6 +164,7 @@ Zotero.Sync.EventListeners.AutoSyncListener = {
if (this._observerID) {
Zotero.Notifier.unregisterObserver(this._observerID);
}
Zotero.Sync.Streamer.disconnect();
}
}

Expand Down
2 changes: 2 additions & 0 deletions chrome/content/zotero/xpcom/sync/syncLocal.js
Expand Up @@ -80,6 +80,7 @@ Zotero.Sync.Data.Local = {
Zotero.debug("Clearing old API key");
loginManager.removeLogin(oldLoginInfo);
}
Zotero.Notifier.trigger('delete', 'api-key', []);
return;
}

Expand All @@ -102,6 +103,7 @@ Zotero.Sync.Data.Local = {
Zotero.debug("Replacing API key");
loginManager.modifyLogin(oldLoginInfo, loginInfo);
}
Zotero.Notifier.trigger('modify', 'api-key', []);
},


Expand Down
4 changes: 2 additions & 2 deletions chrome/content/zotero/xpcom/sync/syncRunner.js
Expand Up @@ -97,7 +97,7 @@ Zotero.Sync.Runner_Module = function (options = {}) {
* @param {Function} [options.onError] Function to pass errors to instead of
* handling internally (used for testing)
*/
this.sync = Zotero.Promise.coroutine(function* (options = {}) {
this.sync = Zotero.serial(Zotero.Promise.coroutine(function* (options = {}) {
// Clear message list
_errors = [];

Expand Down Expand Up @@ -240,7 +240,7 @@ Zotero.Sync.Runner_Module = function (options = {}) {
Zotero.debug("Done syncing");
Zotero.Notifier.trigger('finish', 'sync', librariesToSync || []);
}
});
}));


/**
Expand Down
183 changes: 183 additions & 0 deletions chrome/content/zotero/xpcom/sync/syncStreamer.js
@@ -0,0 +1,183 @@
/*
***** BEGIN LICENSE BLOCK *****
Copyright © 2016 Center for History and New Media
George Mason University, Fairfax, Virginia, USA
http://zotero.org
This file is part of Zotero.
Zotero is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Zotero is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with Zotero. If not, see <http://www.gnu.org/licenses/>.
***** END LICENSE BLOCK *****
*/

"use strict";


// Initialized as Zotero.Sync.Streamer in zotero.js
Zotero.Sync.Streamer_Module = function (options = {}) {
this.url = options.url;
this.apiKey = options.apiKey;

let observer = {
notify: function (event, type) {
if (event == 'modify') {
this.init();
}
else if (event == 'delete') {
this.disconnect();
}
}.bind(this)
};
this._observerID = Zotero.Notifier.registerObserver(observer, ['api-key'], 'syncStreamer');
};

Zotero.Sync.Streamer_Module.prototype = {
_observerID: null,
_socket: null,
_socketClosedDeferred: null,
_reconnect: true,
_retry: null,

init: Zotero.Promise.coroutine(function* () {
// Connect to the streaming server
if (!Zotero.Prefs.get('sync.autoSync') || !Zotero.Prefs.get('sync.streaming.enabled')) {
return this.disconnect();
}

// If already connected, disconnect first
if (this._socket && (this._socket.readyState == this._socket.OPEN
|| this._socket.readyState == this._socket.CONNECTING)) {
yield this.disconnect();
}

// Connect to the streaming server
let apiKey = this.apiKey || (yield Zotero.Sync.Data.Local.getAPIKey());
if (apiKey) {
let url = this.url || Zotero.Prefs.get('sync.streaming.url') || ZOTERO_CONFIG.STREAMING_URL;
this._connect(url, apiKey);
}
}),

_connect: function (url, apiKey) {
Zotero.debug(`Connecting to streaming server at ${url}`);

var window = Cc["@mozilla.org/appshell/appShellService;1"]
.getService(Ci.nsIAppShellService)
.hiddenDOMWindow;
this._reconnect = true;

this._socket = new window.WebSocket(url, "zotero-streaming-api-v1");

this._socket.onopen = () => {
Zotero.debug("WebSocket connection opened");
this._reconnectGenerator = null;
};

this._socket.onerror = event => {
Zotero.debug("WebSocket error");
};

this._socket.onmessage = Zotero.Promise.coroutine(function* (event) {
Zotero.debug("WebSocket message: " + this._hideAPIKey(event.data));

let data = JSON.parse(event.data);

if (data.event == "connected") {
// Subscribe with all topics accessible to the API key
let data = JSON.stringify({
action: "createSubscriptions",
subscriptions: [{ apiKey }]
});
Zotero.debug("WebSocket message send: " + this._hideAPIKey(data));
this._socket.send(data);
}
else if (data.event == "subscriptionsCreated") {
for (let error of data.errors) {
Zotero.logError(this._hideAPIKey(JSON.stringify(error)));
}
}
// Library added or removed
else if (data.event == 'topicAdded' || data.event == 'topicRemoved') {
yield Zotero.Sync.Runner.sync({
background: true
});
}
// Library modified
else if (data.event == 'topicUpdated') {
let library = Zotero.URI.getPathLibrary(data.topic);
if (library) {
// Ignore if skipped library
let skipped = Zotero.Sync.Data.Local.getSkippedLibraries();
if (skipped.includes(library.id)) return;

yield Zotero.Sync.Runner.sync({
background: true,
libraries: [library.id]
});
}
}
}.bind(this));

this._socket.onclose = Zotero.Promise.coroutine(function* (event) {
Zotero.debug(`WebSocket connection closed: ${event.code} ${event.reason}`, 2);

if (this._socketClosedDeferred) {
this._socketClosedDeferred.resolve();
}

if (this._reconnect) {
if (event.code >= 4000) {
Zotero.debug("Not reconnecting to WebSocket due to client error");
return;
}

if (!this._reconnectGenerator) {
let intervals = [
2, 5, 10, 15, 30, // first minute
60, 60, 60, 60, // every minute for 4 minutes
120, 120, 120, 120, // every 2 minutes for 8 minutes
300, 300, // every 5 minutes for 10 minutes
600, // 10 minutes
1200, // 20 minutes
1800, 1800, // 30 minutes for 1 hour
3600, 3600, 3600, // every hour for 3 hours
14400, 14400, 14400, // every 4 hours for 12 hours
86400 // 1 day
].map(i => i * 1000);
this._reconnectGenerator = Zotero.Utilities.Internal.delayGenerator(intervals);
}
yield this._reconnectGenerator.next().value;
this._connect(url, apiKey);
}
}.bind(this));
},


_hideAPIKey: function (str) {
return str.replace(/(apiKey":\s*")[^"]+"/, '$1********"');
},


disconnect: Zotero.Promise.coroutine(function* () {
this._reconnect = false;
this._reconnectGenerator = null;
if (this._socket) {
this._socketClosedDeferred = Zotero.Promise.defer();
this._socket.close();
return this._socketClosedDeferred.promise;
}
})
};
28 changes: 28 additions & 0 deletions chrome/content/zotero/xpcom/uri.js
Expand Up @@ -117,6 +117,34 @@ Zotero.URI = new function () {
}


/**
* Get library from path (e.g., users/6 or groups/1)
*
* @return {Zotero.Library|false}
*/
this.getPathLibrary = function (path) {
let matches = path.match(/^\/\/?users\/(\d+)(\/publications)?/);
if (matches) {
let userID = matches[1];
let currentUserID = Zotero.Users.getCurrentUserID();
if (userID != currentUserID) {
Zotero.debug("User ID from streaming server doesn't match current id! "
+ `(${userID} != ${currentUserID})`);
return false;
}
if (matches[2]) {
return Zotero.Libraries.get(Zotero.Libraries.publicationsLibraryID);
}
return Zotero.Libraries.userLibrary;
}
matches = event.data.topic.match(/^\/groups\/(\d+)/);
if (matches) {
let groupID = matches[1];
return Zotero.Groups.get(groupID);
}
}


/**
* Return URI of item, which might be a local URI if user hasn't synced
*/
Expand Down
1 change: 1 addition & 0 deletions chrome/content/zotero/xpcom/utilities_internal.js
Expand Up @@ -619,6 +619,7 @@ Zotero.Utilities.Internal = {
* maxTime isn't specified, the promises will yield true.
*/
"delayGenerator": function* (intervals, maxTime) {
var delay;
var totalTime = 0;
var last = false;
while (true) {
Expand Down
3 changes: 2 additions & 1 deletion chrome/content/zotero/xpcom/zotero.js
Expand Up @@ -707,8 +707,9 @@ Services.scriptloader.loadSubScript("resource://zotero/polyfill.js");

yield Zotero.Sync.Data.Local.init();
yield Zotero.Sync.Data.Utilities.init();
Zotero.Sync.EventListeners.init();
Zotero.Sync.Runner = new Zotero.Sync.Runner_Module;
Zotero.Sync.Streamer = new Zotero.Sync.Streamer_Module;
Zotero.Sync.EventListeners.init();

Zotero.MIMETypeHandler.init();
yield Zotero.Proxies.init();
Expand Down
1 change: 1 addition & 0 deletions components/zotero-service.js
Expand Up @@ -113,6 +113,7 @@ const xpcomFilesLocal = [
'sync/syncFullTextEngine',
'sync/syncLocal',
'sync/syncRunner',
'sync/syncStreamer',
'sync/syncUtilities',
'storage',
'storage/storageEngine',
Expand Down
1 change: 1 addition & 0 deletions defaults/preferences/zotero.js
Expand Up @@ -156,6 +156,7 @@ pref("extensions.zotero.sync.storage.groups.enabled", true);
pref("extensions.zotero.sync.storage.downloadMode.personal", "on-sync");
pref("extensions.zotero.sync.storage.downloadMode.groups", "on-sync");
pref("extensions.zotero.sync.fulltext.enabled", true);
pref("extensions.zotero.sync.streaming.enabled", true);

// Proxy
pref("extensions.zotero.proxies.autoRecognize", true);
Expand Down
1 change: 1 addition & 0 deletions resource/config.js
Expand Up @@ -10,6 +10,7 @@ var ZOTERO_CONFIG = {
WWW_BASE_URL: 'https://www.zotero.org/',
PROXY_AUTH_URL: 'https://s3.amazonaws.com/zotero.org/proxy-auth',
API_URL: 'https://api.zotero.org/',
STREAMING_URL: 'wss://stream.zotero.org/',
API_VERSION: 3,
PREF_BRANCH: 'extensions.zotero.',
BOOKMARKLET_ORIGIN: 'https://www.zotero.org',
Expand Down

0 comments on commit 2beb2c5

Please sign in to comment.