Skip to content

Commit

Permalink
Re-added the meteredoutputstream and turned auth on for push, pull an…
Browse files Browse the repository at this point in the history
…d ack.
  • Loading branch information
chenson42 committed Oct 2, 2007
1 parent afa3baa commit 5485431
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 80 deletions.
Expand Up @@ -60,4 +60,6 @@ public class Constants {

public static final String JDBC = "jdbcTemplate";

public static final String DOWNLOAD_RATE = "downloadRateKb";

}
@@ -0,0 +1,98 @@
package org.jumpmind.symmetric.web;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.logging.Log;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IDataLoaderService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import org.jumpmind.symmetric.transport.internal.InternalOutgoingTransport;
import org.jumpmind.symmetric.transport.metered.MeteredOutputStreamOutgoingTransport;
import org.jumpmind.symmetric.util.MeteredOutputStream;
import org.springframework.context.ApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

abstract public class AbstractServlet extends HttpServlet {

protected int getDownloadRate() {
return (Integer) getContext().getBean(Constants.DOWNLOAD_RATE);
}

protected abstract Log getLogger();

protected IOutgoingTransport createOutgoingTransport(
HttpServletResponse resp) throws IOException {
int downloadRate = getDownloadRate();
if (downloadRate > 0) {
return new MeteredOutputStreamOutgoingTransport(resp
.getOutputStream(), downloadRate*MeteredOutputStream.KB);
} else {
return new InternalOutgoingTransport(resp.getOutputStream());
}
}

protected OutputStream createOutputStream(HttpServletResponse resp)
throws IOException {
int downloadRate = getDownloadRate();
if (downloadRate > 0) {
return new MeteredOutputStream(resp.getOutputStream(), downloadRate*MeteredOutputStream.KB);
} else {
return resp.getOutputStream();
}
}

protected InputStream createInputStream(HttpServletRequest req)
throws IOException {
InputStream is = null;

if (getLogger().isDebugEnabled()) {
StringBuilder b = new StringBuilder();
BufferedReader reader = req.getReader();
String line = null;
do {
line = reader.readLine();
if (line != null) {
b.append(line);
b.append("\n");
}
} while (line != null);

getLogger().debug("Received: \n" + b);
is = new ByteArrayInputStream(b.toString().getBytes());
} else {
is = req.getInputStream();
}

return is;
}

protected ApplicationContext getContext() {
return WebApplicationContextUtils
.getWebApplicationContext(getServletContext());
}

protected IDataLoaderService getDataLoaderService() {
return (IDataLoaderService) getContext().getBean(
Constants.DATALOADER_SERVICE);
}

protected INodeService getNodeService() {
return (INodeService) getContext().getBean(Constants.NODE_SERVICE);
}

protected IDataExtractorService getDataExtractorService() {
return (IDataExtractorService) getContext().getBean(
Constants.DATAEXTRACTOR_SERVICE);
}

}
44 changes: 20 additions & 24 deletions symmetric/src/main/java/org/jumpmind/symmetric/web/PullServlet.java
Expand Up @@ -3,28 +3,19 @@
import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import org.jumpmind.symmetric.transport.internal.InternalOutgoingTransport;
import org.springframework.context.ApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

/**
* The default download rate is 20k/sec. This change be changed via the servlet
* param <code>kbs-rate</code>
*
* @author awilcox
*
*/
public class PullServlet extends HttpServlet {
public class PullServlet extends AbstractServlet {

private static final Log logger = LogFactory.getLog(PullServlet.class);

Expand All @@ -39,30 +30,35 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp)
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
if (req.getParameter(WebConstants.NODE_ID) == null
|| req.getParameter(WebConstants.NODE_ID).trim().length() == 0) {

String nodeId = req.getParameter(WebConstants.NODE_ID);

if (logger.isDebugEnabled()) {
logger.debug("Pull request received from "
+ nodeId);
}

if (StringUtils.isBlank(nodeId)) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
return;
}

String nodeId = req.getParameter(WebConstants.NODE_ID).trim();

ApplicationContext ctx = WebApplicationContextUtils
.getWebApplicationContext(getServletContext());
nodeId = nodeId.trim();

IDataExtractorService extractor = (IDataExtractorService) ctx
.getBean(Constants.DATAEXTRACTOR_SERVICE);
INodeService nodeService = (INodeService) ctx
.getBean(Constants.NODE_SERVICE);
try {
IOutgoingTransport out = new InternalOutgoingTransport(resp
.getOutputStream());
extractor.extract(nodeService.findNode(nodeId), out);
IOutgoingTransport out = createOutgoingTransport(resp);
getDataExtractorService().extract(
getNodeService().findNode(nodeId), out);
out.close();
} catch (Exception ex) {
logger.error("Error while pulling data for " + nodeId, ex);
resp.sendError(501);
}
}

