Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public ResultSet<NodeAffiliation> getNodeAffiliations(String nodeId,
maxItemsToReturn);
}

@Override
public ArrayList<JID> getNodeOwners(String node) throws NodeStoreException {
return nodeStore.getNodeOwners(node);
}

@Override
public int countNodeAffiliations(String nodeId) throws NodeStoreException {
return nodeStore.countNodeAffiliations(nodeId);
Expand Down Expand Up @@ -468,6 +473,5 @@ public ResultSet<NodeThread> getNodeThreads(String node, String afterId,
@Override
public int countNodeThreads(String node) throws NodeStoreException {
return nodeStore.countNodeThreads(node);
}

}
}
49 changes: 49 additions & 0 deletions src/main/java/org/buddycloud/channelserver/db/NodeStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,14 @@ ResultSet<NodeAffiliation> getNodeAffiliations(String nodeId)
ResultSet<NodeAffiliation> getNodeAffiliations(String node
, String afterItemId, int maxItemsToReturn) throws NodeStoreException;

/**
* Get a list of node owners
*
* @param node
* @throws NodeStoreException
*/
ArrayList<JID> getNodeOwners(String node) throws NodeStoreException;

/**
* Count the number of affiliations for a node
*
Expand Down Expand Up @@ -573,16 +581,57 @@ void deleteNodeItemById(String nodeId, String nodeItemId)
*/
int getFirehoseItemCount(boolean isAdmin) throws NodeStoreException;

/**
* Get a list of posts for a user
*
* @param userJid
* @return
* @throws NodeStoreException
*/
ResultSet<NodeItem> getUserItems(JID userJid) throws NodeStoreException;

/**
* Delete user posts
*
* @param userJid
* @throws NodeStoreException
*/
void deleteUserItems(JID userJid) throws NodeStoreException;

/**
* Delete affiliations for a user
*
* @param userJid
* @throws NodeStoreException
*/
void deleteUserAffiliations(JID userJid) throws NodeStoreException;

/**
* Delete user subscriptions
*
* @param userJid
* @throws NodeStoreException
*/
void deleteUserSubscriptions(JID userJid) throws NodeStoreException;

/**
* Get node threads
*
* @param node
* @param afterId
* @param limit
* @return
* @throws NodeStoreException
*/
ResultSet<NodeThread> getNodeThreads(String node, String afterId, int limit) throws NodeStoreException;

/**
* Count node threads
*
* @param node
* @return
* @throws NodeStoreException
*/
int countNodeThreads(String node) throws NodeStoreException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,30 @@ public ResultSet<NodeAffiliation> getNodeAffiliations(String nodeId,
}
}

@Override
public ArrayList<JID> getNodeOwners(String node) throws NodeStoreException {
PreparedStatement stmt = null;

try {
stmt = conn.prepareStatement(dialect
.selectNodeOwners());
stmt.setString(1, node);

java.sql.ResultSet rs = stmt.executeQuery();
ArrayList<JID> result = new ArrayList<JID>();

while (rs.next()) {
result.add(new JID(rs.getString(1)));
}

return result;
} catch (SQLException e) {
throw new NodeStoreException(e);
} finally {
close(stmt); // Will implicitly close the resultset if required
}
}

