Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Support for 3rd party clients

- Connection keys are now generated via uuid
- Methods for creating new Wavelets via client protocol added
- Some client model methods backported from desktop client
- Implemented error messages (removed TODOs)
- Implemented manager messages: allows monitoring of Wavelets which are
  not opened
- Implemented login messages: authentication via STOMP protocol now
  possible
- Moved WAVELET_ADD_PARTICIPANT and WAVELET_REMOVE_SELF into
  OPERATION_MESSAGE_BUNDLE, modified model and controller for this
  • Loading branch information...
commit fe14d49f2477b677ccade8ac629ddbb13462db4c 1 parent 266e145
@p2k p2k authored
View
262 pygowave_client/cache/model/model.js
@@ -95,6 +95,18 @@ pygowave.model = (function() {
},
/**
+ * Sets the participant's display name.
+ * @function {public} setDisplayName
+ * @param {String} value The new name
+ */
+ setDisplayName: function (value) {
+ if (this.options.displayName != value) {
+ this.options.displayName = value;
+ this.fireEvent("dataChanged");
+ }
+ },
+
+ /**
* Returns the URL of the participant's avatar.
* @function {public String} thumbnailUrl
*/
@@ -103,6 +115,18 @@ pygowave.model = (function() {
},
/**
+ * Sets the URL of the participant's avatar.
+ * @function {public} setThumbnailUrl
+ * @param {String} value The new URL
+ */
+ setThumbnailUrl: function (value) {
+ if (this.options.thumbnailUrl != value) {
+ this.options.thumbnailUrl = value;
+ this.fireEvent("dataChanged");
+ }
+ },
+
+ /**
* Returns the URL to the participant's profile.
* @function {public String} profileUrl
*/
@@ -111,6 +135,18 @@ pygowave.model = (function() {
},
/**
+ * Sets the URL to the participant's profile.
+ * @function {public} setProfileUrl
+ * @param {String} value The new URL
+ */
+ setProfileUrl: function (value) {
+ if (this.options.profileUrl != value) {
+ this.options.profileUrl = value;
+ this.fireEvent("dataChanged");
+ }
+ },
+
+ /**
* Returns weather the participant is a bot.
* @function {public Boolean} isBot
*/
@@ -119,6 +155,18 @@ pygowave.model = (function() {
},
/**
+ * Sets weather the participant is a bot.
+ * @function {public} setBot
+ * @param {String} value True, if this participant is a bot
+ */
+ setBot: function (value) {
+ if (this.options.isBot != value) {
+ this.options.isBot = value;
+ this.fireEvent("dataChanged");
+ }
+ },
+
+ /**
* Returns weather the participant is a online.
* @function {public Boolean} isOnline
*/
@@ -597,6 +645,22 @@ pygowave.model = (function() {
},
/**
+ * Returns an Element by its id.
+ * @function {public Element} elementById
+ * @param {int} id ID of the element
+ */
+ elementById: function (id) {
+ for (var __iter0_ = new XRange(len(this._elements)); __iter0_.hasNext();) {
+ var i = __iter0_.next();
+ var elt = this._elements[i];
+ if (elt.id() == id)
+ return elt;
+ }
+ delete __iter0_;
+ return null;
+ },
+
+ /**
* Returns the Element object at the given position or null.
*
* @function {public Element} elementAt
@@ -864,6 +928,18 @@ pygowave.model = (function() {
* @event onStatusChange
* @param {String} status The new status; can be 'clean', 'dirty' or 'invalid'
*/
+
+ /**
+ * Fired when the Wavelet's title changed.
+ * @event onTitleChanged
+ * @param {String} title The Wavelet's new title
+ */
+
+ /**
+ * Fired when the Wavelet's last modification date/time changed.
+ * @event onLastModifiedChanged
+ * @param {Date} datetime New date of last modification
+ */
,
/**
@@ -898,6 +974,26 @@ pygowave.model = (function() {
},
/**
+ * Returns the title of this Wavelet
+ * @function {public String} title
+ */
+ title: function () {
+ return this.options.title;
+ },
+
+ /**
+ * Sets the title of this Wavelet
+ * @function {public} setTitle
+ * @param {String} title The new title
+ */
+ setTitle: function (title) {
+ if (this.options.title != title) {
+ this.options.title = title;
+ this.fireEvent("titleChanged", title);
+ }
+ },
+
+ /**
* Returns true, if this Wavelet is the Wave's root Wavelet.
* @function {public Boolean} isRoot
*/
@@ -930,6 +1026,14 @@ pygowave.model = (function() {
},
/**
+ * Returns the number of participants on this Wavelet.
+ * @function {public int} participantCount
+ */
+ participantCount: function () {
+ return this._participants.getLength();
+ },
+
+ /**
* Add a participant to this Wavelet.<br/>
* Note: Fires {@link pygowave.model.Wavelet.onParticipantsChanged onParticipantsChanged}
*
@@ -964,7 +1068,10 @@ pygowave.model = (function() {
* @param {String} id ID of the Participant
*/
participant: function (id) {
- return this._participants.get(id);
+ if (this._participants.has(id))
+ return this._participants.get(id);
+ else
+ return null;
},
/**
@@ -1067,6 +1174,31 @@ pygowave.model = (function() {
},
/**
+ * Returns a list of all Blips on this Wavelet, starting with the root Blip.
+ *
+ * @function {Blip[]} allBlips
+ */
+ allBlips: function () {
+ return this._blips;
+ },
+
+ /**
+ * Returns a list of all IDs of the Blips on this Wavelet, starting with
+ * the root Blip.
+ *
+ * @function {public String[]} allBlipIDs
+ */
+ allBlipIDs: function () {
+ var ids = [];
+ for (var __iter0_ = new _Iterator(this._blips); __iter0_.hasNext();) {
+ var blip = __iter0_.next();
+ ids.append(blip.id());
+ }
+ delete __iter0_;
+ return ids;
+ },
+
+ /**
* Internal method to set the root Blip. Not intended to be called
* outside of this implementation.
*
@@ -1093,6 +1225,42 @@ pygowave.model = (function() {
},
/**
+ * Returns the Wavelet's current status. Can be "clean", "dirty" or "invalid".
+ * @function {public String} status
+ */
+ status: function () {
+ return this.options.status;
+ },
+
+ /**
+ * Returns the creation date/time of this Wavelet.
+ * @function {public Date} created
+ */
+ created: function () {
+ return this.options.created;
+ },
+
+ /**
+ * Returns the date/time of the last modification of this Wavelet.
+ * @function {public Date} lastModified
+ */
+ lastModified: function () {
+ return this.options.last_modified;
+ },
+
+ /**
+ * Sets the date/time of the last modification of this Wavelet.
+ * @function {public} setLastModified
+ * @param {Date} value The new date/time of the last modification
+ */
+ setLastModified: function (value) {
+ if (value != this.options.last_modified) {
+ this.options.last_modified = value;
+ this.fireEvent("lastModifiedChanged", value);
+ }
+ },
+
+ /**
* Calculate and compare checksums of all Blips to the given map.
* Fires {@link pygowave.model.Wavelet.onStatusChange onStatusChange} if
* the status changes.
@@ -1124,7 +1292,7 @@ pygowave.model = (function() {
* @function {public} applyOperations
* @param {pygowave.operations.Operation[]} ops List of operations to apply
*/
- applyOperations: function (ops) {
+ applyOperations: function (ops, participants) {
for (var __iter0_ = new _Iterator(ops); __iter0_.hasNext();) {
var op = __iter0_.next();
if (op.blipId != "") {
@@ -1142,6 +1310,42 @@ pygowave.model = (function() {
else if (op.type == pygowave.operations.DOCUMENT_ELEMENT_SETPREF)
blip.setElementUserpref(op.index, op.property.key, op.property.value);
}
+ else if (op.type == pygowave.operations.WAVELET_ADD_PARTICIPANT)
+ this.addParticipant(participants[op.property]);
+ else if (op.type == pygowave.operations.WAVELET_REMOVE_PARTICIPANT)
+ this.removeParticipant(op.property);
+ }
+ delete __iter0_;
+ },
+
+ /**
+ * Load the blips from a snapshot.
+ * @function {public} loadBlipsFromSnapshot
+ * @param {Object} blips The JSON-serialized snapshot to load
+ * @param {String} rootBlipId ID to identify the root Blip
+ * @param {Hash} participants A map of participant objects
+ */
+ loadBlipsFromSnapshot: function (blips, rootBlipId, participants) {
+ for (var __iter0_ = new _Iterator(blips); __iter0_.hasNext();) {
+ var blip = __iter0_.next();
+ var blip_id = __iter0_.key();
+ var blip_options = {
+ creator: participants[blip.creator],
+ is_root: blip_id == rootBlipId,
+ last_modified: blip.lastModifiedTime,
+ version: blip.version,
+ submitted: blip.submitted
+ };
+ var blip_elements = [];
+ for (var __iter1_ = new _Iterator(blip.elements); __iter1_.hasNext();) {
+ var serialelement = __iter1_.next();
+ if (serialelement.type == ELEMENT_TYPE.GADGET)
+ blip_elements.append(new GadgetElement(null, serialelement.id, serialelement.index, serialelement.properties));
+ else
+ blip_elements.append(new Element(null, serialelement.id, serialelement.index, serialelement.type, serialelement.properties));
+ }
+ delete __iter1_;
+ var blipObj = this.appendBlip(blip_id, blip_options, blip.content, blip_elements);
}
delete __iter0_;
}
@@ -1160,6 +1364,11 @@ pygowave.model = (function() {
* @param {String} waveletId ID of the Wavelet that has been added
* @param {Boolean} isRoot True if this is the (new) root Wavelet
*/
+ /**
+ * Fired before a wavelet is removed.
+ * @event onWaveletAboutToBeRemoved
+ * @param {String} waveletId ID of the Wavelet that will be removed
+ */
/**
* Called on instantiation.
@@ -1216,28 +1425,7 @@ pygowave.model = (function() {
rootWaveletObj.addParticipant(participants[part_id]);
}
delete __iter0_;
- for (var __iter0_ = new _Iterator(obj.blips); __iter0_.hasNext();) {
- var blip = __iter0_.next();
- var blip_id = __iter0_.key();
- var blip_options = {
- creator: participants[blip.creator],
- is_root: blip.blipId == rootWavelet.rootBlipId,
- last_modified: blip.lastModifiedTime,
- version: blip.version,
- submitted: blip.submitted
- };
- var blip_elements = [];
- for (var __iter1_ = new _Iterator(blip.elements); __iter1_.hasNext();) {
- var serialelement = __iter1_.next();
- if (serialelement.type == ELEMENT_TYPE.GADGET)
- blip_elements.append(new GadgetElement(null, serialelement.id, serialelement.index, serialelement.properties));
- else
- blip_elements.append(new Element(null, serialelement.id, serialelement.index, serialelement.type, serialelement.properties));
- }
- delete __iter1_;
- var blipObj = rootWaveletObj.appendBlip(blip_id, blip_options, blip.content, blip_elements);
- }
- delete __iter0_;
+ rootWaveletObj.loadBlipsFromSnapshot(obj.blips, rootWavelet.rootBlipId, participants);
},
/**
@@ -1267,6 +1455,15 @@ pygowave.model = (function() {
},
/**
+ * Return a list of all Wavelets on this Wave.
+ *
+ * @function {public Wavelet[]} allWavelets
+ */
+ allWavelets: function () {
+ return this._wavelets.getValues();
+ },
+
+ /**
* Returns the root Wavelet of this Wave.
*
* @function {public Wavelet} rootWavelet
@@ -1284,6 +1481,23 @@ pygowave.model = (function() {
*/
_setRootWavelet: function (wavelet) {
this._rootWavelet = wavelet;
+ },
+
+ /**
+ * Removes and deletes a wavelet by its id. Fires
+ * {@link pygowave.model.WaveModel.onWaveletAboutToBeRemoved} beforehand.
+ *
+ * @function {public} removeWavelet
+ * @param {String} waveletId ID of the Wavelet to remove
+ */
+ removeWavelet: function (waveletId) {
+ if (!this._wavelets.has(waveletId))
+ return;
+ this.fireEvent("waveletAboutToBeRemoved", waveletId);
+ var wavelet = this._wavelets.get(waveletId);
+ this._wavelets.erase(waveletId);
+ if (wavelet == this._rootWavelet)
+ this._rootWavelet = null;
}
});
View
96 pygowave_client/cache/operations/operations.js
@@ -37,6 +37,10 @@ pygowave.operations = (function() {
var DOCUMENT_ELEMENT_DELETE = "DOCUMENT_ELEMENT_DELETE";
+ var WAVELET_ADD_PARTICIPANT = "WAVELET_ADD_PARTICIPANT";
+
+ var WAVELET_REMOVE_PARTICIPANT = "WAVELET_REMOVE_PARTICIPANT";
+
var DOCUMENT_ELEMENT_DELTA = "DOCUMENT_ELEMENT_DELTA";
var DOCUMENT_ELEMENT_SETPREF = "DOCUMENT_ELEMENT_SETPREF";
@@ -342,9 +346,7 @@ pygowave.operations = (function() {
}
else {
op.resize(op.length() - myop.length());
- this.fireEvent("beforeOperationsRemoved", [i, i]);
- this.operations.pop(i);
- this.fireEvent("afterOperationsRemoved", [i, i]);
+ this.removeOperation(i);
i--;
break;
}
@@ -354,13 +356,11 @@ pygowave.operations = (function() {
if (op.index >= end)
op.index -= myop.length();
else if (op.index + op.length() <= end) {
+ myop.resize(myop.length() - op.length());
op_lst.pop(j);
j--;
- myop.resize(myop.length() - op.length());
if (myop.isNull()) {
- this.fireEvent("beforeOperationsRemoved", [i, i]);
- this.operations.pop(i);
- this.fireEvent("afterOperationsRemoved", [i, i]);
+ this.removeOperation(i);
i--;
break;
}
@@ -405,9 +405,7 @@ pygowave.operations = (function() {
myop.resize(op.index - myop.index);
this.fireEvent("operationChanged", i);
new_op.resize(new_op.length() - myop.length());
- this.fireEvent("beforeOperationsInserted", [i + 1, i + 1]);
- this.operations.insert(i + 1, new_op);
- this.fireEvent("afterOperationsInserted", [i + 1, i + 1]);
+ this.insertOperation(i + 1, new_op);
op.index = myop.index;
}
}
@@ -449,6 +447,13 @@ pygowave.operations = (function() {
this.fireEvent("operationChanged", i);
}
}
+ else if (op.type == WAVELET_ADD_PARTICIPANT && myop.type == WAVELET_ADD_PARTICIPANT || op.type == WAVELET_REMOVE_PARTICIPANT && myop.type == WAVELET_REMOVE_PARTICIPANT) {
+ if (op.property == myop.property) {
+ this.removeOperation(i);
+ i--;
+ break;
+ }
+ }
j++;
}
i++;
@@ -568,9 +573,7 @@ pygowave.operations = (function() {
newop.resize(newop.length() - remain);
}
if (op.isNull()) {
- this.fireEvent("beforeOperationsRemoved", [i, i]);
- this.operations.pop(i);
- this.fireEvent("afterOperationsRemoved", [i, i]);
+ this.removeOperation(i);
i--;
}
else
@@ -581,9 +584,7 @@ pygowave.operations = (function() {
else if (newop.index < op.index && newop.index + newop.length() > op.index) {
if (newop.index + newop.length() >= (op.index + op.length())) {
newop.resize(newop.length() - op.length());
- this.fireEvent("beforeOperationsRemoved", [i, i]);
- this.operations.pop(i);
- this.fireEvent("afterOperationsRemoved", [i, i]);
+ this.removeOperation(i);
i--;
}
else {
@@ -607,10 +608,43 @@ pygowave.operations = (function() {
return;
}
}
+ else if (newop.type == WAVELET_ADD_PARTICIPANT && op.type == WAVELET_ADD_PARTICIPANT || newop.type == WAVELET_REMOVE_PARTICIPANT && op.type == WAVELET_REMOVE_PARTICIPANT) {
+ if (newop.property == op.property)
+ return;
+ }
}
- this.fireEvent("beforeOperationsInserted", [i + 1, i + 1]);
- this.operations.append(newop);
- this.fireEvent("afterOperationsInserted", [i + 1, i + 1]);
+ this.insertOperation(i + 1, newop);
+ },
+
+ /**
+ * Inserts an operation at the specified index.
+ * Fires signals appropriately.
+ *
+ * @function {public} insertOperation
+ * @param {int} index Position in operations list
+ * @param {Operation} op Operation object to insert
+ */
+ insertOperation: function (index, op) {
+ if (index > len(this.operations) || index < 0)
+ return;
+ this.fireEvent("beforeOperationsInserted", [index, index]);
+ this.operations.insert(index, op);
+ this.fireEvent("afterOperationsInserted", [index, index]);
+ },
+
+ /**
+ * Removes an operation at the specified index.
+ * Fires signals appropriately.
+ *
+ * @function {public} removeOperation
+ * @param {int} index Position in operations list
+ */
+ removeOperation: function (index) {
+ if (index < 0 || index >= len(this.operations))
+ return;
+ this.fireEvent("beforeOperationsRemoved", [index, index]);
+ this.operations.pop(index);
+ this.fireEvent("afterOperationsRemoved", [index, index]);
},
/**
@@ -690,6 +724,26 @@ pygowave.operations = (function() {
key: key,
value: value
}));
+ },
+
+ /**
+ * Requests to add a Participant to the wavelet.
+ *
+ * @function {public} waveletAddParticipant
+ * @param {String} id ID of the Participant to add
+ */
+ waveletAddParticipant: function (id) {
+ this.__insert(new Operation(WAVELET_ADD_PARTICIPANT, this.waveId, this.waveletId, "", -1, id));
+ },
+
+ /**
+ * Requests to remove a Participant to the wavelet.
+ *
+ * @function {public} waveletRemoveParticipant
+ * @param {String} id ID of the Participant to remove
+ */
+ waveletRemoveParticipant: function (id) {
+ this.__insert(new Operation(WAVELET_REMOVE_PARTICIPANT, this.waveId, this.waveletId, "", -1, id));
}
});
@@ -700,6 +754,8 @@ pygowave.operations = (function() {
DOCUMENT_ELEMENT_INSERT: DOCUMENT_ELEMENT_INSERT,
DOCUMENT_ELEMENT_DELETE: DOCUMENT_ELEMENT_DELETE,
DOCUMENT_ELEMENT_DELTA: DOCUMENT_ELEMENT_DELTA,
- DOCUMENT_ELEMENT_SETPREF: DOCUMENT_ELEMENT_SETPREF
+ DOCUMENT_ELEMENT_SETPREF: DOCUMENT_ELEMENT_SETPREF,
+ WAVELET_ADD_PARTICIPANT: WAVELET_ADD_PARTICIPANT,
+ WAVELET_REMOVE_PARTICIPANT: WAVELET_REMOVE_PARTICIPANT
};
})();
View
169 pygowave_client/src/controller/controller.js
@@ -114,7 +114,6 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
this.waves.set(model.id(), model);
this.wavelets = new Hash();
- this.new_participants = new Array();
this.participants = new Hash();
this._cachedGadgetList = null;
this._deferredMessageBundles = new Array();
@@ -129,6 +128,15 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
onclose: function(c) {self.onConnClose(c);},
onerror: function(e) {self.onConnError(e);},
onconnectedframe: function() {
+ // Subscribe to manager
+ this.subscribe(
+ self.options.waveAccessKeyRx + ".manager.waveop",
+ {
+ routing_key: self.options.waveAccessKeyRx + ".manager.waveop",
+ exchange: "wavelet.direct",
+ exclusive: true
+ }
+ );
self.openWavelet(self.options.initialWave, self.options.initialWavelet);
},
onmessageframe: function(frame) {
@@ -194,6 +202,36 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
if (this.wavelets.has(wavelet_id))
wavelet_model = this.wavelets[wavelet_id].model;
+ if (wavelet_id == "manager") {
+ for (var it = new _Iterator(msg);it.hasNext();) {
+ msg = it.next();
+ switch (msg.type) {
+ case "PARTICIPANT_INFO":
+ this._processParticipantsInfo(msg.property);
+ break;
+ case "PARTICIPANT_SEARCH":
+ if (msg.property.result == "OK") {
+ this._collateParticipants(msg.property.data);
+ this._iview.updateSearchResults(this._getParticipants(msg.property.data));
+ }
+ else if (msg.property.result == "TOO_SHORT")
+ this._iview.invalidSearch(msg.property.data);
+ break;
+ case "WAVELET_REMOVE_PARTICIPANT":
+ if (this.wavelets.has(msg.property.waveletId))
+ this.wavelets[msg.property.waveletId].model.removeParticipant(msg.property.id);
+ break;
+ case "PONG":
+ this.fireEvent("ping", $time() - msg.property);
+ break;
+ case "ERROR":
+ this._iview.showControllerError(gettext("The server reports the following error:<br/><br/>%s<br/><br/>Error Tag: %s").sprintf(msg.property.desc, msg.property.tag));
+ break;
+ }
+ }
+ return;
+ }
+
for (var it = new _Iterator(msg);it.hasNext();) {
msg = it.next();
switch (msg.type) {
@@ -204,15 +242,14 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
var wave_id = msg.property.wavelet.waveId;
var wave_model = this.waves[wave_id];
wave_model.loadFromSnapshot(msg.property, this.participants);
+ wavelet_model = wave_model.wavelet(wavelet_id);
this.wavelets[wavelet_id] = {
- model: wave_model.wavelet(wavelet_id),
+ model: wavelet_model,
pending: false,
blocked: false
};
this._setupOpManagers(wave_id, wavelet_id);
this.fireEvent("waveletOpened", [wave_id, wavelet_id]);
-
- this._requestParticipantInfo(wavelet_id);
break;
case "OPERATION_MESSAGE_BUNDLE_ACK":
this._queueMessageBundle(wavelet_model, "ACK", msg.property.version, msg.property.blipsums);
@@ -220,39 +257,12 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
case "OPERATION_MESSAGE_BUNDLE":
this._queueMessageBundle(wavelet_model, msg.property.operations, msg.property.version, msg.property.blipsums);
break;
- case "PARTICIPANT_INFO":
- this._processParticipantsInfo(msg.property);
- break;
- case "PARTICIPANT_SEARCH":
- if (msg.property.result == "OK") {
- this._collateParticipants(msg.property.data);
- this._iview.updateSearchResults(this._getParticipants(msg.property.data));
- this._requestParticipantInfo(wavelet_id);
- }
- else if (msg.property.result == "TOO_SHORT")
- this._iview.invalidSearch(msg.property.data);
- break;
- case "WAVELET_ADD_PARTICIPANT":
- this._collateParticipants([msg.property]);
- wavelet_model.addParticipant(this.participants[msg.property]);
- this._requestParticipantInfo(wavelet_id);
- break;
- case "WAVELET_REMOVE_PARTICIPANT":
- if (msg.property == this.options.viewerId) {
- // Bye bye
- this.conn.disconnect();
- window.location.href = this.options.waveOverviewUrl;
- return;
- }
- else
- wavelet_model.removeParticipant(msg.property);
- break;
case "GADGET_LIST":
this._cachedGadgetList = msg.property;
this._iview.updateGadgetList(msg.property);
break;
- case "PONG":
- this.fireEvent("ping", $time() - msg.property);
+ case "ERROR":
+ this._iview.showControllerError(gettext("The server reports the following error:<br/><br/>%s<br/><br/>Wavelet ID: %s<br/>Error Tag: %s").sprintf(msg.property.desc, wavelet_id, msg.property.tag));
break;
}
}
@@ -345,30 +355,33 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
* @function {private} _sendPing
*/
_sendPing: function () {
- // Pick first wavelet
- wavelet_id = this.wavelets.getKeys()[0];
- if (!$defined(wavelet_id))
- return;
- this.conn.sendJson(wavelet_id, {
+ this.conn.sendJson("manager", {
type: "PING",
property: $time()
});
},
/**
* Collate the internal participant "database" with the given ID list.
- * New participants will be added to the new_participants array, so
- * they can be retrieved later.
+ * New participants will be retrieved and updated later.
* @function {private} _collateParticipants
* @param {String[]} id_list List of participant IDs
*/
_collateParticipants: function (id_list) {
+ var todo = new Array();
for (var it = new _Iterator(id_list);it.hasNext();) {
var id = it.next();
if (!this.participants.has(id)) {
this.participants.set(id, new pygowave.model.Participant(id));
- this.new_participants.append(id);
+ todo.append(id);
}
}
+
+ if (todo.length > 0) {
+ this.conn.sendJson("manager", {
+ type: "PARTICIPANT_INFO",
+ property: todo
+ });
+ }
},
/**
@@ -387,24 +400,6 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
},
/**
- * Requests information on participants from the server, which have not
- * been retrieved yet. Reads from the new_participants array.
- * @function {private} _requestParticipantInfo
- * @param {String} wavelet_id ID of the Wavelet
- */
- _requestParticipantInfo: function (wavelet_id) {
- if (this.new_participants.length == 0)
- return;
-
- this.conn.sendJson(wavelet_id, {
- type: "PARTICIPANT_INFO",
- property: this.new_participants
- });
-
- this.new_participants = new Array();
- },
-
- /**
* Callback from server after participant info request.
* @function {private} _processParticipantsInfo
* @param {Object} pmap Participants map
@@ -412,9 +407,6 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
_processParticipantsInfo: function (pmap) {
for (var it = new _Iterator(pmap); it.hasNext(); ) {
var pdata = it.next(), id = it.key();
- var i = this.new_participants.indexOf(id);
- if (i != -1)
- this.new_participants.pop(i);
var obj;
if (this.participants.has(id))
obj = this.participants[id];
@@ -443,6 +435,25 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
self._transferOperations(wavelet_id);
};
this.wavelets[wavelet_id].mcached.addEvent('afterOperationsInserted', this.wavelets[wavelet_id]._mcached_wrapper);
+ this.wavelets[wavelet_id]._participants_watcher = function () {
+ self._checkParticipants(wavelet_id);
+ }
+ this.wavelets[wavelet_id].model.addEvent('participantsChanged', this.wavelets[wavelet_id]._participants_watcher)
+ },
+ /**
+ * Check if we are on the wavelet or if we have been removed
+ */
+ _checkParticipants: function (wavelet_id) {
+ if (this.wavelets[wavelet_id].model.participant(this.options.viewerId) == null) {
+ // Bye bye
+ this.conn.sendJson("manager", {type: "DISCONNECT"});
+ this.conn.disconnect();
+ var self = this;
+ (function () {
+ window.location.href = self.options.waveOverviewUrl;
+ }).delay(25);
+ return;
+ }
},
/**
* Send ready made operations to the server.
@@ -528,8 +539,18 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
}
}
+ // Check for new participants
+ var newParticipants = new Array();
+ for (var op_it = new _Iterator(ops); op_it.hasNext(); ) {
+ var op = op_it.next();
+ if (op.type == pygowave.operations.WAVELET_ADD_PARTICIPANT)
+ newParticipants.push(op.property);
+ }
+ if (newParticipants.length > 0)
+ this._collateParticipants(newParticipants);
+
// Apply operations
- wavelet.applyOperations(ops);
+ wavelet.applyOperations(ops, this.participants);
// Set version and checkup
wavelet.options.version = version;
@@ -656,7 +677,7 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
* @param {String} text Entered search query
*/
_onSearchForParticipant: function (waveletId, text) {
- this.conn.sendJson(waveletId, {
+ this.conn.sendJson("manager", {
type: "PARTICIPANT_SEARCH",
property: text
});
@@ -668,10 +689,11 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
* @param {String} participantId ID of the Participant to add
*/
_onAddParticipant: function (waveletId, participantId) {
- this.conn.sendJson(waveletId, {
- type: "WAVELET_ADD_PARTICIPANT",
- property: participantId
- });
+ if (this.wavelets[waveletId].model.participant(participantId))
+ return; // Do nothing
+ this.wavelets[waveletId].mcached.waveletAddParticipant(participantId);
+ this._collateParticipants([participantId]);
+ this.wavelets[waveletId].model.addParticipant(this.participants[participantId]);
},
/**
* Callback from view to leave the (root) wavelet.
@@ -679,9 +701,14 @@ pygowave.controller = $defined(pygowave.controller) ? pygowave.controller : new
* @param {String} waveletId ID of the Wavelet
*/
_onLeaveWavelet: function (waveletId) {
- this.conn.sendJson(waveletId, {
- type: "WAVELET_REMOVE_SELF"
- });
+ this.wavelets[waveletId].mcached.waveletRemoveParticipant(this.options.viewerId);
+ // Bye bye
+ this.conn.sendJson("manager", {type: "DISCONNECT"});
+ this.conn.disconnect();
+ var self = this;
+ (function () {
+ window.location.href = self.options.waveOverviewUrl;
+ }).delay(25);
},
/**
* Callback from view on gadget adding.
View
226 pygowave_client/src/model/model.py
@@ -90,6 +90,16 @@ def displayName(self):
"""
return self.options["displayName"]
+ def setDisplayName(self, value):
+ """
+ Sets the participant's display name.
+ @function {public} setDisplayName
+ @param {String} value The new name
+ """
+ if self.options["displayName"] != value:
+ self.options["displayName"] = value
+ self.fireEvent('dataChanged')
+
def thumbnailUrl(self):
"""
Returns the URL of the participant's avatar.
@@ -97,6 +107,16 @@ def thumbnailUrl(self):
"""
return self.options["thumbnailUrl"]
+ def setThumbnailUrl(self, value):
+ """
+ Sets the URL of the participant's avatar.
+ @function {public} setThumbnailUrl
+ @param {String} value The new URL
+ """
+ if self.options["thumbnailUrl"] != value:
+ self.options["thumbnailUrl"] = value
+ self.fireEvent('dataChanged')
+
def profileUrl(self):
"""
Returns the URL to the participant's profile.
@@ -104,6 +124,16 @@ def profileUrl(self):
"""
return self.options["profileUrl"]
+ def setProfileUrl(self, value):
+ """
+ Sets the URL to the participant's profile.
+ @function {public} setProfileUrl
+ @param {String} value The new URL
+ """
+ if self.options["profileUrl"] != value:
+ self.options["profileUrl"] = value
+ self.fireEvent('dataChanged')
+
def isBot(self):
"""
Returns weather the participant is a bot.
@@ -111,6 +141,16 @@ def isBot(self):
"""
return self.options["isBot"]
+ def setBot(self, value):
+ """
+ Sets weather the participant is a bot.
+ @function {public} setBot
+ @param {String} value True, if this participant is a bot
+ """
+ if self.options["isBot"] != value:
+ self.options["isBot"] = value
+ self.fireEvent('dataChanged')
+
def isOnline(self):
"""
Returns weather the participant is a online.
@@ -568,6 +608,19 @@ def isRoot(self):
"""
return self.options["is_root"]
+ def elementById(self, id):
+ """
+ Returns an Element by its id.
+ @function {public Element} elementById
+ @param {int} id ID of the element
+ """
+ for i in xrange(len(self._elements)):
+ elt = self._elements[i]
+ if elt.id() == id:
+ return elt
+
+ return None
+
def elementAt(self, index):
"""
Returns the Element object at the given position or null.
@@ -809,6 +862,18 @@ class Wavelet(object):
@event onStatusChange
@param {String} status The new status; can be 'clean', 'dirty' or 'invalid'
"""
+
+ """
+ Fired when the Wavelet's title changed.
+ @event onTitleChanged
+ @param {String} title The Wavelet's new title
+ """
+
+ """
+ Fired when the Wavelet's last modification date/time changed.
+ @event onLastModifiedChanged
+ @param {Date} datetime New date of last modification
+ """
# ---------------------------
def __init__(self, wave, id, options):
@@ -840,6 +905,23 @@ def __init__(self, wave, id, options):
self._blips = []
self._rootBlip = None
+ def title(self):
+ """
+ Returns the title of this Wavelet
+ @function {public String} title
+ """
+ return self.options["title"]
+
+ def setTitle(self, title):
+ """
+ Sets the title of this Wavelet
+ @function {public} setTitle
+ @param {String} title The new title
+ """
+ if self.options["title"] != title:
+ self.options["title"] = title
+ self.fireEvent('titleChanged', title)
+
def isRoot(self):
"""
Returns true, if this Wavelet is the Wave's root Wavelet.
@@ -868,6 +950,13 @@ def waveModel(self):
"""
return self._wave
+ def participantCount(self):
+ """
+ Returns the number of participants on this Wavelet.
+ @function {public int} participantCount
+ """
+ return self._participants.getLength()
+
def addParticipant(self, participant):
"""
Add a participant to this Wavelet.<br/>
@@ -899,7 +988,10 @@ def participant(self, id):
@function {Participant} participant
@param {String} id ID of the Participant
"""
- return self._participants.get(id)
+ if self._participants.has(id):
+ return self._participants.get(id)
+ else:
+ return None
def allParticipants(self):
"""
@@ -984,6 +1076,26 @@ def blipById(self, id):
return None
+ def allBlips(self):
+ """
+ Returns a list of all Blips on this Wavelet, starting with the root Blip.
+
+ @function {Blip[]} allBlips
+ """
+ return self._blips
+
+ def allBlipIDs(self):
+ """
+ Returns a list of all IDs of the Blips on this Wavelet, starting with
+ the root Blip.
+
+ @function {public String[]} allBlipIDs
+ """
+ ids = []
+ for blip in self._blips:
+ ids.append(blip.id())
+ return ids
+
def _setRootBlip(self, blip):
"""
Internal method to set the root Blip. Not intended to be called
@@ -1007,6 +1119,37 @@ def _setStatus(self, status):
self.options["status"] = status
self.fireEvent("statusChange", status)
+ def status(self):
+ """
+ Returns the Wavelet's current status. Can be "clean", "dirty" or "invalid".
+ @function {public String} status
+ """
+ return self.options["status"]
+
+ def created(self):
+ """
+ Returns the creation date/time of this Wavelet.
+ @function {public Date} created
+ """
+ return self.options["created"]
+
+ def lastModified(self):
+ """
+ Returns the date/time of the last modification of this Wavelet.
+ @function {public Date} lastModified
+ """
+ return self.options["last_modified"]
+
+ def setLastModified(self, value):
+ """
+ Sets the date/time of the last modification of this Wavelet.
+ @function {public} setLastModified
+ @param {Date} value The new date/time of the last modification
+ """
+ if value != self.options["last_modified"]:
+ self.options["last_modified"] = value
+ self.fireEvent('lastModifiedChanged', value)
+
def checkSync(self, blipsums):
"""
Calculate and compare checksums of all Blips to the given map.
@@ -1027,7 +1170,7 @@ def checkSync(self, blipsums):
else:
self._setStatus("invalid")
- def applyOperations(self, ops):
+ def applyOperations(self, ops, participants):
"""
Apply the operations on the wavelet.
@@ -1050,6 +1193,37 @@ def applyOperations(self, ops):
blip.applyElementDelta(op.index, op.property)
elif op.type == pygowave.operations.DOCUMENT_ELEMENT_SETPREF:
blip.setElementUserpref(op.index, op.property["key"], op.property["value"])
+ else:
+ if op.type == pygowave.operations.WAVELET_ADD_PARTICIPANT:
+ self.addParticipant(participants[op.property])
+ elif op.type == pygowave.operations.WAVELET_REMOVE_PARTICIPANT:
+ self.removeParticipant(op.property)
+
+ def loadBlipsFromSnapshot(self, blips, rootBlipId, participants):
+ """
+ Load the blips from a snapshot.
+ @function {public} loadBlipsFromSnapshot
+ @param {Object} blips The JSON-serialized snapshot to load
+ @param {String} rootBlipId ID to identify the root Blip
+ @param {Hash} participants A map of participant objects
+ """
+
+ for blip_id, blip in blips.iteritems():
+ #TODO: handle Blip contributors
+ blip_options = {
+ "creator": participants[blip["creator"]],
+ "is_root": blip_id == rootBlipId,
+ "last_modified": blip["lastModifiedTime"],
+ "version": blip["version"],
+ "submitted": blip["submitted"]
+ }
+ blip_elements = []
+ for serialelement in blip["elements"]:
+ if serialelement["type"] == ELEMENT_TYPE["GADGET"]:
+ blip_elements.append(GadgetElement(None, serialelement["id"], serialelement["index"], serialelement["properties"]))
+ else:
+ blip_elements.append(Element(None, serialelement["id"], serialelement["index"], serialelement["type"], serialelement["properties"]))
+ blipObj = self.appendBlip(blip_id, blip_options, blip["content"], blip_elements)
@Implements(Events)
@Class
@@ -1067,6 +1241,12 @@ class WaveModel(object):
@param {String} waveletId ID of the Wavelet that has been added
@param {Boolean} isRoot True if this is the (new) root Wavelet
"""
+
+ """
+ Fired before a wavelet is removed.
+ @event onWaveletAboutToBeRemoved
+ @param {String} waveletId ID of the Wavelet that will be removed
+ """
# ---------------------------
def __init__(self, waveId, viewerId):
@@ -1123,22 +1303,7 @@ def loadFromSnapshot(self, obj, participants):
for part_id in rootWavelet["participants"]:
rootWaveletObj.addParticipant(participants[part_id])
- for blip_id, blip in obj["blips"].iteritems():
- #TODO: handle Blip contributors
- blip_options = {
- "creator": participants[blip["creator"]],
- "is_root": blip["blipId"] == rootWavelet["rootBlipId"],
- "last_modified": blip["lastModifiedTime"],
- "version": blip["version"],
- "submitted": blip["submitted"]
- }
- blip_elements = []
- for serialelement in blip["elements"]:
- if serialelement["type"] == ELEMENT_TYPE["GADGET"]:
- blip_elements.append(GadgetElement(None, serialelement["id"], serialelement["index"], serialelement["properties"]))
- else:
- blip_elements.append(Element(None, serialelement["id"], serialelement["index"], serialelement["type"], serialelement["properties"]))
- blipObj = rootWaveletObj.appendBlip(blip_id, blip_options, blip["content"], blip_elements)
+ rootWaveletObj.loadBlipsFromSnapshot(obj["blips"], rootWavelet["rootBlipId"], participants);
def createWavelet(self, id, options):
"""
@@ -1164,6 +1329,14 @@ def wavelet(self, waveletId):
"""
return self._wavelets.get(waveletId)
+ def allWavelets(self):
+ """
+ Return a list of all Wavelets on this Wave.
+
+ @function {public Wavelet[]} allWavelets
+ """
+ return self._wavelets.getValues()
+
def rootWavelet(self):
"""
Returns the root Wavelet of this Wave.
@@ -1181,3 +1354,20 @@ def _setRootWavelet(self, wavelet):
@param {Wavelet} wavelet Wavelet to be set as root Wavelet
"""
self._rootWavelet = wavelet
+
+ def removeWavelet(self, waveletId):
+ """
+ Removes and deletes a wavelet by its id. Fires
+ {@link pygowave.model.WaveModel.onWaveletAboutToBeRemoved} beforehand.
+
+ @function {public} removeWavelet
+ @param {String} waveletId ID of the Wavelet to remove
+ """
+ if not self._wavelets.has(waveletId):
+ return
+
+ self.fireEvent('waveletAboutToBeRemoved', waveletId)
+ wavelet = self._wavelets.get(waveletId)
+ self._wavelets.erase(waveletId)
+ if wavelet == self._rootWavelet:
+ self._rootWavelet = None
View
99 pygowave_client/src/operations/operations.py
@@ -30,15 +30,13 @@
DOCUMENT_DELETE = 'DOCUMENT_DELETE'
DOCUMENT_ELEMENT_INSERT = 'DOCUMENT_ELEMENT_INSERT'
DOCUMENT_ELEMENT_DELETE = 'DOCUMENT_ELEMENT_DELETE'
+WAVELET_ADD_PARTICIPANT = 'WAVELET_ADD_PARTICIPANT'
+WAVELET_REMOVE_PARTICIPANT = 'WAVELET_REMOVE_PARTICIPANT'
# Currently supported, but non-official operations
DOCUMENT_ELEMENT_DELTA = 'DOCUMENT_ELEMENT_DELTA'
DOCUMENT_ELEMENT_SETPREF = 'DOCUMENT_ELEMENT_SETPREF'
-# Currently supported operations, which are not handled within OT
-#WAVELET_ADD_PARTICIPANT = 'WAVELET_ADD_PARTICIPANT'
-#WAVELET_REMOVE_SELF = 'WAVELET_REMOVE_SELF'
-
# Currently not supported operations
#WAVELET_APPEND_BLIP = 'WAVELET_APPEND_BLIP'
#WAVELET_CREATE = 'WAVELET_CREATE'
@@ -69,6 +67,8 @@
"DOCUMENT_ELEMENT_DELETE",
"DOCUMENT_ELEMENT_DELTA",
"DOCUMENT_ELEMENT_SETPREF",
+ "WAVELET_ADD_PARTICIPANT",
+ "WAVELET_REMOVE_PARTICIPANT",
]
@Class
@@ -377,9 +377,7 @@ def transform(self, input_op):
self.fireEvent("operationChanged", i)
else: # end >= myop.index + myop.length()
op.resize(op.length() - myop.length())
- self.fireEvent("beforeOperationsRemoved", [i, i])
- self.operations.pop(i)
- self.fireEvent("afterOperationsRemoved", [i, i])
+ self.removeOperation(i)
i -= 1
break
else: # op.index >= myop.index
@@ -391,9 +389,7 @@ def transform(self, input_op):
op_lst.pop(j)
j -= 1
if myop.isNull():
- self.fireEvent("beforeOperationsRemoved", [i, i])
- self.operations.pop(i)
- self.fireEvent("afterOperationsRemoved", [i, i])
+ self.removeOperation(i)
i -= 1
break
else:
@@ -430,9 +426,7 @@ def transform(self, input_op):
myop.resize(op.index - myop.index)
self.fireEvent("operationChanged", i)
new_op.resize(new_op.length() - myop.length())
- self.fireEvent("beforeOperationsInserted", [i+1, i+1])
- self.operations.insert(i+1, new_op)
- self.fireEvent("afterOperationsInserted", [i+1, i+1])
+ self.insertOperation(i+1, new_op)
op.index = myop.index
elif op.isInsert() and myop.isInsert():
@@ -462,6 +456,12 @@ def transform(self, input_op):
if op.index <= myop.index:
myop.index += op.length()
self.fireEvent("operationChanged", i)
+ elif (op.type == WAVELET_ADD_PARTICIPANT and myop.type == WAVELET_ADD_PARTICIPANT) \
+ or (op.type == WAVELET_REMOVE_PARTICIPANT and myop.type == WAVELET_REMOVE_PARTICIPANT):
+ if op.property == myop.property:
+ self.removeOperation(i)
+ i -= 1
+ break
j += 1
@@ -571,9 +571,7 @@ def __insert(self, newop):
op.deleteString(newop.index - op.index, remain)
newop.resize(newop.length() - remain)
if op.isNull():
- self.fireEvent("beforeOperationsRemoved", [i, i])
- self.operations.pop(i)
- self.fireEvent("afterOperationsRemoved", [i, i])
+ self.removeOperation(i)
i -= 1
else:
self.fireEvent("operationChanged", i)
@@ -582,9 +580,7 @@ def __insert(self, newop):
elif newop.index < op.index and newop.index+newop.length() > op.index:
if newop.index+newop.length() >= op.index+op.length():
newop.resize(newop.length() - op.length())
- self.fireEvent("beforeOperationsRemoved", [i, i])
- self.operations.pop(i)
- self.fireEvent("afterOperationsRemoved", [i, i])
+ self.removeOperation(i)
i -= 1
else:
dlength = newop.index+newop.length() - op.index
@@ -601,11 +597,42 @@ def __insert(self, newop):
op.resize(op.length() + newop.length())
self.fireEvent("operationChanged", i)
return
+ elif (newop.type == WAVELET_ADD_PARTICIPANT and op.type == WAVELET_ADD_PARTICIPANT) \
+ or (newop.type == WAVELET_REMOVE_PARTICIPANT and op.type == WAVELET_REMOVE_PARTICIPANT):
+ if newop.property == op.property:
+ return
# If we reach this the operation could not be merged, so add it.
- self.fireEvent("beforeOperationsInserted", [i+1, i+1])
- self.operations.append(newop)
- self.fireEvent("afterOperationsInserted", [i+1, i+1])
+ self.insertOperation(i+1, newop)
+
+ def insertOperation(self, index, op):
+ """
+ Inserts an operation at the specified index.
+ Fires signals appropriately.
+
+ @function {public} insertOperation
+ @param {int} index Position in operations list
+ @param {Operation} op Operation object to insert
+ """
+ if index > len(self.operations) or index < 0:
+ return
+ self.fireEvent("beforeOperationsInserted", [index, index])
+ self.operations.insert(index, op)
+ self.fireEvent("afterOperationsInserted", [index, index])
+
+ def removeOperation(self, index):
+ """
+ Removes an operation at the specified index.
+ Fires signals appropriately.
+
+ @function {public} removeOperation
+ @param {int} index Position in operations list
+ """
+ if index < 0 or index >= len(self.operations):
+ return
+ self.fireEvent("beforeOperationsRemoved", [index, index])
+ self.operations.pop(index)
+ self.fireEvent("afterOperationsRemoved", [index, index])
# --------------------------------------------------------------------------
@@ -711,3 +738,31 @@ def documentElementSetpref(self, blipId, index, key, value):
"value": value
}
))
+
+ def waveletAddParticipant(self, id):
+ """
+ Requests to add a Participant to the wavelet.
+
+ @function {public} waveletAddParticipant
+ @param {String} id ID of the Participant to add
+ """
+ self.__insert(Operation(
+ WAVELET_ADD_PARTICIPANT,
+ self.waveId, self.waveletId, "",
+ -1,
+ id
+ ))
+
+ def waveletRemoveParticipant(self, id):
+ """
+ Requests to remove a Participant to the wavelet.
+
+ @function {public} waveletRemoveParticipant
+ @param {String} id ID of the Participant to remove
+ """
+ self.__insert(Operation(
+ WAVELET_REMOVE_PARTICIPANT,
+ self.waveId, self.waveletId, "",
+ -1,
+ id
+ ))
View
376 pygowave_rpc/c2s_mp.py
@@ -20,11 +20,12 @@
import sys, datetime, logging
+from django.contrib import auth
from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
-from pygowave_server.models import Participant, ParticipantConn, Gadget, GadgetElement, Delta
-from pygowave_server.common.operations import OpManager
+from pygowave_server.models import Participant, ParticipantConn, Wave, Gadget, GadgetElement, Delta
+from pygowave_server.common.operations import OpManager, WAVELET_REMOVE_PARTICIPANT, WAVELET_ADD_PARTICIPANT
__all__ = ["PyGoWaveClientMessageProcessor"]
@@ -42,9 +43,9 @@
# PARTICIPANT_INFO (sync)
# PARTICIPANT_SEARCH (sync)
# GADGET_LIST (sync)
-# WAVELET_ADD_PARTICIPANT (sync)
-# WAVELET_REMOVE_SELF (sync)
# OPERATION_MESSAGE_BUNDLE (OT)
+# WAVELET_ADD_PARTICIPANT
+# WAVELET_REMOVE_SELF
# DOCUMENT_INSERT
# DOCUMENT_DELETE
# DOCUMENT_ELEMENT_INSERT
@@ -68,13 +69,21 @@ class PyGoWaveClientMessageProcessor(object):
not subscribed to. This is intended and it's RabbitMQ's job to drop the
message. This keeps things simple for now and may be changed in the future.
+ The guid is generated by the uuid library and should be unique for every
+ connection. In practice any string could be used up to 42 characters.
+
Message comsumers can be optimized in a multi-threading environment. If
one consumer starts handling messages of a particular wavelet, it should
block others from handling them.
Some messages are handled synchronously (i.e. the client does not perform
- any actions and waits for the server's response). Those are in particular:
- WAVELET_ADD_PARTICIPANT
+ any actions and waits for the server's response). See the list above.
+
+ Special routing keys:
+ - <participant_conn_guid>.manager.[waveop|clientop]
+ Used for special non-wavelet messages
+ - <participant_conn_guid>.login.[waveop|clientop]
+ Used for authentication via client protocol
"""
@@ -93,42 +102,68 @@ def process(self, routing_key, message_data):
if message_category != "clientop":
return {}
- self.logger.debug("Received Message from %s.%s.%s:\n%s" % (participant_conn_key, wavelet_id, message_category, repr(message_data)))
+ if wavelet_id != "login":
+ self.logger.debug("Received Message from %s.%s.%s:\n%s" % (participant_conn_key, wavelet_id, message_category, repr(message_data)))
+ else:
+ self.logger.debug("Received Login Message from %s" % (participant_conn_key))
- # Get participant connection
- try:
- pconn = ParticipantConn.objects.get(tx_key=participant_conn_key)
- except ObjectDoesNotExist:
- self.logger.error("{%s} ParticipantConn not found" % (routing_key))
- return {} # Fail silently
+ if not isinstance(message_data, list):
+ message_data = [message_data]
- # Get wavelet
- try:
- wavelet = pconn.participant.wavelets.get(id=wavelet_id)
- except ObjectDoesNotExist:
- self.logger.error("{%s} Wavelet not found (or not participating)" % (routing_key))
- return {} # Fail silently
+ # Get participant connection
+ if wavelet_id != "login":
+ try:
+ pconn = ParticipantConn.objects.get(tx_key=participant_conn_key)
+ except ObjectDoesNotExist:
+ return self.reply_error(participant_conn_key, wavelet_id, "NO_CONNECTION", "Not logged in or disconnected by server")
- # Handle message and reply to sender and/or broadcast an event
self.out_queue = {}
- if isinstance(message_data, list): # multi-message?
+ self.manager_queue = {}
+
+ if wavelet_id == "login": # Login message?
+
+ pconn = ParticipantConn(rx_key=participant_conn_key, tx_key=participant_conn_key)
for sub_message in message_data:
try:
- if not self.handle_participant_message(wavelet, pconn, sub_message): break
+ self.handle_login_message(pconn, sub_message)
+ except:
+ self.logger.exception("{%s} Exception in login message handler" % (routing_key))
+ self.error(pconn, "EXCEPTION_IN_HANDLER", "The login message handler has thrown an exeption")
+ break
+
+ elif wavelet_id == "manager": # Management message?
+
+ for sub_message in message_data:
+ try:
+ if not self.handle_management_message(pconn, sub_message): break
except:
- self.logger.exception("{%s} Exception in message handler" % (routing_key))
- break
+ self.logger.exception("{%s} Exception in management message handler" % (routing_key))
+ self.error(pconn, "EXCEPTION_IN_HANDLER", "The management message handler has thrown an exeption")
else:
+ # Get wavelet
try:
- self.handle_participant_message(wavelet, pconn, message_data)
- except:
- self.logger.exception("{%s} Exception in message handler" % (routing_key))
+ wavelet = pconn.participant.wavelets.get(id=wavelet_id)
+ except ObjectDoesNotExist:
+ self.logger.error("{%s} Wavelet not found (or not participating)" % (routing_key))
+ return self.reply_error(participant_conn_key, wavelet_id, "WAVELET_NOT_AVAILABLE", "Wavelet not found or not participating")
+
+ # Handle message and reply to sender and/or broadcast an event
+ for sub_message in message_data:
+ try:
+ if not self.handle_participant_message(wavelet, pconn, sub_message): break
+ except:
+ self.logger.exception("{%s} Exception in participant message handler" % (routing_key))
+ self.error(pconn, "EXCEPTION_IN_HANDLER", "The participant message handler has thrown an exeption")
- # Create message dictionary
+ # Build message dictionary
msg_dict = {}
for receiver, messages in self.out_queue.iteritems():
msg_dict["%s.%s.waveop" % (receiver, wavelet_id)] = messages
+ # Special manager messages
+ for receiver, messages in self.manager_queue.iteritems():
+ msg_dict["%s.manager.waveop" % (receiver)] = messages
self.out_queue = {}
+ self.manager_queue = {}
return msg_dict
@@ -145,11 +180,11 @@ def handle_participant_message(self, wavelet, pconn, message):
if message.has_key(u"type"):
- if message["type"] == "PING":
+ if message["type"] == "PING": # DEPRECATED: Use the manager!
self.emit(pconn, "PONG", message["property"]) # Traditionally
elif message["type"] == "WAVELET_OPEN":
- self.logger.info("[%s/%d@%s] Opening wavelet" % (participant.name, pconn.id, wavelet.wave.id))
+ self.logger.info("[%s/%s@%s] Opening wavelet" % (participant.name, pconn.id, wavelet.wave.id))
pconn.wavelets.add(wavelet)
# I know this is neat :)
self.emit(pconn, "WAVELET_OPEN", {
@@ -157,8 +192,14 @@ def handle_participant_message(self, wavelet, pconn, message):
"blips": wavelet.serialize_blips(),
})
- elif message["type"] == "PARTICIPANT_INFO":
- self.logger.info("[%s/%d@%s] Sending participant information" % (participant.name, pconn.id, wavelet.wave.id))
+ elif message["type"] == "WAVELET_CLOSE":
+ # No reply
+ pconn.wavelets.remove(wavelet)
+ self.logger.info("[%s/%s@%s] Connection to wavelet closed (by client)" % (participant.name, pconn.id, wavelet.wave.id))
+ return False
+
+ elif message["type"] == "PARTICIPANT_INFO": # DEPRECATED: Moved to manager!
+ self.logger.info("[%s/%s@%s] Sending participant information" % (participant.name, pconn.id, wavelet.wave.id))
p_info = {}
for p_id in message["property"]:
try:
@@ -167,45 +208,48 @@ def handle_participant_message(self, wavelet, pconn, message):
p_info[p_id] = None
self.emit(pconn, "PARTICIPANT_INFO", p_info)
- elif message["type"] == "PARTICIPANT_SEARCH":
+ elif message["type"] == "PARTICIPANT_SEARCH": # DEPRECATED: Moved to manager!
if len(message["property"]) < getattr(settings, "PARTICIPANT_SEARCH_LENGTH", 0):
self.emit(pconn, "PARTICIPANT_SEARCH", {"result": "TOO_SHORT", "data": getattr(settings, "PARTICIPANT_SEARCH_LENGTH", 0)})
- self.logger.debug("[%s/%d@%s] Participant search query too short" % (participant.name, pconn.id, wavelet.wave.id))
+ self.logger.debug("[%s/%s@%s] Participant search query too short" % (participant.name, pconn.id, wavelet.wave.id))
else:
- self.logger.info("[%s/%d@%s] Performing participant search" % (participant.name, pconn.id, wavelet.wave.id))
+ self.logger.info("[%s/%s@%s] Performing participant search" % (participant.name, pconn.id, wavelet.wave.id))
lst = []
for p in Participant.objects.filter(name__icontains=message["property"]).exclude(id=participant.id):
lst.append(p.id)
self.emit(pconn, "PARTICIPANT_SEARCH", {"result": "OK", "data": lst})
- elif message["type"] == "GADGET_LIST":
+ elif message["type"] == "GADGET_LIST": # DEPRECATED: Moved to manager!
all_gadgets = map(lambda g: {"id": g.id, "uploaded_by": g.by_user.participants.all()[0].name, "name": g.title, "descr": g.description, "url": g.url}, Gadget.objects.all())
- self.logger.info("[%s/%d@%s] Sending Gadget list" % (participant.name, pconn.id, wavelet.wave.id))
+ self.logger.info("[%s/%s@%s] Sending Gadget list" % (participant.name, pconn.id, wavelet.wave.id))
self.emit(pconn, "GADGET_LIST", all_gadgets)
- elif message["type"] == "WAVELET_ADD_PARTICIPANT":
+ elif message["type"] == "WAVELET_ADD_PARTICIPANT": # DEPRECATED: Moved to OPERATION_MESSAGE_BUNDLE
# Find participant
try:
p = Participant.objects.get(id=message["property"])
except ObjectDoesNotExist:
- self.logger.error("[%s/%d@%s] Target participant '%s' not found" % (participant.name, pconn.id, wavelet.wave.id, message["property"]))
- return True # Fail silently (TODO: report error to user)
+ self.emit(pconn, "ERROR", {"tag": "PARTICIPANT_NOT_FOUND", "desc": "A participant with id '%s' does not exist" % message["property"]})
+ self.logger.error("[%s/%s@%s] Target participant '%s' not found" % (participant.name, pconn.id, wavelet.wave.id, message["property"]))
+ return True
# Check if already participating
if wavelet.participants.filter(id=message["property"]).count() > 0:
- self.logger.error("[%s/%d@%s] Target participant '%s' already there" % (participant.name, pconn.id, wavelet.wave.id, message["property"]))
- return True # Fail silently (TODO: report error to user)
+ self.emit(pconn, "ERROR", {"tag": "PARTICIPANT_ALREADY_IN", "desc": "The participant with id '%s' already takes part" % message["property"]})
+ self.logger.error("[%s/%s@%s] Target participant '%s' already there" % (participant.name, pconn.id, wavelet.wave.id, message["property"]))
+ return True
wavelet.participants.add(p)
- self.logger.info("[%s/%d@%s] Added new participant '%s'" % (participant.name, pconn.id, wavelet.wave.id, message["property"]))
- self.broadcast(wavelet, "WAVELET_ADD_PARTICIPANT", message["property"])
+ self.logger.info("[%s/%s@%s] Added new participant '%s'" % (participant.name, pconn.id, wavelet.wave.id, message["property"]))
+ self.broadcast(wavelet, "WAVELET_ADD_PARTICIPANT", message["property"], [], {"id": message["property"]})
- elif message["type"] == "WAVELET_REMOVE_SELF":
- self.broadcast(wavelet, "WAVELET_REMOVE_PARTICIPANT", participant.id)
+ elif message["type"] == "WAVELET_REMOVE_SELF": # DEPRECATED: Moved to OPERATION_MESSAGE_BUNDLE
+ self.broadcast(wavelet, "WAVELET_REMOVE_PARTICIPANT", participant.id, [], {"id": participant.id})
wavelet.participants.remove(participant) # Bye bye
- pconn.wavelets.remove(wavelet) # Also for your connection
- self.logger.info("[%s/%d@%s] Participant removed himself" % (participant.name, pconn.id, wavelet.wave.id))
+ if pconn.wavelets.filter(id=wavelet.id).count() > 0:
+ pconn.wavelets.remove(wavelet) # Also for your connection
+ self.logger.info("[%s/%s@%s] Participant removed himself" % (participant.name, pconn.id, wavelet.wave.id))
if wavelet.participants.count() == 0: # Oh my god, you killed the Wave! You bastard!
- self.logger.info("[%s/%d@%s] Wave got killed!" % (participant.name, pconn.id, wavelet.wave.id))
+ self.logger.info("[%s/%s@%s] Wave got killed!" % (participant.name, pconn.id, wavelet.wave.id))
wavelet.wave.delete()
return False
@@ -220,9 +264,27 @@ def handle_participant_message(self, wavelet, pconn, message):
for op in delta.getOpManager().operations:
newdelta.transform(op) # Trash results (an existing delta cannot be changed)
+ i = 0
+ added = []
+ while i < len(newdelta.operations):
+ op = newdelta.operations[i]
+ if op.type == WAVELET_REMOVE_PARTICIPANT: # Check self removal, prevent removal of others
+ if op.property != participant.id:
+ self.logger.error("[%s/%s@%s] Participant tried to remove '%s'" % (participant.name, pconn.id, wavelet.wave.id, op.property))
+ newdelta.removeOperation(i)
+ continue
+ self.broadcast_managers(wavelet, "WAVELET_REMOVE_PARTICIPANT", {"id": op.property}, [pconn], False)
+ elif op.type == WAVELET_ADD_PARTICIPANT:
+ added.append(op.property)
+ i += 1
+
# Apply
wavelet.applyOperations(newdelta.operations)
+ # Send manager messages for added participants
+ for id in added:
+ self.broadcast_managers(wavelet, "WAVELET_ADD_PARTICIPANT", {"id": id}, [pconn])
+
# Raise version and store
wavelet.version += 1
wavelet.save()
@@ -236,17 +298,153 @@ def handle_participant_message(self, wavelet, pconn, message):
self.emit(pconn, "OPERATION_MESSAGE_BUNDLE_ACK", {"version": wavelet.version, "blipsums": blipsums})
self.broadcast(wavelet, "OPERATION_MESSAGE_BUNDLE", {"version": wavelet.version, "operations": newdelta.serialize(), "blipsums": blipsums}, [pconn])
- self.logger.debug("[%s/%d@%s] Processed delta #%d -> v%d" % (participant.name, pconn.id, wavelet.wave.id, version, wavelet.version))
+ self.logger.debug("[%s/%s@%s] Processed delta #%d -> v%d" % (participant.name, pconn.id, wavelet.wave.id, version, wavelet.version))
+
+ if wavelet.participants.count() == 0: # Oh my god, you killed the Wave! You bastard!
+ self.logger.info("[%s/%s@%s] Wave got killed!" % (participant.name, pconn.id, wavelet.wave.id))
+ wavelet.wave.delete()
else:
- self.logger.error("[%s/%d@%s] Unknown message: %s" % (participant.name, pconn.id, wavelet.wave.id, message))
+ self.emit(pconn, "ERROR", {"tag": "UNKNOWN_MESSAGE", "desc": "Type '%s' not recognised" % message["type"]})
+ self.logger.error("[%s/%s@%s] Unknown message: %s" % (participant.name, pconn.id, wavelet.wave.id, message))
else:
- self.logger.error("[%s/%d@%s] Unknown message: %s" % (participant.name, pconn.id, wavelet.wave.id, message))
+ self.emit(pconn, "ERROR", {"tag": "UNKNOWN_MESSAGE", "desc": "Message lacks 'type' field"})
+ self.logger.error("[%s/%s@%s] Unknown message: %s" % (participant.name, pconn.id, wavelet.wave.id, message))
return True
- def broadcast(self, wavelet, type, property, except_connections=[]):
+ def handle_management_message(self, pconn, message):
+ """
+ Handle a non-wavelet message.
+ If True is returned, go on with processing the next message.
+ If False is returned, discard any following messages.
+
+ """
+ participant = pconn.participant
+ pconn.last_contact = datetime.datetime.now()
+ pconn.save()
+
+ if message.has_key(u"type"):
+
+ if message["type"] == "PING":
+ self.emit(pconn, "PONG", message["property"]) # Traditionally
+
+ elif message["type"] == "PARTICIPANT_INFO":
+ self.logger.info("[%s/%s@manager] Sending participant information" % (participant.name, pconn.id))
+ p_info = {}
+ for p_id in message["property"]:
+ try:
+ p_info[p_id] = Participant.objects.get(id=p_id).serialize()
+ except ObjectDoesNotExist:
+ p_info[p_id] = None
+ self.emit(pconn, "PARTICIPANT_INFO", p_info)
+
+ elif message["type"] == "PARTICIPANT_SEARCH":
+ if len(message["property"]) < getattr(settings, "PARTICIPANT_SEARCH_LENGTH", 0):
+ self.emit(pconn, "PARTICIPANT_SEARCH", {"result": "TOO_SHORT", "data": getattr(settings, "PARTICIPANT_SEARCH_LENGTH", 0)})
+ self.logger.debug("[%s/%s@manager] Participant search query too short" % (participant.name, pconn.id))
+ else:
+ self.logger.info("[%s/%s@manager] Performing participant search" % (participant.name, pconn.id))
+
+ lst = []
+ for p in Participant.objects.filter(name__icontains=message["property"]).exclude(id=participant.id):
+ lst.append(p.id)
+ self.emit(pconn, "PARTICIPANT_SEARCH", {"result": "OK", "data": lst})
+
+ elif message["type"] == "GADGET_LIST":
+ all_gadgets = map(lambda g: {"id": g.id, "uploaded_by": g.by_user.participants.all()[0].name, "name": g.title, "descr": g.description, "url": g.url}, Gadget.objects.all())
+ self.logger.info("[%s/%s@manager] Sending Gadget list" % (participant.name, pconn.id))
+ self.emit(pconn, "GADGET_LIST", all_gadgets)
+
+ elif message["type"] == "WAVELET_CREATE":
+ if message["property"].get("waveId", "") == "":
+ wave = Wave.objects.create_and_init_new_wave(participant, message["property"].get("title", ""))
+ self.logger.info("[%s/%s@manager] Created wave '%s'" % (participant.name, pconn.id, wave.id))
+ wavelet = wave.root_wavelet()
+ else:
+ try:
+ wave = Wave.objects.get(id=message["property"]["waveId"])
+ except ObjectDoesNotExist:
+ self.emit(pconn, "ERROR", {"tag": "WAVE_NOT_FOUND", "desc": "A Wave with id '%s' does not exist" % message["property"]["waveId"]})
+ self.logger.error("[%s/%s@manager] Cannot create Wavelet; Wave '%s' not found" % (participant.name, pconn.id, message["property"]["waveId"]))
+ return True
+ wavelet = wave.create_wavelet(participant, message["property"].get("title", ""))
+ self.emit(pconn, "WAVELET_CREATED", {"waveId": wave.id, "waveletId": wavelet.id})
+ self.logger.info("[%s/%s@manager] Created Wavelet '%s'" % (participant.name, pconn.id, wavelet.id))
+
+ elif message["type"] == "WAVE_LIST":
+ self.logger.info("[%s/%s@manager] Sending Wave list" % (participant.name, pconn.id))
+ waves = {}
+ for wavelet in participant.wavelets.all():
+ entry = waves.setdefault(wavelet.wave.id, {})
+ entry[wavelet.id] = wavelet.serialize()
+ self.emit(pconn, "WAVE_LIST", waves)
+
+ elif message["type"] == "WAVELET_LIST":
+ wave_id = message["property"].get("waveId", "")
+ try:
+ wave = Wave.objects.get(id=wave_id)
+ except ObjectDoesNotExist:
+ self.emit(pconn, "ERROR", {"tag": "WAVE_NOT_FOUND", "desc": "A Wave with id '%s' does not exist" % wave_id})
+ self.logger.error("[%s/%s@manager] Cannot list Wavelets; Wave '%s' not found" % (participant.name, pconn.id, wave_id))
+ return True
+ self.logger.info("[%s/%s@manager] Sending Wavelet list for wave '%s'" % (participant.name, pconn.id, wave_id))
+ wavelets = {}
+ for wavelet in participant.wavelets.filter(wave=wave):
+ wavelets[wavelet.id] = wavelet.serialize()
+ self.emit(pconn, "WAVELET_LIST", {"waveId": wave_id, "wavelets": wavelets})
+
+ elif message["type"] == "DISCONNECT":
+ self.logger.info("[%s/%s@manager] Connection to server closed (by client)" % (participant.name, pconn.id))
+ pconn.delete()
+ return False
+
+ else:
+ self.emit(pconn, "ERROR", {"tag": "UNKNOWN_MESSAGE", "desc": "Type '%s' not recognised" % message["type"]})
+ self.logger.error("[%s/%s@manager] Unknown message: %s" % (participant.name, pconn.id, message))
+
+ else:
+ self.emit(pconn, "ERROR", {"tag": "UNKNOWN_MESSAGE", "desc": "Message lacks 'type' field"})
+ self.logger.error("[%s/%s@manager] Unknown message: %s" % (participant.name, pconn.id, message))
+
+ return True
+
+ def handle_login_message(self, pconn, message):
+ """
+ Handle a login message.
+
+ pconn is a temporary object which is used to send a reply.
+
+ """
+
+ if message.has_key(u"type"):
+
+ if message["type"] == "LOGIN":
+ user = auth.authenticate(username=message["property"].get("username", ""), password=message["property"].get("password", ""))
+ if user != None:
+ try:
+ participant = Participant.objects.get(user__id=user.id)
+ except ObjectDoesNotExist:
+ participant = Participant.objects.create_from_user(user)
+ # Welcome
+ new_pconn = participant.create_new_connection()
+ new_pconn.save()
+ self.logger.info("[%s/%s@login] Participant logged in via key '%s'" % (participant.name, new_pconn.id, pconn.rx_key))
+ self.emit(pconn, "LOGIN", {"rx_key": new_pconn.rx_key, "tx_key": new_pconn.tx_key, "viewer_id": participant.id})
+ else:
+ self.emit(pconn, "ERROR", {"tag": "LOGIN_FAILED", "desc": "Unknown username or password"})
+ self.logger.error("[%s@login] Login failed" % (pconn.rx_key))
+
+ else:
+ self.emit(pconn, "ERROR", {"tag": "UNKNOWN_MESSAGE", "desc": "Type '%s' not recognised" % message["type"]})
+ self.logger.error("[%s@login] Unknown message: %s" % (pconn.rx_key, message))
+
+ else:
+ self.emit(pconn, "ERROR", {"tag": "UNKNOWN_MESSAGE", "desc": "Message lacks 'type' field"})
+ self.logger.error("[%s@login] Unknown message: %s" % (pconn.rx_key, message))
+
+ def broadcast(self, wavelet, type, property, except_connections=[], manager_property=None):
"""
Send messages to all participants.
@@ -259,16 +457,35 @@ def broadcast(self, wavelet, type, property, except_connections=[]):
"type": type,
"property": property
}
+ if manager_property:
+ mmsg_dict = {
+ "type": type,
+ "property": manager_property
+ }
+ mmsg_dict["property"].update({"waveId": wavelet.wave.id, "waveletId": wavelet.id})
self.logger.debug("Broadcasting Message:\n" + repr(msg_dict))
for p in wavelet.participants.all():
for conn in p.connections.all():
if not conn in except_connections:
- if self.out_queue.has_key(conn.rx_key):
- self.out_queue[conn.rx_key].append(msg_dict)
- else:
- self.out_queue[conn.rx_key] = [msg_dict]
+ if conn.wavelets.filter(id=wavelet.id).count() > 0: # Connected to this wavelet
+ self.out_queue.setdefault(conn.rx_key, []).append(msg_dict)
+ elif manager_property != None: # Not connected, send manager message
+ self.manager_queue.setdefault(conn.rx_key, []).append(mmsg_dict)
- def emit(self, to, type, property, except_connections=[]):
+ def broadcast_managers(self, wavelet, type, manager_property, except_connections=[], only_unconnected=True):
+ mmsg_dict = {
+ "type": type,
+ "property": manager_property
+ }
+ mmsg_dict["property"].update({"waveId": wavelet.wave.id, "waveletId": wavelet.id})
+ self.logger.debug("Broadcasting Manager Message:\n" + repr(mmsg_dict))
+ for p in wavelet.participants.all():
+ for conn in p.connections.all():
+ if not conn in except_connections:
+ if conn.wavelets.filter(id=wavelet.id).count() == 0 or not only_unconnected:
+ self.manager_queue.setdefault(conn.rx_key, []).append(mmsg_dict)
+
+ def emit(self, to, type, property):
"""
Collect messages to be sent.
`to` must be a ParticipantConn object.
@@ -278,11 +495,40 @@ def emit(self, to, type, property, except_connections=[]):
"type": type,
"property": property
}
- self.logger.debug("Emiting Message to %s/%d:\n%s" % (to.participant.name, to.id, repr(msg_dict)))
- if self.out_queue.has_key(to.rx_key):
- self.out_queue[to.rx_key].append(msg_dict)
- else:
- self.out_queue[to.rx_key] = [msg_dict]
+ try:
+ self.logger.debug("Emiting Message to %s/%s:\n%s" % (to.participant.name, to.id, repr(msg_dict)))
+ except:
+ self.logger.debug("Emiting Message to %s:\n%s" % (to.rx_key, repr(msg_dict)))
+ self.out_queue.setdefault(to.rx_key, []).append(msg_dict)
+
+ def error(self, to, tag, desc):
+ """
+ Collect error messages. See emit.
+
+ """
+ msg_dict = {
+ "type": "ERROR",
+ "property": {
+ "tag": tag,
+ "desc": desc
+ }
+ }
+ self.out_queue.setdefault(to.rx_key, []).append(msg_dict)
+
+ def reply_error(self, conn_key, wavelet_id, tag, desc):
+ """
+ Builds an error message to be returned directly by the process method.
+
+ """
+ return {
+ "%s.%s.waveop" % (conn_key, wavelet_id): {
+ "type": "ERROR",
+ "property": {
+ "tag": tag,
+ "descr": desc
+ }
+ }
+ }
def purge_connections(self):
"""
@@ -296,6 +542,6 @@ def purge_connections(self):
conn_id, conn_participant_name = conn.id, conn.participant.name
for wavelet in conn.wavelets.all():
wavelet.participant_conns.remove(conn)
- self.logger.info("[%s/%d@%s] Connection to wavelet closed" % (conn.participant.name, conn.id, wavelet.wave.id))
+ self.logger.info("[%s/%s@%s] Connection to wavelet closed (by timeout)" % (conn.participant.name, conn.id, wavelet.wave.id))
conn.delete()
- self.logger.info("[%s/%d] Connection to server closed" % (conn_participant_name, conn_id))
+ self.logger.info("[%s/%s] Connection to server closed (by timeout)" % (conn_participant_name, conn_id))
View
8 pygowave_rpc/stomp_client.py
@@ -54,6 +54,7 @@ def connected(self, msg):
def ack(self, message):
rkey = message["headers"]["destination"]
+ self.pygo_mp.logger.info("Got "+message["body"])
message_data = anyjson.deserialize(message["body"])
msg_dict = self.pygo_mp.process(rkey, message_data)
@@ -83,6 +84,7 @@ def __init__(self):
def connectionMade(self):
"""Register with the stomp server."""
+ self.factory.connection = self
self.transport.write(self.mp.connect())
self.lc = LoopingCall(self.mp.pygo_mp.purge_connections)
self.lc.start(10 * 60) # Purge every 10 minutes
@@ -109,13 +111,11 @@ def dataReceived(self, data):
self.transport.write(returned)
class StompClientFactory(ReconnectingClientFactory):
+ protocol = StompClientProtocol
+
def startedConnecting(self, connector):
"""Started to connect."""
- def buildProtocol(self, addr):
- """Transport level connected now create the communication protocol."""
- return StompClientProtocol()
-
def clientConnectionLost(self, connector, reason):
"""Lost connection."""
View
99 pygowave_server/common/operations.py
@@ -30,15 +30,13 @@
DOCUMENT_DELETE = 'DOCUMENT_DELETE'
DOCUMENT_ELEMENT_INSERT = 'DOCUMENT_ELEMENT_INSERT'
DOCUMENT_ELEMENT_DELETE = 'DOCUMENT_ELEMENT_DELETE'
+WAVELET_ADD_PARTICIPANT = 'WAVELET_ADD_PARTICIPANT'
+WAVELET_REMOVE_PARTICIPANT = 'WAVELET_REMOVE_PARTICIPANT'
# Currently supported, but non-official operations
DOCUMENT_ELEMENT_DELTA = 'DOCUMENT_ELEMENT_DELTA'
DOCUMENT_ELEMENT_SETPREF = 'DOCUMENT_ELEMENT_SETPREF'
-# Currently supported operations, which are not handled within OT
-#WAVELET_ADD_PARTICIPANT = 'WAVELET_ADD_PARTICIPANT'
-#WAVELET_REMOVE_SELF = 'WAVELET_REMOVE_SELF'
-
# Currently not supported operations
#WAVELET_APPEND_BLIP = 'WAVELET_APPEND_BLIP'
#WAVELET_CREATE = 'WAVELET_CREATE'
@@ -69,6 +67,8 @@
"DOCUMENT_ELEMENT_DELETE",
"DOCUMENT_ELEMENT_DELTA",
"DOCUMENT_ELEMENT_SETPREF",
+ "WAVELET_ADD_PARTICIPANT",
+ "WAVELET_REMOVE_PARTICIPANT",
]
@Class
@@ -377,9 +377,7 @@ def transform(self, input_op):
self.fireEvent("operationChanged", i)
else: # end >= myop.index + myop.length()
op.resize(op.length() - myop.length())
- self.fireEvent("beforeOperationsRemoved", [i, i])
- self.operations.pop(i)
- self.fireEvent("afterOperationsRemoved", [i, i])
+ self.removeOperation(i)
i -= 1
break
else: # op.index >= myop.index
@@ -391,9 +389,7 @@ def transform(self, input_op):
op_lst.pop(j)
j -= 1
if myop.isNull():
- self.fireEvent("beforeOperationsRemoved", [i, i])
- self.operations.pop(i)
- self.fireEvent("afterOperationsRemoved", [i, i])
+ self.removeOperation(i)
i -= 1
break
else:
@@ -430,9 +426,7 @@ def transform(self, input_op):
myop.resize(op.index - myop.index)
self.fireEvent("operationChanged", i)
new_op.resize(new_op.length() - myop.length())
- self.fireEvent("beforeOperationsInserted", [i+1, i+1])
- self.operations.insert(i+1, new_op)
- self.fireEvent("afterOperationsInserted", [i+1, i+1])
+ self.insertOperation(i+1, new_op)
op.index = myop.index
elif op.isInsert() and myop.isInsert():
@@ -462,6 +456,12 @@ def transform(self, input_op):
if op.index <= myop.index:
myop.index += op.length()
self.fireEvent("operationChanged", i)
+ elif (op.type == WAVELET_ADD_PARTICIPANT and myop.type == WAVELET_ADD_PARTICIPANT) \
+ or (op.type == WAVELET_REMOVE_PARTICIPANT and myop.type == WAVELET_REMOVE_PARTICIPANT):
+ if op.property == myop.property:
+ self.removeOperation(i)
+ i -= 1
+ break
j += 1
@@ -571,9 +571,7 @@ def __insert(self, newop):
op.deleteString(newop.index - op.index, remain)
newop.resize(newop.length() - remain)
if op.isNull():
- self.fireEvent("beforeOperationsRemoved", [i, i])
- self.operations.pop(i)
- self.fireEvent("afterOperationsRemoved", [i, i])
+ self.removeOperation(i)
i -= 1
else:
self.fireEvent("operationChanged", i)
@@ -582,9 +580,7 @@ def __insert(self, newop):
elif newop.index < op.index and newop.index+newop.length() > op.index:
if newop.index+newop.length() >= op.index+op.length():
newop.resize(newop.length() - op.length())
- self.fireEvent("beforeOperationsRemoved", [i, i])
- self.operations.pop(i)
- self.fireEvent("afterOperationsRemoved", [i, i])
+ self.removeOperation(i)
i -= 1
else:
dlength = newop.index+newop.length() - op.index
@@ -601,11 +597,42 @@ def __insert(self, newop):
op.resize(op.length() + newop.length())
self.fireEvent("operationChanged", i)
return
+ elif (newop.type == WAVELET_ADD_PARTICIPANT and op.type == WAVELET_ADD_PARTICIPANT) \
+ or (newop.type == WAVELET_REMOVE_PARTICIPANT and op.type == WAVELET_REMOVE_PARTICIPANT):
+ if newop.property == op.property:
+ return
# If we reach this the operation could not be merged, so add it.
- self.fireEvent("beforeOperationsInserted", [i+1, i+1])
- self.operations.append(newop)
- self.fireEvent("afterOperationsInserted", [i+1, i+1])
+ self.insertOperation(i+1, newop)
+
+ def insertOperation(self, index, op):
+ """
+ Inserts an operation at the specified index.
+ Fires signals appropriately.
+
+ @function {public} insertOperation
+ @param {int} index Position in operations list
+ @param {Operation} op Operation object to insert
+ """
+ if index > len(self.operations) or index < 0:
+ return
+ self.fireEvent("beforeOperationsInserted", [index, index])
+ self.operations.insert(index, op)
+ self.fireEvent("afterOperationsInserted", [index, index])
+
+ def removeOperation(self, index):
+ """
+ Removes an operation at the specified index.
+ Fires signals appropriately.
+
+ @function {public} removeOperation
+ @param {int} index Position in operations list
+ """
+ if index < 0 or index >= len(self.operations):
+ return
+ self.fireEvent("beforeOperationsRemoved", [index, index])
+ self.operations.pop(index)
+ self.fireEvent("afterOperationsRemoved", [index, index])
# --------------------------------------------------------------------------
@@ -711,3 +738,31 @@ def documentElementSetpref(self, blipId, index, key, value):
"value": value
}
))
+
+ def waveletAddParticipant(self, id):
+ """
+ Requests to add a Participant to the wavelet.
+
+ @function {public} waveletAddParticipant
+ @param {String} id ID of the Participant to add
+ """
+ self.__insert(Operation(
+ WAVELET_ADD_PARTICIPANT,
+ self.waveId, self.waveletId, "",
+ -1,
+ id
+ ))
+
+ def waveletRemoveParticipant(self, id):
+ """
+ Requests to remove a Participant to the wavelet.
+
+ @function {public} waveletRemoveParticipant
+ @param {String} id ID of the Participant to remove
+ """
+ self.__insert(Operation(
+ WAVELET_REMOVE_PARTICIPANT,
+ self.waveId, self.waveletId, "",
+ -1,
+ id
+ ))
View
5 pygowave_server/middleware.py
@@ -17,9 +17,6 @@ def process_request(self, request):
try:
profile_obj = Participant.objects.get(user__id=request.user.id)
except ObjectDoesNotExist:
- profile_obj = Participant()
- profile_obj.id = "%s@%s" % (request.user.username, settings.WAVE_DOMAIN)
- profile_obj.name = request.user.username
- profile_obj.user = request.user
+ profile_obj = Participant.objects.create_from_user(request.user)
profile_obj.last_contact = datetime.now()
profile_obj.save()
View
46 pygowave_server/models.py
@@ -28,9 +28,10 @@
from django.utils import simplejson
+import uuid
+
from pygowave_server.utils import find_random_id, gen_random_id, datetime2milliseconds
-from pygowave_server.common.operations import OpManager, DOCUMENT_DELETE, DOCUMENT_INSERT, \
- DOCUMENT_ELEMENT_INSERT, DOCUMENT_ELEMENT_DELETE, DOCUMENT_ELEMENT_DELTA, DOCUMENT_ELEMENT_SETPREF
+from pygowave_server.common.operations import *
__author__ = "patrick.p2k.schneider@gmail.com"
@@ -53,6 +54,13 @@ class ParticipantManager(models.Manager):
def online_count(self):
timeout = datetime.now() - timedelta(minutes=settings.ONLINE_TIMEOUT_MINUTES)
return self.filter(last_contact__gte=timeout).count()
+
+ def create_from_user(self, user):
+ return self.create(
+ id = "%s@%s" % (user.username, settings.WAVE_DOMAIN),
+ name = user.username,
+ user = user
+ )
class Participant(models.Model):
"""
@@ -124,12 +132,12 @@ def save(self, force_insert=False, force_update=False):
@classmethod
def find_random_keys(cls):
- rx_key = gen_random_id(10)
+ rx_key = str(uuid.uuid1())
while cls.objects.filter(rx_key=rx_key).count() > 0:
- rx_key = gen_random_id(10)
- tx_key = gen_random_id(10)
+ rx_key = str(uuid.uuid1()) # Should never happen, but just in case :)
+ tx_key = str(uuid.uuid1())
while cls.objects.filter(tx_key=tx_key).count() > 0:
- tx_key = gen_random_id(10)
+ tx_key = str(uuid.uuid1())
return rx_key, tx_key
def __unicode__(self):
@@ -176,6 +184,17 @@ def save(self, force_insert=False, force_update=False):
else:
super(Wave, self).save(force_insert, force_update)
+ def create_wavelet(self, creator, title):
+ wavelet = Wavelet(wave=self, creator=creator, title=title, is_root=False)
+ wavelet.save()
+ wavelet.participants.add(creator)
+
+ blip = Blip(wavelet=wavelet, creator=creator)
+ blip.save()
+ wavelet.root_blip = blip
+ wavelet.save()
+ return wavelet
+
def __unicode__(self):
return u"Wave %s" % (self.id)
@@ -242,6 +261,7 @@ def serialize(self):
"version": self.version,
"lastModifiedTime": datetime2milliseconds(self.last_modified),
"waveId": self.wave.id,
+ "isRoot": self.is_root,
}
def serialize_blips(self):
@@ -283,6 +303,20 @@ def applyOperations(self, ops):
except:
pass #TODO: error handling
blip.save()
+ else:
+ if op.type == WAVELET_ADD_PARTICIPANT:
+ # Find participant
+ try:
+ p = Participant.objects.get(id=op.property)
+ except ObjectDoesNotExist:
+ continue #TODO: error handling
+ # Check if already participating
+ if self.participants.filter(id=op.property).count() > 0:
+ continue #TODO: error handling
+ self.participants.add(p)
+ elif op.type == WAVELET_REMOVE_PARTICIPANT:
+ if self.participants.filter(id=op.property).count() > 0:
+ self.participants.remove(self.participants.get(id=op.property))
def blipsums(self):
"""
View
2  pygowave_server/utils.py
@@ -83,7 +83,7 @@ def get_profile_model():
def gen_random_id(length):
"""
Generate a random string with the given length.
- Characgters are taken from RANDOM_ID_BASE.
+ Characters are taken from RANDOM_ID_BASE.
"""
return "".join([random.choice(RANDOM_ID_BASE) for x in xrange(length)])
Please sign in to comment.
Something went wrong with that request. Please try again.