Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly display conversations on junior node after plugin reload #209

Merged
merged 12 commits into from
Dec 16, 2021
99 changes: 86 additions & 13 deletions src/java/org/jivesoftware/openfire/archive/Conversation.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@

package org.jivesoftware.openfire.archive;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.database.JiveID;
import org.jivesoftware.database.SequenceManager;
Expand All @@ -45,6 +34,30 @@
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;

import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
* Represents an IM conversation between people. A conversation encompasses a series of messages sent back and forth. It may cover a single topic
* or several. The start of a conversation occurs when the first message between users is sent. It ends when either:
Expand All @@ -61,6 +74,7 @@
*
* @author Matt Tucker
*/
@XmlRootElement
@JiveID(50)
public class Conversation implements Externalizable {

Expand All @@ -79,15 +93,23 @@ public class Conversation implements Externalizable {

private transient ConversationManager conversationManager;

@XmlElement
private long conversationID = -1;
@XmlElementWrapper
private Map<String, UserParticipations> participants;
@XmlElement
private boolean external;
@XmlElement
private Date startDate;
@XmlElement
private Date lastActivity;
@XmlElement
private int messageCount;
/**
* Room where the group conversion is taking place. For one-to-one chats there is no room so this variable will be null.
*/
@XmlElement
@XmlJavaTypeAdapter(XmlSerializer.JidAdapter.class)
private JID room;

/**
Expand Down Expand Up @@ -587,9 +609,48 @@ void conversationEnded(Date nowDate) {
}
}

/**
* Convert the conversation to an XML representation.
*
* @return XML representation of the conversation.
* @throws IOException On any issue that occurs when marshalling this instance to XML.
*/
public String toXml() throws IOException {
return ConversationManager.getXmlSerializer().marshall(this);
}

/**
* Create a new conversation object based on the XML representation.
*
* @param xmlString The XML representation.
* @return A newly instantiated conversation object containing state as included in the XML representation.
* @throws IOException On any issue that occurs when unmarshalling XML to an instance of Conversation.
*/
public static Conversation fromXml(final String xmlString) throws IOException {
final Conversation unmarshalled = (Conversation) ConversationManager.getXmlSerializer().unmarshall(xmlString);
final Optional<Plugin> plugin = XMPPServer.getInstance().getPluginManager().getPluginByName(MonitoringConstants.PLUGIN_NAME);
if (!plugin.isPresent()) {
// Highly unlikely, as this code is _part_ of the Monitoring plugin. If this occurs something is very wrong.
throw new IOException("Unable to deserialize data as a Conversations instance: " + xmlString,
new IllegalStateException("The Monitoring plugin does not appear to be loaded on this machine."));
}
if (unmarshalled != null) {
unmarshalled.conversationManager = ((MonitoringPlugin) plugin.get()).getConversationManager();
}
return unmarshalled;
}

public void writeExternal(ObjectOutput out) throws IOException {
// ClassCastExceptions occur when using classes provided by a plugin during serialization (sometimes only after
// reloading the plugin without restarting Openfire. This is why this implementation marshalls data as XML when
// serializing. See https://github.com/igniterealtime/openfire-monitoring-plugin/issues/120
// and https://github.com/igniterealtime/openfire-monitoring-plugin/issues/156
ExternalizableUtil.getInstance().writeLong(out, conversationID);
ExternalizableUtil.getInstance().writeExternalizableMap(out, participants);
ExternalizableUtil.getInstance().writeInt(out, participants.size());
for (Map.Entry<String, UserParticipations> e : participants.entrySet()) {
ExternalizableUtil.getInstance().writeSafeUTF(out, e.getKey());
ExternalizableUtil.getInstance().writeSafeUTF(out, ConversationManager.getXmlSerializer().marshall(e.getValue()));
}
ExternalizableUtil.getInstance().writeBoolean(out, external);
ExternalizableUtil.getInstance().writeLong(out, startDate.getTime());
ExternalizableUtil.getInstance().writeLong(out, lastActivity.getTime());
Expand All @@ -601,6 +662,10 @@ public void writeExternal(ObjectOutput out) throws IOException {
}

public void readExternal(ObjectInput in) throws IOException {
// ClassCastExceptions occur when using classes provided by a plugin during serialization (sometimes only after
// reloading the plugin without restarting Openfire. This is why this implementation marshalls data as XML when
// serializing. See https://github.com/igniterealtime/openfire-monitoring-plugin/issues/120
// and https://github.com/igniterealtime/openfire-monitoring-plugin/issues/156
final Optional<Plugin> plugin = XMPPServer.getInstance().getPluginManager().getPluginByName(MonitoringConstants.PLUGIN_NAME);
if (!plugin.isPresent()) {
throw new IllegalStateException("Unable to handle IQ stanza. The Monitoring plugin does not appear to be loaded on this machine.");
Expand All @@ -610,7 +675,15 @@ public void readExternal(ObjectInput in) throws IOException {
this.participants = new ConcurrentHashMap<>();

conversationID = ExternalizableUtil.getInstance().readLong(in);
ExternalizableUtil.getInstance().readExternalizableMap(in, participants, getClass().getClassLoader());

int participantsCount = ExternalizableUtil.getInstance().readInt(in);
for (int i = 0; i < participantsCount; i++) {
String participantName = ExternalizableUtil.getInstance().readSafeUTF(in);
String marshalledParticipations = ExternalizableUtil.getInstance().readSafeUTF(in);
final UserParticipations unmarshalledParticipations = (UserParticipations)ConversationManager.getXmlSerializer().unmarshall(marshalledParticipations);
participants.put(participantName, unmarshalledParticipations);
}

external = ExternalizableUtil.getInstance().readBoolean(in);
startDate = new Date(ExternalizableUtil.getInstance().readLong(in));
lastActivity = new Date(ExternalizableUtil.getInstance().readLong(in));
Expand Down
90 changes: 69 additions & 21 deletions src/java/org/jivesoftware/openfire/archive/ConversationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

package org.jivesoftware.openfire.archive;

import com.reucon.openfire.plugin.archive.util.StanzaIDUtil;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.database.SequenceManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.XMPPServerInfo;
import org.jivesoftware.openfire.archive.cluster.GetConversationCountTask;
Expand All @@ -34,25 +34,41 @@
import org.jivesoftware.openfire.muc.MultiUserChatService;
import org.jivesoftware.openfire.plugin.MonitoringPlugin;
import org.jivesoftware.openfire.reporting.util.TaskEngine;
import com.reucon.openfire.plugin.archive.util.StanzaIDUtil;
import org.jivesoftware.openfire.stats.Statistic;
import org.jivesoftware.openfire.stats.StatisticsManager;
import org.jivesoftware.util.*;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.NotFoundException;
import org.jivesoftware.util.PropertyEventDispatcher;
import org.jivesoftware.util.PropertyEventListener;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -93,7 +109,10 @@ public class ConversationManager implements ComponentEventListener{
private ConversationEventsQueue conversationEventsQueue;
private TaskEngine taskEngine;

private Map<String, Conversation> conversations = new ConcurrentHashMap<String, Conversation>();
private static XmlSerializer xmlSerializer;


private Map<String, Conversation> conversations = new ConcurrentHashMap<>();
private boolean metadataArchivingEnabled;
/**
* Flag that indicates if messages of one-to-one chats should be archived.
Expand Down Expand Up @@ -131,7 +150,7 @@ public class ConversationManager implements ComponentEventListener{

public ConversationManager(TaskEngine taskEngine) {
this.taskEngine = taskEngine;
this.gateways = new CopyOnWriteArrayList<String>();
this.gateways = new CopyOnWriteArrayList<>();
this.serverInfo = XMPPServer.getInstance().getServerInfo();
this.conversationEventsQueue = new ConversationEventsQueue(this, taskEngine);
}
Expand Down Expand Up @@ -160,7 +179,7 @@ public void start() {
propertyListener = new ConversationPropertyListener();
PropertyEventDispatcher.addListener(propertyListener);

conversationListeners = new CopyOnWriteArraySet<ConversationListener>();
conversationListeners = new CopyOnWriteArraySet<>();

conversationArchiver = new ConversationArchivingRunnable( "MonitoringPlugin Conversations" );
messageArchiver = new MessageArchivingRunnable( "MonitoringPlugin Messages" );
Expand Down Expand Up @@ -499,7 +518,7 @@ public int getConversationCount() {
if (ClusterManager.isSeniorClusterMember()) {
return conversations.size();
}
return (Integer) CacheFactory.doSynchronousClusterTask(new GetConversationCountTask(), ClusterManager.getSeniorClusterMember().toByteArray());
return CacheFactory.doSynchronousClusterTask(new GetConversationCountTask(), ClusterManager.getSeniorClusterMember().toByteArray());
}

/**
Expand All @@ -523,12 +542,17 @@ public Conversation getConversation(long conversationID) throws NotFoundExceptio
return new Conversation(this, conversationID);
} else {
// Get this info from the senior cluster member when running in a cluster
Conversation conversation = (Conversation) CacheFactory.doSynchronousClusterTask(new GetConversationTask(conversationID), ClusterManager
String conversationXml = CacheFactory.doSynchronousClusterTask(new GetConversationTask(conversationID), ClusterManager
.getSeniorClusterMember().toByteArray());
if (conversation == null) {
if (conversationXml == null) {
throw new NotFoundException("Conversation not found: " + conversationID);
}
try {
return Conversation.fromXml(conversationXml);
} catch (IOException e) {
Log.warn("Conversation {} could not be reconstructed from '{}' because of '{}'. Handling this as if the conversation was not found.", conversationID, conversationXml, e.getMessage());
throw new NotFoundException("Conversation not found: " + conversationID);
}
return conversation;
}
}

Expand All @@ -539,18 +563,26 @@ public Conversation getConversation(long conversationID) throws NotFoundExceptio
*/
public Collection<Conversation> getConversations() {
if (ClusterManager.isSeniorClusterMember()) {
List<Conversation> conversationList = new ArrayList<Conversation>(conversations.values());
List<Conversation> conversationList = new ArrayList<>(conversations.values());
// Sort the conversations by creation date.
Collections.sort(conversationList, new Comparator<Conversation>() {
public int compare(Conversation c1, Conversation c2) {
return c1.getStartDate().compareTo(c2.getStartDate());
}
});
conversationList.sort(Comparator.comparing(Conversation::getStartDate));
return conversationList;
} else {
// Get this info from the senior cluster member when running in a cluster
return (Collection<Conversation>) CacheFactory.doSynchronousClusterTask(new GetConversationsTask(), ClusterManager
Collection<String> conversationXmls = CacheFactory.doSynchronousClusterTask(new GetConversationsTask(), ClusterManager
.getSeniorClusterMember().toByteArray());
Collection<Conversation> result = new ArrayList<>();
for (String conversationXml : conversationXmls) {
try {
Log.debug("Interpreting conversation from: {}", conversationXml);
final Conversation conversation = Conversation.fromXml(conversationXml);
Log.debug("Interpreted conversation: {}", conversation);
result.add(conversation);
} catch (IOException e) {
Log.warn("Conversation could not be reconstructed from '{}' because of '{}'. This conversation is not included in the result set.", conversationXml, e.getMessage());
}
}
return result;
}
}

Expand Down Expand Up @@ -647,7 +679,7 @@ void processMessage(JID sender, JID receiver, String body, String stanza, Date d
Conversation conversation = conversations.get(conversationKey);
// Create a new conversation if necessary.
if (conversation == null) {
Collection<JID> participants = new ArrayList<JID>(2);
Collection<JID> participants = new ArrayList<>(2);
participants.add(sender);
participants.add(receiver);
XMPPServer server = XMPPServer.getInstance();
Expand All @@ -670,7 +702,7 @@ else if ((date.getTime() - conversation.getLastActivity().getTime() > idleTime)
|| (date.getTime() - conversation.getStartDate().getTime() > maxTime)) {
removeConversation(conversationKey, conversation, conversation.getLastActivity());

Collection<JID> participants = new ArrayList<JID>(2);
Collection<JID> participants = new ArrayList<>(2);
participants.add(sender);
participants.add(receiver);
XMPPServer server = XMPPServer.getInstance();
Expand Down Expand Up @@ -1080,6 +1112,22 @@ public Duration availabilityETAOnLocalNode( Instant instant )
.orElse( Duration.ZERO );
}

/**
* Returns an XML serializer that can be used to marshall and unmarshall conversation objects.
* @return The XML serializer.
*/
public static synchronized XmlSerializer getXmlSerializer() {
if (ConversationManager.xmlSerializer == null) {
ConversationManager.xmlSerializer = new XmlSerializer(
Conversation.class,
UserParticipations.class,
ConversationParticipation.class,
ConversationEvent.class
);
}
return ConversationManager.xmlSerializer;
}

/**
* Stores Conversations in the database.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.jivesoftware.util.cache.ExternalizableUtil;

import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
Expand All @@ -31,9 +33,13 @@
*
* @author Gaston Dombiak
*/
@XmlRootElement
public class ConversationParticipation implements Externalizable {
@XmlElement
private Date joined = new Date();
@XmlElement
private Date left;
@XmlElement
private String nickname;

public ConversationParticipation() {
Expand Down
Loading