@Override
public int countUserAffiliations(JID actorJid) throws NodeStoreException {
PreparedStatement selectStatement = null;
Expand Down Expand Up @@ -1840,6 +1864,8 @@ public void close() throws NodeStoreException {
public interface NodeStoreSQLDialect {
String insertNode();

String selectNodeOwners();

String getUserItems();

String selectItemsForLocalNodesBeforeDate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class Sql92NodeStoreDialect implements NodeStoreSQLDialect {
private static final String SELECT_AFFILIATIONS_FOR_USER = "SELECT \"node\", \"user\", \"affiliation\", \"updated\""
+ " FROM \"affiliations\" WHERE \"user\" = ? ORDER BY \"updated\" ASC";

private static final String SELECT_NODE_OWNERS = "SELECT \"user\" FROM \"affiliations\" WHERE \"node\" = ? AND \"affiliation\" = 'owner';";

private static final String SELECT_AFFILIATION_CHANGES = ""
+ "SELECT \"node\", \"user\", \"affiliation\", \"updated\" FROM \"affiliations\" "
+ "WHERE \"updated\" >= ? AND \"updated\" <= ? AND \"node\" IN "
Expand Down Expand Up @@ -278,6 +280,11 @@ public String selectAffiliation() {
public String selectAffiliationsForUser() {
return SELECT_AFFILIATIONS_FOR_USER;
}

@Override
public String selectNodeOwners() {
return SELECT_NODE_OWNERS;
}

@Override
public String selectAffiliationChanges() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class UnsubscribeSet extends PubSubElementProcessorAbstract {

private String node;
private IQ request;
private IQ response;
private JID unsubscribingJid;

public UnsubscribeSet(BlockingQueue<Packet> outQueue,
Expand All @@ -48,6 +49,7 @@ public void process(Element elm, JID actorJID, IQ reqIQ, Element rsm)

node = elm.attributeValue("node");
request = reqIQ;
response = IQ.createResultIQ(request);

if ((node == null) || (node.equals(""))) {
missingNodeName();
Expand Down Expand Up @@ -79,13 +81,12 @@ public void process(Element elm, JID actorJID, IQ reqIQ, Element rsm)


if (false == channelManager.nodeExists(node)) {
IQ reply = IQ.createResultIQ(request);
reply.setType(Type.error);
response.setType(Type.error);
PacketError pe = new PacketError(
org.xmpp.packet.PacketError.Condition.item_not_found,
org.xmpp.packet.PacketError.Type.cancel);
reply.setError(pe);
outQueue.put(reply);
response.setError(pe);
outQueue.put(response);
return;
}

Expand All @@ -98,13 +99,24 @@ public void process(Element elm, JID actorJID, IQ reqIQ, Element rsm)
// Check that the requesting user is allowed to unsubscribe according to
// XEP-0060 section 6.2.3.3
if (false == unsubscribingJid.equals(existingSubscription.getUser())) {
IQ reply = IQ.createResultIQ(request);
reply.setType(Type.error);
response.setType(Type.error);
PacketError pe = new PacketError(
org.xmpp.packet.PacketError.Condition.forbidden,
org.xmpp.packet.PacketError.Type.auth);
reply.setError(pe);
outQueue.put(reply);
response.setError(pe);
outQueue.put(response);
return;
}

if ((Affiliations.owner == existingAffiliation.getAffiliation()) &&
(channelManager.getNodeOwners(node).size() < 2)) {

response.setType(Type.error);
PacketError pe = new PacketError(
org.xmpp.packet.PacketError.Condition.not_allowed,
org.xmpp.packet.PacketError.Type.cancel);
response.setError(pe);
outQueue.put(response);
return;
}

Expand All @@ -120,8 +132,7 @@ public void process(Element elm, JID actorJID, IQ reqIQ, Element rsm)
Affiliations.none);
}

IQ reply = IQ.createResultIQ(request);
outQueue.put(reply);
outQueue.put(response);
notifySubscribers();
}

Expand Down Expand Up @@ -174,13 +185,12 @@ private void failAuthRequired() throws InterruptedException {
* <registration-required xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
* </error> </iq>
*/
IQ reply = IQ.createResultIQ(request);
reply.setType(Type.error);
response.setType(Type.error);
PacketError pe = new PacketError(
org.xmpp.packet.PacketError.Condition.not_authorized,
org.xmpp.packet.PacketError.Type.auth);
reply.setError(pe);
outQueue.put(reply);
response.setError(pe);
outQueue.put(response);
}

private void missingNodeName() throws InterruptedException {
Expand All @@ -194,8 +204,7 @@ private void missingNodeName() throws InterruptedException {
* </error> </iq>
*/

IQ reply = IQ.createResultIQ(request);
reply.setType(Type.error);
response.setType(Type.error);

Element badRequest = new DOMElement("bad-request",
new org.dom4j.Namespace("", JabberPubsub.NS_XMPP_STANZAS));
Expand All @@ -208,9 +217,9 @@ private void missingNodeName() throws InterruptedException {
error.add(badRequest);
error.add(nodeIdRequired);

reply.setChildElement(error);
response.setChildElement(error);

outQueue.put(reply);
outQueue.put(response);
}

private void makeRemoteRequest() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class JDBCNodeStoreTest {
private static final String TEST_SERVER1_HOSTNAME = "server1";
private static final String TEST_SERVER2_HOSTNAME = "server2";

private static final String UNKNOWN_NODE = "/user/unknown@example.com/posts";

private static final HashMap<String, String> TEST_SERVER1_NODE1_CONF = new HashMap<String, String>() {
{
put("config1", "Value of config1");
Expand Down Expand Up @@ -1322,16 +1324,18 @@ public void testGetNodeItemsWithPaging() throws Exception {
dbTester.loadData("node_1");

long start = System.currentTimeMillis();

NodeItem[] items = new NodeItem[20];

for (int i = 0; i < 20; i++) {
items[i] = new NodeItemImpl(TEST_SERVER1_NODE1_ID, String
.valueOf(i), new Date(start + i * 10), "payload" + String.valueOf(i));
items[i] = new NodeItemImpl(TEST_SERVER1_NODE1_ID,
String.valueOf(i), new Date(start + i * 10), "payload"
+ String.valueOf(i));
store.addNodeItem(items[i]);
}

CloseableIterator<NodeItem> result = store.getNodeItems(TEST_SERVER1_NODE1_ID, "15", 3);
CloseableIterator<NodeItem> result = store.getNodeItems(
TEST_SERVER1_NODE1_ID, "15", 3);

assertEquals("Incorrect node item returned", items[14], result.next());
assertEquals("Incorrect node item returned", items[13], result.next());
Expand Down Expand Up @@ -1565,7 +1569,7 @@ public void testGetRecentItemsCanBePaged() throws Exception {
store.addUserSubscription(new NodeSubscriptionImpl(
TEST_SERVER1_NODE2_ID, TEST_SERVER1_USER1_JID,
Subscriptions.subscribed));

long now = System.currentTimeMillis();

NodeItem nodeItem1 = new NodeItemImpl(TEST_SERVER1_NODE1_ID, "123",
Expand All @@ -1585,7 +1589,9 @@ public void testGetRecentItemsCanBePaged() throws Exception {
store.addNodeItem(nodeItem4);

CloseableIterator<NodeItem> items = store.getRecentItems(
TEST_SERVER1_USER1_JID, since, -1, 2, new GlobalItemIDImpl(TEST_SERVER1_CHANNELS_JID, TEST_SERVER1_NODE1_ID, "124"), null);
TEST_SERVER1_USER1_JID, since, -1, 2,
new GlobalItemIDImpl(TEST_SERVER1_CHANNELS_JID,
TEST_SERVER1_NODE1_ID, "124"), null);

assertSameNodeItem(items.next(), nodeItem3);
assertSameNodeItem(items.next(), nodeItem1);
Expand Down Expand Up @@ -1646,7 +1652,7 @@ public void testCanPageGetRecentItemsUsingResultSetManagement()
dbTester.loadData("node_1");

Date since = new Date();

Thread.sleep(10);

store.addRemoteNode(TEST_SERVER1_NODE2_ID);
Expand All @@ -1659,8 +1665,9 @@ public void testCanPageGetRecentItemsUsingResultSetManagement()
store.addNodeItem(new NodeItemImpl(TEST_SERVER1_NODE1_ID, String
.valueOf(i), new Date(), "payload" + String.valueOf(i)));
}

GlobalItemID itemID = new GlobalItemIDImpl(TEST_SERVER1_CHANNELS_JID, TEST_SERVER1_NODE1_ID, "15");

GlobalItemID itemID = new GlobalItemIDImpl(TEST_SERVER1_CHANNELS_JID,
TEST_SERVER1_NODE1_ID, "15");

CloseableIterator<NodeItem> items = store.getRecentItems(
TEST_SERVER1_USER1_JID, since, -1, 10, itemID, null);
Expand Down Expand Up @@ -1785,8 +1792,8 @@ public void testCanGetItemThread() throws Exception {
@Test
public void testCanGetItemThreadWithResultSetManagement() throws Exception {
dbTester.loadData("node_1");
NodeItem testItemParent = new NodeItemImpl(TEST_SERVER1_NODE1_ID, "a100",
new Date(100), "<entry>payload parent</entry>");
NodeItem testItemParent = new NodeItemImpl(TEST_SERVER1_NODE1_ID,
"a100", new Date(100), "<entry>payload parent</entry>");
NodeItem testItem1 = new NodeItemImpl(TEST_SERVER1_NODE1_ID, "a6",
new Date(20), "<entry>payload</entry>", "a100");
NodeItem testItem2 = new NodeItemImpl(TEST_SERVER1_NODE1_ID, "a7",
Expand Down Expand Up @@ -2319,16 +2326,35 @@ private void assertSameNodeItem(NodeItem actual, NodeItem expected) {
assertEquals(expected.getPayload(), actual.getPayload());
assertEquals(expected.getInReplyTo(), actual.getInReplyTo());
}

@Test
public void testSelectNodeThreads() throws Exception {
dbTester.loadData("node_1");
assertEquals(5, store.getNodeThreads(TEST_SERVER1_NODE1_ID, null, 10).size());
assertEquals(5, store.getNodeThreads(TEST_SERVER1_NODE1_ID, null, 10)
.size());
}

@Test
public void testCountNodeThreads() throws Exception {
dbTester.loadData("node_1");
assertEquals(5, store.countNodeThreads(TEST_SERVER1_NODE1_ID));
}

@Test
public void testNoNodeOwnersReturnsEmptyList() throws Exception {
dbTester.loadData("node_1");
assertEquals(0, store.getNodeOwners(UNKNOWN_NODE).size());
}

@Test
public void testNodeOwnersReturnsExpectedList() throws Exception {
dbTester.loadData("node_1");

store.addUserSubscription(new NodeSubscriptionImpl(TEST_SERVER1_NODE1_ID, TEST_SERVER1_USER2_JID, Subscriptions.subscribed));
store.setUserAffiliation(TEST_SERVER1_NODE1_ID, TEST_SERVER1_USER2_JID, Affiliations.owner);

assertEquals(2, store.getNodeOwners(TEST_SERVER1_NODE1_ID).size());
assertEquals(TEST_SERVER1_USER1_JID, store.getNodeOwners(TEST_SERVER1_NODE1_ID).get(0));
assertEquals(TEST_SERVER1_USER2_JID, store.getNodeOwners(TEST_SERVER1_NODE1_ID).get(1));
}
}
Loading