diff --git a/symmetric/pom.xml b/symmetric/pom.xml index 87b83e0c24..ce7b128f9f 100644 --- a/symmetric/pom.xml +++ b/symmetric/pom.xml @@ -428,5 +428,10 @@ mysql-connector-java 5.0.5 + + rome + rome + 0.9 + diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java b/symmetric/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java index a07a649374..ef66d1d7ad 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java @@ -22,6 +22,7 @@ package org.jumpmind.symmetric.model; import java.io.Serializable; +import java.util.Date; import org.jumpmind.symmetric.load.IDataLoaderContext; @@ -38,6 +39,8 @@ public enum Status { private String nodeId; private Status status; + + private Date createTime; private boolean isRetry; @@ -86,4 +89,12 @@ public void setRetry(boolean isRetry) { this.isRetry = isRetry; } + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java b/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java index ab49c443fe..e60b9e619a 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Date; import java.util.List; public class OutgoingBatch implements Serializable { @@ -41,6 +42,8 @@ public enum Status { private Status status = Status.NE; private BatchType batchType = BatchType.EVENTS; + + private Date createTime; public OutgoingBatch() { } @@ -52,6 +55,9 @@ public OutgoingBatch(Node node, String channelId, BatchType batchType) { this.batchType = batchType; } + public String getNodeBatchId() { + return nodeId + "-" + batchId; + } public String getBatchId() { return batchId; @@ -113,4 +119,12 @@ public void setBatchType(String batchType) { } } + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java index 737da586e9..486aa5623c 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java @@ -31,6 +31,8 @@ public interface IIncomingBatchService { public IncomingBatch findIncomingBatch(String batchId, String nodeId); + public List findIncomingBatchErrors(int maxRows); + public List findIncomingBatchHistory(String batchId, String nodeId); @Transactional @@ -44,4 +46,5 @@ public interface IIncomingBatchService { @Transactional public int updateIncomingBatch(IncomingBatch status); + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index c5c354a547..e2e1616252 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -35,6 +35,8 @@ public interface IOutgoingBatchService { public List getOutgoingBatchRange(String startBatchId, String endBatchId); + public List getOutgoingBatcheErrors(int maxRows); + public void markOutgoingBatchSent(OutgoingBatch batch); public void setBatchStatus(String batchId, Status status); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java index ad1189c9e4..67074cbb3f 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java @@ -30,6 +30,7 @@ import org.jumpmind.symmetric.model.IncomingBatch; import org.jumpmind.symmetric.model.IncomingBatchHistory; import org.jumpmind.symmetric.service.IIncomingBatchService; +import org.jumpmind.symmetric.util.MaxRowsStatementCreator; import org.springframework.dao.DataIntegrityViolationException; import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.jdbc.core.RowMapper; @@ -40,6 +41,8 @@ public class IncomingBatchService extends AbstractService implements IIncomingBa private String findIncomingBatchSql; + private String findIncomingBatchErrorsSql; + private String findIncomingBatchHistorySql; private String insertIncomingBatchSql; @@ -59,6 +62,12 @@ public IncomingBatch findIncomingBatch(String batchId, String nodeId) { } } + @SuppressWarnings("unchecked") + public List findIncomingBatchErrors(int maxRows) { + return (List) jdbcTemplate.query(new MaxRowsStatementCreator( + findIncomingBatchErrorsSql, maxRows), new IncomingBatchMapper()); + } + @SuppressWarnings("unchecked") public List findIncomingBatchHistory(String batchId, String nodeId) { return (List) jdbcTemplate.query(findIncomingBatchHistorySql, new Object[] { batchId, @@ -104,6 +113,7 @@ public Object mapRow(ResultSet rs, int num) throws SQLException { batch.setBatchId(rs.getString(1)); batch.setNodeId(rs.getString(2)); batch.setStatus(IncomingBatch.Status.valueOf(rs.getString(3))); + batch.setCreateTime(rs.getTimestamp(4)); return batch; } } @@ -149,4 +159,8 @@ public void setSkipDuplicateBatches(boolean skipDuplicateBatchesEnabled) { this.skipDuplicateBatches = skipDuplicateBatchesEnabled; } + public void setFindIncomingBatchErrorsSql(String findIncomingBatchErrorsSql) { + this.findIncomingBatchErrorsSql = findIncomingBatchErrorsSql; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index 4b0de6c85d..d7e4a16cbc 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -35,6 +35,7 @@ import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IOutgoingBatchHistoryService; import org.jumpmind.symmetric.service.IOutgoingBatchService; +import org.jumpmind.symmetric.util.MaxRowsStatementCreator; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.ConnectionCallback; import org.springframework.jdbc.core.JdbcTemplate; @@ -57,6 +58,8 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa private String selectOutgoingBatchRangeSql; + private String selectOutgoingBatchErrorsSql; + private String changeBatchStatusSql; private String initialLoadStatusSql; @@ -198,35 +201,21 @@ private void insertOutgoingBatch(Connection conn, OutgoingBatch outgoingBatch) t @SuppressWarnings("unchecked") public List getOutgoingBatches(String nodeId) { return (List) outgoingBatchQueryTemplate.query(selectOutgoingBatchSql, new Object[] { nodeId, nodeId }, - new RowMapper() { - public Object mapRow(ResultSet rs, int index) throws SQLException { - OutgoingBatch batch = new OutgoingBatch(); - batch.setBatchId(rs.getString(1)); - batch.setNodeId(rs.getString(2)); - batch.setChannelId(rs.getString(3)); - batch.setStatus(rs.getString(4)); - batch.setBatchType(rs.getString(5)); - return batch; - } - }); + new OutgoingBatchMapper()); } @SuppressWarnings("unchecked") public List getOutgoingBatchRange(String startBatchId, String endBatchId) { return (List) outgoingBatchQueryTemplate.query(selectOutgoingBatchRangeSql, - new Object[] { startBatchId, endBatchId }, new RowMapper() { - public Object mapRow(ResultSet rs, int index) throws SQLException { - OutgoingBatch batch = new OutgoingBatch(); - batch.setBatchId(rs.getString(1)); - batch.setNodeId(rs.getString(2)); - batch.setChannelId(rs.getString(3)); - batch.setStatus(rs.getString(4)); - batch.setBatchType(rs.getString(5)); - return batch; - } - }); + new Object[] { startBatchId, endBatchId }, new OutgoingBatchMapper()); } + @SuppressWarnings("unchecked") + public List getOutgoingBatcheErrors(int maxRows) { + return (List) outgoingBatchQueryTemplate.query(new MaxRowsStatementCreator( + selectOutgoingBatchErrorsSql, maxRows), new OutgoingBatchMapper()); + } + public void markOutgoingBatchSent(OutgoingBatch batch) { setBatchStatus(batch.getBatchId(), Status.SE); } @@ -265,6 +254,19 @@ public boolean isInitialLoadComplete(String nodeId) { return returnValue; } + class OutgoingBatchMapper implements RowMapper { + public Object mapRow(ResultSet rs, int num) throws SQLException { + OutgoingBatch batch = new OutgoingBatch(); + batch.setBatchId(rs.getString(1)); + batch.setNodeId(rs.getString(2)); + batch.setChannelId(rs.getString(3)); + batch.setStatus(rs.getString(4)); + batch.setBatchType(rs.getString(5)); + batch.setCreateTime(rs.getTimestamp(6)); + return batch; + } + } + public void setConfigurationService(IConfigurationService configurationService) { this.configurationService = configurationService; } @@ -305,4 +307,8 @@ public void setSelectOutgoingBatchRangeSql(String selectOutgoingBatchRangeSql) { this.selectOutgoingBatchRangeSql = selectOutgoingBatchRangeSql; } + public void setSelectOutgoingBatchErrorsSql(String selectOutgoingBatchErrorsSql) { + this.selectOutgoingBatchErrorsSql = selectOutgoingBatchErrorsSql; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/util/MaxRowsStatementCreator.java b/symmetric/src/main/java/org/jumpmind/symmetric/util/MaxRowsStatementCreator.java new file mode 100644 index 0000000000..685aff2b54 --- /dev/null +++ b/symmetric/src/main/java/org/jumpmind/symmetric/util/MaxRowsStatementCreator.java @@ -0,0 +1,26 @@ +package org.jumpmind.symmetric.util; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import org.springframework.jdbc.core.PreparedStatementCreator; + +public class MaxRowsStatementCreator implements PreparedStatementCreator { + + private String sql; + + private int maxRows; + + public MaxRowsStatementCreator(String sql, int maxRows) { + this.sql = sql; + this.maxRows = maxRows; + } + + public PreparedStatement createPreparedStatement(Connection conn) throws SQLException { + PreparedStatement st = conn.prepareStatement(sql); + st.setMaxRows(maxRows); + return st; + } + +} diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/web/AlertServlet.java b/symmetric/src/main/java/org/jumpmind/symmetric/web/AlertServlet.java index cbec6b6355..dd4479263b 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/web/AlertServlet.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/web/AlertServlet.java @@ -1,7 +1,8 @@ /* * SymmetricDS is an open source database synchronization solution. * - * Copyright (C) Chris Henson + * Copyright (C) Chris Henson , + * Eric Long * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -20,9 +21,111 @@ package org.jumpmind.symmetric.web; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jumpmind.symmetric.common.Constants; +import org.jumpmind.symmetric.model.IncomingBatch; +import org.jumpmind.symmetric.model.OutgoingBatch; +import org.jumpmind.symmetric.service.IIncomingBatchService; +import org.jumpmind.symmetric.service.IOutgoingBatchService; + +import com.sun.syndication.feed.synd.SyndContent; +import com.sun.syndication.feed.synd.SyndContentImpl; +import com.sun.syndication.feed.synd.SyndEntry; +import com.sun.syndication.feed.synd.SyndEntryImpl; +import com.sun.syndication.feed.synd.SyndFeed; +import com.sun.syndication.feed.synd.SyndFeedImpl; +import com.sun.syndication.io.FeedException; +import com.sun.syndication.io.SyndFeedOutput; + /** * This is a place holder for the idea that we could create an RSS feed for alerts. */ -public class AlertServlet { +public class AlertServlet extends AbstractServlet { + + private static final long serialVersionUID = 1L; + + private static final Log logger = LogFactory.getLog(AlertServlet.class); + + private static final int MAX_ERRORS = 1000; + + private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + @Override + public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + SyndFeed feed = new SyndFeedImpl(); + feed.setFeedType("rss_2.0"); + feed.setTitle("SymmetricDS Alerts"); + feed.setDescription("Problems synchronizing data"); + feed.setLink(req.getRequestURL().toString()); + + List entries = new ArrayList(); + + for (IncomingBatch batch : findIncomingBatchErrors()) { + String title = "Incoming Batch " + batch.getNodeBatchId(); + String value = "Node " + batch.getNodeId() + " incoming batch " + batch.getBatchId() + + " is in error at " + formatDate(batch.getCreateTime()); + entries.add(createEntry(title, value)); + } + + for (OutgoingBatch batch : findOutgoingBatchErrors()) { + String title = "Outgoing Batch " + batch.getNodeBatchId(); + String value = "Node " + batch.getNodeId() + " outgoing batch " + batch.getBatchId() + + " is in error at " + formatDate(batch.getCreateTime()); + entries.add(createEntry(title, value)); + } + + feed.setEntries(entries); + + SyndFeedOutput out = new SyndFeedOutput(); + resp.setContentType("application/rss+xml"); + try { + out.output(feed, resp.getWriter()); + } catch (FeedException e) { + logger.error("Unable to generate RSS alert feed", e); + } + } + + private SyndEntry createEntry(String title, String value) { + SyndEntry entry = new SyndEntryImpl(); + entry.setTitle(title); + SyndContent content = new SyndContentImpl(); + content.setType("text/html"); + content.setValue(value); + entry.setDescription(content); + return entry; + } + + private String formatDate(Date date) { + SimpleDateFormat formatter = new SimpleDateFormat(DATE_FORMAT); + return formatter.format(date); + } + + private List findIncomingBatchErrors() { + IIncomingBatchService incomingBatchService = (IIncomingBatchService) getContext().getBean( + Constants.INCOMING_BATCH_SERVICE); + return incomingBatchService.findIncomingBatchErrors(MAX_ERRORS); + } + + private List findOutgoingBatchErrors() { + IOutgoingBatchService outgoingBatchService = (IOutgoingBatchService) getContext().getBean( + Constants.OUTGOING_BATCH_SERVICE); + return outgoingBatchService.getOutgoingBatcheErrors(MAX_ERRORS); + } + + @Override + protected Log getLogger() { + return logger; + } } diff --git a/symmetric/src/main/resources/symmetric-services.xml b/symmetric/src/main/resources/symmetric-services.xml index e53d9ed63e..cf46368c55 100644 --- a/symmetric/src/main/resources/symmetric-services.xml +++ b/symmetric/src/main/resources/symmetric-services.xml @@ -215,21 +215,29 @@ - select b.batch_id, b.node_id, b.channel_id, b.status, b.batch_type, 1 + select b.batch_id, b.node_id, b.channel_id, b.status, b.batch_type, create_time, 1 from ${sync.table.prefix}_outgoing_batch b where b.node_id = ? and b.status in ('NE','SE') and b.channel_id not in (select distinct e.channel_id from ${sync.table.prefix}_outgoing_batch e where e.status = 'ER' and e.node_id = b.node_id) union - select b.batch_id, b.node_id, b.channel_id, b.status, b.batch_type, 2 - from ${sync.table.prefix}_outgoing_batch b where b.node_id = ? and b.status = 'ER' order by 6, 1 + select b.batch_id, b.node_id, b.channel_id, b.status, b.batch_type, create_time, 2 + from ${sync.table.prefix}_outgoing_batch b where b.node_id = ? and b.status = 'ER' order by 7, 1 - select b.batch_id, b.node_id, b.channel_id, b.status, b.batch_type - from ${sync.table.prefix}_outgoing_batch b - where b.batch_id between ? and ? - order by b.batch_id + select batch_id, node_id, channel_id, status, batch_type + from ${sync.table.prefix}_outgoing_batch + where batch_id between ? and ? + order by batch_id + + + + + select batch_id, node_id, channel_id, status, batch_type, create_time + from ${sync.table.prefix}_outgoing_batch + where status = 'ER' + order by batch_id @@ -314,8 +322,15 @@ - select batch_id, node_id, status from ${sync.table.prefix}_incoming_batch where - batch_id = ? and node_id = ? + select batch_id, node_id, status, create_time from + ${sync.table.prefix}_incoming_batch where batch_id = ? and node_id = ? + + + + + select batch_id, node_id, status, create_time from + ${sync.table.prefix}_incoming_batch where status = 'ER' + order by batch_id