Skip to content

Commit

Permalink
For 1825992 feature request adds RSS feed through AlertServlet
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Nov 5, 2007
1 parent 1e91b66 commit d485f09
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 33 deletions.
5 changes: 5 additions & 0 deletions symmetric/pom.xml
Expand Up @@ -428,5 +428,10 @@
<artifactId>mysql-connector-java</artifactId>
<version>5.0.5</version>
</dependency>
<dependency>
<groupId>rome</groupId>
<artifactId>rome</artifactId>
<version>0.9</version>
</dependency>
</dependencies>
</project>
Expand Up @@ -22,6 +22,7 @@
package org.jumpmind.symmetric.model;

import java.io.Serializable;
import java.util.Date;

import org.jumpmind.symmetric.load.IDataLoaderContext;

Expand All @@ -38,6 +39,8 @@ public enum Status {
private String nodeId;

private Status status;

private Date createTime;

private boolean isRetry;

Expand Down Expand Up @@ -86,4 +89,12 @@ public void setRetry(boolean isRetry) {
this.isRetry = isRetry;
}

public Date getCreateTime() {
return createTime;
}

public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

}
Expand Up @@ -22,6 +22,7 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class OutgoingBatch implements Serializable {
Expand All @@ -41,6 +42,8 @@ public enum Status {
private Status status = Status.NE;

private BatchType batchType = BatchType.EVENTS;

private Date createTime;

public OutgoingBatch() {
}
Expand All @@ -52,6 +55,9 @@ public OutgoingBatch(Node node, String channelId, BatchType batchType) {
this.batchType = batchType;
}

public String getNodeBatchId() {
return nodeId + "-" + batchId;
}

public String getBatchId() {
return batchId;
Expand Down Expand Up @@ -113,4 +119,12 @@ public void setBatchType(String batchType) {
}
}

public Date getCreateTime() {
return createTime;
}

public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

}
Expand Up @@ -31,6 +31,8 @@ public interface IIncomingBatchService {

public IncomingBatch findIncomingBatch(String batchId, String nodeId);

public List<IncomingBatch> findIncomingBatchErrors(int maxRows);

public List<IncomingBatchHistory> findIncomingBatchHistory(String batchId, String nodeId);

@Transactional
Expand All @@ -44,4 +46,5 @@ public interface IIncomingBatchService {

@Transactional
public int updateIncomingBatch(IncomingBatch status);

}
Expand Up @@ -35,6 +35,8 @@ public interface IOutgoingBatchService {

public List<OutgoingBatch> getOutgoingBatchRange(String startBatchId, String endBatchId);

public List<OutgoingBatch> getOutgoingBatcheErrors(int maxRows);

public void markOutgoingBatchSent(OutgoingBatch batch);

public void setBatchStatus(String batchId, Status status);
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.IncomingBatchHistory;
import org.jumpmind.symmetric.service.IIncomingBatchService;
import org.jumpmind.symmetric.util.MaxRowsStatementCreator;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.RowMapper;
Expand All @@ -40,6 +41,8 @@ public class IncomingBatchService extends AbstractService implements IIncomingBa

private String findIncomingBatchSql;

private String findIncomingBatchErrorsSql;

private String findIncomingBatchHistorySql;

private String insertIncomingBatchSql;
Expand All @@ -59,6 +62,12 @@ public IncomingBatch findIncomingBatch(String batchId, String nodeId) {
}
}

@SuppressWarnings("unchecked")
public List<IncomingBatch> findIncomingBatchErrors(int maxRows) {
return (List<IncomingBatch>) jdbcTemplate.query(new MaxRowsStatementCreator(
findIncomingBatchErrorsSql, maxRows), new IncomingBatchMapper());
}

@SuppressWarnings("unchecked")
public List<IncomingBatchHistory> findIncomingBatchHistory(String batchId, String nodeId) {
return (List<IncomingBatchHistory>) jdbcTemplate.query(findIncomingBatchHistorySql, new Object[] { batchId,
Expand Down Expand Up @@ -104,6 +113,7 @@ public Object mapRow(ResultSet rs, int num) throws SQLException {
batch.setBatchId(rs.getString(1));
batch.setNodeId(rs.getString(2));
batch.setStatus(IncomingBatch.Status.valueOf(rs.getString(3)));
batch.setCreateTime(rs.getTimestamp(4));
return batch;
}
}
Expand Down Expand Up @@ -149,4 +159,8 @@ public void setSkipDuplicateBatches(boolean skipDuplicateBatchesEnabled) {
this.skipDuplicateBatches = skipDuplicateBatchesEnabled;
}

public void setFindIncomingBatchErrorsSql(String findIncomingBatchErrorsSql) {
this.findIncomingBatchErrorsSql = findIncomingBatchErrorsSql;
}

}
Expand Up @@ -35,6 +35,7 @@
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IOutgoingBatchHistoryService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.util.MaxRowsStatementCreator;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
Expand All @@ -57,6 +58,8 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa

private String selectOutgoingBatchRangeSql;

private String selectOutgoingBatchErrorsSql;

private String changeBatchStatusSql;

private String initialLoadStatusSql;
Expand Down Expand Up @@ -198,35 +201,21 @@ private void insertOutgoingBatch(Connection conn, OutgoingBatch outgoingBatch) t
@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatches(String nodeId) {
return (List<OutgoingBatch>) outgoingBatchQueryTemplate.query(selectOutgoingBatchSql, new Object[] { nodeId, nodeId },
new RowMapper() {
public Object mapRow(ResultSet rs, int index) throws SQLException {
OutgoingBatch batch = new OutgoingBatch();
batch.setBatchId(rs.getString(1));
batch.setNodeId(rs.getString(2));
batch.setChannelId(rs.getString(3));
batch.setStatus(rs.getString(4));
batch.setBatchType(rs.getString(5));
return batch;
}
});
new OutgoingBatchMapper());
}

@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatchRange(String startBatchId, String endBatchId) {
return (List<OutgoingBatch>) outgoingBatchQueryTemplate.query(selectOutgoingBatchRangeSql,
new Object[] { startBatchId, endBatchId }, new RowMapper() {
public Object mapRow(ResultSet rs, int index) throws SQLException {
OutgoingBatch batch = new OutgoingBatch();
batch.setBatchId(rs.getString(1));
batch.setNodeId(rs.getString(2));
batch.setChannelId(rs.getString(3));
batch.setStatus(rs.getString(4));
batch.setBatchType(rs.getString(5));
return batch;
}
});
new Object[] { startBatchId, endBatchId }, new OutgoingBatchMapper());
}

@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatcheErrors(int maxRows) {
return (List<OutgoingBatch>) outgoingBatchQueryTemplate.query(new MaxRowsStatementCreator(
selectOutgoingBatchErrorsSql, maxRows), new OutgoingBatchMapper());
}

public void markOutgoingBatchSent(OutgoingBatch batch) {
setBatchStatus(batch.getBatchId(), Status.SE);
}
Expand Down Expand Up @@ -265,6 +254,19 @@ public boolean isInitialLoadComplete(String nodeId) {
return returnValue;
}

class OutgoingBatchMapper implements RowMapper {
public Object mapRow(ResultSet rs, int num) throws SQLException {
OutgoingBatch batch = new OutgoingBatch();
batch.setBatchId(rs.getString(1));
batch.setNodeId(rs.getString(2));
batch.setChannelId(rs.getString(3));
batch.setStatus(rs.getString(4));
batch.setBatchType(rs.getString(5));
batch.setCreateTime(rs.getTimestamp(6));
return batch;
}
}

public void setConfigurationService(IConfigurationService configurationService) {
this.configurationService = configurationService;
}
Expand Down Expand Up @@ -305,4 +307,8 @@ public void setSelectOutgoingBatchRangeSql(String selectOutgoingBatchRangeSql) {
this.selectOutgoingBatchRangeSql = selectOutgoingBatchRangeSql;
}

public void setSelectOutgoingBatchErrorsSql(String selectOutgoingBatchErrorsSql) {
this.selectOutgoingBatchErrorsSql = selectOutgoingBatchErrorsSql;
}

}
@@ -0,0 +1,26 @@
package org.jumpmind.symmetric.util;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.jdbc.core.PreparedStatementCreator;