@Override
protected Log getLogger() {
return logger;
}

}
73 changes: 17 additions & 56 deletions symmetric/src/main/java/org/jumpmind/symmetric/web/PushServlet.java
@@ -1,31 +1,20 @@
package org.jumpmind.symmetric.web;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.service.IDataLoaderService;
import org.springframework.context.ApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

/**
* The default download rate is 20k/sec. This change be changed via the servlet
* param <code>kbs-rate</code>
*
* @author awilcox
*
*/
public class PushServlet extends HttpServlet {
public class PushServlet extends AbstractServlet {
private static final long serialVersionUID = 1L;

private static final Log logger = LogFactory.getLog(PushServlet.class);
Expand All @@ -34,61 +23,33 @@ public class PushServlet extends HttpServlet {
protected void doPut(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {

String nodeId = req.getParameter(WebConstants.NODE_ID);

if (logger.isDebugEnabled()) {
logger.debug("Push request received from "
+ req.getParameter(WebConstants.NODE_ID));
logger.debug("Push request received from " + nodeId);
}

ApplicationContext ctx = WebApplicationContextUtils
.getWebApplicationContext(getServletContext());
IDataLoaderService service = (IDataLoaderService) ctx
.getBean(Constants.DATALOADER_SERVICE);

InputStream is = null;

if (logger.isDebugEnabled()) {
StringBuilder b = new StringBuilder();
BufferedReader reader = req.getReader();
String line = null;
do {
line = reader.readLine();
if (line != null) {
b.append(line);
b.append("\n");
}
} while (line != null);

logger.debug("Received: " + b);
is = new ByteArrayInputStream(b.toString().getBytes());
} else {
is = req.getInputStream();
try {
InputStream is = createInputStream(req);
OutputStream out = createOutputStream(resp);
getDataLoaderService().loadData(is, out);
out.flush();
} catch (Exception ex) {
logger
.error("Error while processing pushed data for " + nodeId,
ex);
resp.sendError(501);
}

OutputStream out = getOutputstream(resp);
service.loadData(is, out);
out.flush();

if (logger.isDebugEnabled()) {
logger.debug("Done with push request from "
+ req.getParameter(WebConstants.NODE_ID));
}
}

private OutputStream getOutputstream(HttpServletResponse resp)
throws IOException {
// // TODO get the pull rate per client
// String param = getInitParameter("kbs-rate");
// int rate = 20;
//
// if (param != null) {
// rate = Integer.parseInt(param);
// }
//
// MeteredOutputStream out = new MeteredOutputStream(resp
// .getOutputStream(), MeteredOutputStream.KB * rate);

return resp.getOutputStream();

@Override
protected Log getLogger() {
return logger;
}

}
5 changes: 5 additions & 0 deletions symmetric/src/main/resources/symmetric-properties.xml
Expand Up @@ -22,6 +22,7 @@
<prop key="db.pool.min.evictable.idle.millis">120000</prop>
<prop key="sync.table.prefix">sym</prop>
<prop key="symmetric.auto.config.database">true</prop>
<prop key="symmetric.runtime.download.rate.kb">-1</prop>
<prop key="symmetric.runtime.schema.version">?</prop>
<prop key="symmetric.runtime.job.random.max.start.time.ms">10000</prop>
<prop key="symmetric.runtime.purge.retention.minutes">7200</prop>
Expand Down Expand Up @@ -55,6 +56,10 @@
</list>
</property>
</bean>

<bean id="downloadRateKb" class="java.lang.Integer">
<constructor-arg value="${symmetric.runtime.download.rate.kb}"/>
</bean>

<bean id="propertyPlaceholderConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
Expand Down

0 comments on commit 5485431

Please sign in to comment.