Skip to content

Commit

Permalink
For igniterealtime#120 / igniterealtime#170: Fix XML marshalling of C…
Browse files Browse the repository at this point in the history
…onversationEvent

If ConversationEvent can't be marshalled to/from XML, then events won't ever be pushed successfully from junior cluster nodes to the senior node.
  • Loading branch information
guusdk committed Dec 21, 2021
1 parent d0700be commit 15d57b5
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 71 deletions.
102 changes: 34 additions & 68 deletions src/java/org/jivesoftware/openfire/archive/ConversationEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,46 @@

import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.muc.MUCRoom;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import javax.xml.bind.annotation.*;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import java.util.Date;
import java.util.Objects;

/**
* Conversation events are only used when running in a cluster as a way to send to the senior cluster
* member information about a conversation that is taking place in this cluster node.
*
* @author Gaston Dombiak
*/
public class ConversationEvent implements Externalizable {
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class ConversationEvent {
private static final Logger Log = LoggerFactory.getLogger(ConversationEvent.class);

private Type type;

private Date date;

private String body;

private String stanza;

@XmlJavaTypeAdapter(XmlSerializer.JidAdapter.class)
private JID sender;

@XmlJavaTypeAdapter(XmlSerializer.JidAdapter.class)
private JID receiver;

@XmlJavaTypeAdapter(XmlSerializer.JidAdapter.class)
private JID roomJID;

@XmlJavaTypeAdapter(XmlSerializer.JidAdapter.class)
private JID user;

private String nickname;

/**
Expand All @@ -53,6 +67,7 @@ public ConversationEvent() {
}

public void run(ConversationManager conversationManager) {
Log.debug("Processing {} chat event dated {}", type, date);
if (Type.chatMessageReceived == type) {
conversationManager.processMessage(sender, receiver, body, stanza, date);
}
Expand All @@ -79,68 +94,6 @@ else if (Type.roomMessageReceived == type) {
}
}

public void writeExternal(ObjectOutput out) throws IOException {
ExternalizableUtil.getInstance().writeInt(out, type.ordinal());
ExternalizableUtil.getInstance().writeLong(out, date.getTime());

ExternalizableUtil.getInstance().writeBoolean(out, sender != null);
if (sender != null) {
ExternalizableUtil.getInstance().writeSerializable(out, sender);
}
ExternalizableUtil.getInstance().writeBoolean(out, receiver != null);
if (receiver != null) {
ExternalizableUtil.getInstance().writeSerializable(out, receiver);
}
ExternalizableUtil.getInstance().writeBoolean(out, body != null);
if (body != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, body);
}
ExternalizableUtil.getInstance().writeBoolean(out, stanza != null);
if (stanza != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, stanza);
}
ExternalizableUtil.getInstance().writeBoolean(out, roomJID != null);
if (roomJID != null) {
ExternalizableUtil.getInstance().writeSerializable(out, roomJID);
}
ExternalizableUtil.getInstance().writeBoolean(out, user != null);
if (user != null) {
ExternalizableUtil.getInstance().writeSerializable(out, user);
}
ExternalizableUtil.getInstance().writeBoolean(out, nickname != null);
if (nickname != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, nickname);
}
}

public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
type = Type.values()[ExternalizableUtil.getInstance().readInt(in)];
date = new Date(ExternalizableUtil.getInstance().readLong(in));

if (ExternalizableUtil.getInstance().readBoolean(in)) {
sender = (JID) ExternalizableUtil.getInstance().readSerializable(in);
}
if (ExternalizableUtil.getInstance().readBoolean(in)) {
receiver = (JID) ExternalizableUtil.getInstance().readSerializable(in);
}
if (ExternalizableUtil.getInstance().readBoolean(in)) {
body = ExternalizableUtil.getInstance().readSafeUTF(in);
}
if (ExternalizableUtil.getInstance().readBoolean(in)) {
stanza = ExternalizableUtil.getInstance().readSafeUTF(in);
}

if (ExternalizableUtil.getInstance().readBoolean(in)) {
roomJID = (JID) ExternalizableUtil.getInstance().readSerializable(in);
}
if (ExternalizableUtil.getInstance().readBoolean(in)) {
user = (JID) ExternalizableUtil.getInstance().readSerializable(in);
}
if (ExternalizableUtil.getInstance().readBoolean(in)) {
nickname = ExternalizableUtil.getInstance().readSafeUTF(in);
}
}

public static ConversationEvent chatMessageReceived(JID sender, JID receiver, String body, String stanza, Date date) {
ConversationEvent event = new ConversationEvent();
event.type = Type.chatMessageReceived;
Expand Down Expand Up @@ -229,4 +182,17 @@ private enum Type {
*/
chatMessageReceived
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConversationEvent that = (ConversationEvent) o;
return type == that.type && Objects.equals(date, that.date) && Objects.equals(body, that.body) && Objects.equals(stanza, that.stanza) && Objects.equals(sender, that.sender) && Objects.equals(receiver, that.receiver) && Objects.equals(roomJID, that.roomJID) && Objects.equals(user, that.user) && Objects.equals(nickname, that.nickname);
}

@Override
public int hashCode() {
return Objects.hash(type, date, body, stanza, sender, receiver, roomJID, user, nickname);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import java.util.Map;
import java.util.TimerTask;

import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.archive.cluster.SendConversationEventsTask;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.reporting.util.TaskEngine;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Queue conversation events generated by this JVM and send them to the senior cluster
Expand All @@ -36,6 +39,7 @@
* @author Gaston Dombiak
*/
public class ConversationEventsQueue {
private static final Logger Log = LoggerFactory.getLogger(ConversationEventsQueue.class);
private final ConversationManager conversationManager;
/**
* Chat events that are pending to be sent to the senior cluster member.
Expand Down Expand Up @@ -82,9 +86,13 @@ public void run() {
}

// Send the queued events (from the temp place) to the senior cluster member
if (!eventsToSend.isEmpty()) {
CacheFactory.doClusterTask(new SendConversationEventsTask(eventsToSend),
ClusterManager.getSeniorClusterMember().toByteArray());
try {
if (!eventsToSend.isEmpty()) {
CacheFactory.doClusterTask(new SendConversationEventsTask(eventsToSend),
ClusterManager.getSeniorClusterMember().toByteArray());
}
} catch (Throwable t) {
Log.error("A problem occurred while trying to send events to the senior cluster node.", t);
}
}
};
Expand All @@ -98,6 +106,7 @@ public void run() {
* @param event conversation event.
*/
public void addChatEvent(String conversationKey, ConversationEvent event) {
Log.trace("Add chat event for key {}", conversationKey);
synchronized (chatEvents) {
List<ConversationEvent> events = chatEvents.get(conversationKey);
if (events == null) {
Expand All @@ -115,6 +124,7 @@ public void addChatEvent(String conversationKey, ConversationEvent event) {
* @param event conversation event.
*/
public void addGroupChatEvent(String conversationKey, ConversationEvent event) {
Log.trace("Add group chat event for key {}", conversationKey);
synchronized (roomEvents) {
List<ConversationEvent> events = roomEvents.get(conversationKey);
if (events == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.jivesoftware.openfire.muc.MUCEventListener;
import org.jivesoftware.openfire.muc.MUCRoom;
import org.jivesoftware.util.SystemProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;

Expand All @@ -35,6 +37,7 @@
*/
public class GroupConversationInterceptor implements MUCEventListener {

private static final Logger Log = LoggerFactory.getLogger(GroupConversationInterceptor.class);

private ConversationManager conversationManager;

Expand Down Expand Up @@ -136,9 +139,11 @@ public void messageReceived(JID roomJID, JID user, String nickname, Message mess

// Process this event in the senior cluster member or local JVM when not in a cluster
if (ClusterManager.isSeniorClusterMember()) {
Log.trace("Message received on senior node for room: {}", roomJID);
conversationManager.processRoomMessage(roomJID, user, null, nickname, message.getBody(), message.toXML(), now);
}
else {
Log.trace("Message received on junior node for room: {}", roomJID);
boolean withBody = conversationManager.isRoomArchivingEnabled() && (
conversationManager.getRoomsArchived().isEmpty() ||
conversationManager.getRoomsArchived().contains(roomJID.getNode()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public Void getResult() {
}

public void run() {
Log.debug("Processing {} chat events as received by other nodes.", events.size());
final Optional<Plugin> plugin = XMPPServer.getInstance().getPluginManager().getPluginByName(MonitoringConstants.PLUGIN_NAME);
if (!plugin.isPresent()) {
Log.error("Unable to execute cluster task! The Monitoring plugin does not appear to be loaded on this machine.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.jivesoftware.openfire.archive;

import org.junit.Test;
import org.xmpp.packet.JID;

import java.util.Date;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class ConversationManagerTest {

@Test
public void testXmlMarshallingConversationEventTest() throws Exception {
// Setup test fixture.
final ConversationEvent input = ConversationEvent.chatMessageReceived(new JID("a@b.c"), new JID("d@e.f/g"), "body", "stanza", new Date(1));
final XmlSerializer serializer = new XmlSerializer(
Conversation.class,
UserParticipations.class,
ConversationParticipation.class,
ConversationEvent.class
);

// Execute system under test.
final String xml = serializer.marshall(input);
final Object result = serializer.unmarshall(xml);

// Verify result.
assertTrue(result instanceof ConversationEvent);
assertEquals("Marshalled content didn't unmarshall as equal object. Marshalled content: " + xml, input, result);
}
}

0 comments on commit 15d57b5

Please sign in to comment.