public class MaxRowsStatementCreator implements PreparedStatementCreator {

private String sql;

private int maxRows;

public MaxRowsStatementCreator(String sql, int maxRows) {
this.sql = sql;
this.maxRows = maxRows;
}

public PreparedStatement createPreparedStatement(Connection conn) throws SQLException {
PreparedStatement st = conn.prepareStatement(sql);
st.setMaxRows(maxRows);
return st;
}

}
107 changes: 105 additions & 2 deletions symmetric/src/main/java/org/jumpmind/symmetric/web/AlertServlet.java
@@ -1,7 +1,8 @@
/*
* SymmetricDS is an open source database synchronization solution.
*
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>,
* Eric Long <erilong@users.sourceforge.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
Expand All @@ -20,9 +21,111 @@

package org.jumpmind.symmetric.web;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.service.IIncomingBatchService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;

import com.sun.syndication.feed.synd.SyndContent;
import com.sun.syndication.feed.synd.SyndContentImpl;
import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.feed.synd.SyndEntryImpl;
import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.feed.synd.SyndFeedImpl;
import com.sun.syndication.io.FeedException;
import com.sun.syndication.io.SyndFeedOutput;

/**
* This is a place holder for the idea that we could create an RSS feed for alerts.
*/
public class AlertServlet {
public class AlertServlet extends AbstractServlet {

private static final long serialVersionUID = 1L;

private static final Log logger = LogFactory.getLog(AlertServlet.class);

private static final int MAX_ERRORS = 1000;

private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

@Override
public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
SyndFeed feed = new SyndFeedImpl();
feed.setFeedType("rss_2.0");
feed.setTitle("SymmetricDS Alerts");
feed.setDescription("Problems synchronizing data");
feed.setLink(req.getRequestURL().toString());

List<SyndEntry> entries = new ArrayList<SyndEntry>();

for (IncomingBatch batch : findIncomingBatchErrors()) {
String title = "Incoming Batch " + batch.getNodeBatchId();
String value = "Node " + batch.getNodeId() + " incoming batch " + batch.getBatchId()
+ " is in error at " + formatDate(batch.getCreateTime());
entries.add(createEntry(title, value));
}

for (OutgoingBatch batch : findOutgoingBatchErrors()) {
String title = "Outgoing Batch " + batch.getNodeBatchId();
String value = "Node " + batch.getNodeId() + " outgoing batch " + batch.getBatchId()
+ " is in error at " + formatDate(batch.getCreateTime());
entries.add(createEntry(title, value));
}

feed.setEntries(entries);

SyndFeedOutput out = new SyndFeedOutput();
resp.setContentType("application/rss+xml");
try {
out.output(feed, resp.getWriter());
} catch (FeedException e) {
logger.error("Unable to generate RSS alert feed", e);
}
}

private SyndEntry createEntry(String title, String value) {
SyndEntry entry = new SyndEntryImpl();
entry.setTitle(title);
SyndContent content = new SyndContentImpl();
content.setType("text/html");
content.setValue(value);
entry.setDescription(content);
return entry;
}

private String formatDate(Date date) {
SimpleDateFormat formatter = new SimpleDateFormat(DATE_FORMAT);
return formatter.format(date);
}

private List<IncomingBatch> findIncomingBatchErrors() {
IIncomingBatchService incomingBatchService = (IIncomingBatchService) getContext().getBean(
Constants.INCOMING_BATCH_SERVICE);
return incomingBatchService.findIncomingBatchErrors(MAX_ERRORS);
}

private List<OutgoingBatch> findOutgoingBatchErrors() {
IOutgoingBatchService outgoingBatchService = (IOutgoingBatchService) getContext().getBean(
Constants.OUTGOING_BATCH_SERVICE);
return outgoingBatchService.getOutgoingBatcheErrors(MAX_ERRORS);
}

@Override
protected Log getLogger() {
return logger;
}

}

0 comments on commit d485f09

Please sign in to comment.