Skip to content

Commit

Permalink
[ 1851178 ] Make the protocol pluggable -- fixes "binary" directive a…
Browse files Browse the repository at this point in the history
…dded in 1.1 when speaking to 1.0
  • Loading branch information
erilong committed Dec 18, 2007
1 parent 0757f35 commit 69b9dc3
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 14 deletions.
@@ -0,0 +1,101 @@
/*
* SymmetricDS is an open source database synchronization solution.
*
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
* Copyright (C) Andrew Wilcox <andrewbwilcox@users.sourceforge.net>,
* Eric Long <erilong@users.sourceforge.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*/

package org.jumpmind.symmetric.extract.csv;

import java.io.BufferedWriter;
import java.io.IOException;
import java.util.Map;

import org.jumpmind.symmetric.common.csv.CsvConstants;
import org.jumpmind.symmetric.config.IRuntimeConfig;
import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.extract.IDataExtractor;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.OutgoingBatch;

/**
 * {@link IDataExtractor} implementation that emits the version 1.0 CSV
 * protocol. It is selected when the target node reports a symmetric version
 * of 1.0 or 1.1, which predates directives (such as "binary") added to the
 * protocol in later releases.
 */
public class CsvExtractor10 implements IDataExtractor {

    /** Maps a data event type code (e.g. "I", "U", "D") to the command that streams it. */
    private Map<String, IStreamDataCommand> dictionary = null;

    private IRuntimeConfig runtimeConfiguration;

    /**
     * Writes the stream preamble: the identity of this node.
     *
     * @throws IOException if writing to the stream fails
     */
    public void init(BufferedWriter writer, DataExtractorContext context)
            throws IOException {
        Util.write(writer, CsvConstants.NODEID, AbstractStreamDataCommand.DELIMITER, runtimeConfiguration.getExternalId());
        writer.newLine();
    }

    /**
     * Marks the start of the given batch in the stream.
     *
     * @throws IOException if writing to the stream fails
     */
    public void begin(OutgoingBatch batch, BufferedWriter writer)
            throws IOException {
        Util.write(writer, CsvConstants.BATCH, AbstractStreamDataCommand.DELIMITER, batch.getBatchId());
        writer.newLine();
    }

    /**
     * Marks the end (commit) of the given batch in the stream.
     *
     * @throws IOException if writing to the stream fails
     */
    public void commit(OutgoingBatch batch, BufferedWriter writer)
            throws IOException {
        Util.write(writer, CsvConstants.COMMIT, AbstractStreamDataCommand.DELIMITER, batch.getBatchId());
        writer.newLine();
    }

    /**
     * Streams a single data event: first ensures the table metadata has been
     * written, then dispatches to the command registered for the event type.
     *
     * @throws IOException if writing to the stream fails
     */
    public void write(BufferedWriter writer, Data data,
            DataExtractorContext context) throws IOException {
        preprocessTable(data, writer, context);
        dictionary.get(data.getEventType().getCode()).execute(writer, data, context);
    }

    /**
     * Writes the table metadata out to a stream only if it hasn't already been
     * written out before.
     *
     * @param data the data event whose table metadata may need writing
     * @param out the stream to write the metadata to
     * @param context tracks which trigger-history records were already written
     *            and which table was announced last
     * @throws IOException if writing to the stream fails
     */
    public void preprocessTable(Data data, BufferedWriter out,
            DataExtractorContext context) throws IOException {

        // Key on the trigger history id. The original called intern() here,
        // which is unnecessary: the written-records collection compares keys
        // with equals(), not identity.
        String auditKey = Integer.toString(data.getAudit().getTriggerHistoryId());
        if (!context.getAuditRecordsWritten().contains(auditKey)) {
            // First event for this trigger history: emit the full metadata.
            writeTableName(out, data.getTableName());
            Util.write(out, "keys, ", data.getAudit().getPkColumnNames());
            out.newLine();
            Util.write(out, "columns, ", data.getAudit().getColumnNames());
            out.newLine();
            context.getAuditRecordsWritten().add(auditKey);
        } else if (!context.isLastTable(data.getTableName())) {
            // Metadata already sent, but the stream switched tables since the
            // last event: re-announce only the table name.
            writeTableName(out, data.getTableName());
        }

        context.setLastTableName(data.getTableName());
    }

    /** Writes the "table" metadata line for the given table name. */
    private void writeTableName(BufferedWriter out, String tableName)
            throws IOException {
        Util.write(out, "table, ", tableName);
        out.newLine();
    }

    public void setRuntimeConfiguration(IRuntimeConfig runtimeConfiguration) {
        this.runtimeConfiguration = runtimeConfiguration;
    }

    public void setDictionary(Map<String, IStreamDataCommand> dictionary) {
        this.dictionary = dictionary;
    }

}
Expand Up @@ -30,40 +30,44 @@
import java.sql.SQLException;
import java.util.List;

import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.extract.IDataExtractor;
import org.jumpmind.symmetric.model.BatchType;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataEventType;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.OutgoingBatch.Status;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IExtractListener;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.support.JdbcUtils;

public class DataExtractorService implements IDataExtractorService {
public class DataExtractorService implements IDataExtractorService, BeanFactoryAware {

private IOutgoingBatchService outgoingBatchService;

private IConfigurationService configurationService;

private IDataExtractor dataExtractor;

private IDbDialect dbDialect;

private JdbcTemplate jdbcTemplate;

private BeanFactory beanFactory;

private DataExtractorContext context;

private String tablePrefix;
Expand All @@ -78,6 +82,7 @@ public void extractNodeIdentityFor(Node node, IOutgoingTransport transport) {
try {
BufferedWriter writer = transport.open();
DataExtractorContext ctxCopy = context.copy();
IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
dataExtractor.init(writer, ctxCopy);
dataExtractor.begin(batch, writer);
TriggerHistory audit = new TriggerHistory(tableName, "node_id", "node_id");
Expand All @@ -89,6 +94,18 @@ public void extractNodeIdentityFor(Node node, IOutgoingTransport transport) {
}
}

/**
 * Looks up the protocol-appropriate {@link IDataExtractor} bean for the
 * given node version. Nodes reporting version 1.0 or 1.1 receive the legacy
 * extractor (bean name suffixed "10"), which does not emit directives such
 * as "binary" that older nodes cannot parse.
 *
 * @param version the node's symmetric version string, or null to get the
 *            default (current-protocol) extractor
 * @return the extractor bean resolved from the Spring bean factory
 */
private IDataExtractor getDataExtractor(String version) {
    String beanName = Constants.DATA_EXTRACTOR;
    if (version != null) {
        int[] versions = Version.parseVersion(version);
        // Guard the array length so a malformed or truncated version string
        // cannot cause an ArrayIndexOutOfBoundsException.
        // TODO: this should be versions[1] == 0 for 1.2 release
        if (versions.length >= 2 && versions[0] == 1 && versions[1] <= 1) {
            beanName += "10";
        }
    }
    return (IDataExtractor) beanFactory.getBean(beanName);
}

public OutgoingBatch extractInitialLoadFor(Node node, final Trigger trigger, final IOutgoingTransport transport) {

OutgoingBatch batch = new OutgoingBatch(node, trigger.getChannelId(), BatchType.INITIAL_LOAD);
Expand All @@ -108,6 +125,7 @@ protected void writeInitialLoad(Node node, final Trigger trigger, final IOutgoin

final String sql = dbDialect.createInitalLoadSqlFor(node, trigger);
final TriggerHistory audit = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
final IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());

jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
Expand Down Expand Up @@ -140,7 +158,8 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc
}

/**
 * Extracts outgoing data for the given node and streams it over the supplied
 * transport, using the extractor matching the protocol version the node
 * understands.
 */
public boolean extract(Node node, IOutgoingTransport transport) throws Exception {
    IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
    ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
    return extract(node, handler);
}

Expand Down Expand Up @@ -184,8 +203,8 @@ public boolean extract(Node node, final IExtractListener handler) throws Excepti

/**
 * Extracts the given range of batches over the transport. No target node is
 * known in this path, so null is passed to get the default (current-protocol)
 * extractor.
 */
public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId)
        throws Exception {
    IDataExtractor dataExtractor = getDataExtractor(null);
    ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
    return extractBatchRange(handler, startBatchId, endBatchId);
}

Expand Down Expand Up @@ -255,10 +274,6 @@ public void setOutgoingBatchService(IOutgoingBatchService batchBuilderService) {
this.outgoingBatchService = batchBuilderService;
}

// NOTE(review): this setter appears superseded by the per-version BeanFactory
// lookup in getDataExtractor(); confirm it is no longer wired in
// symmetric-services.xml before keeping it.
public void setDataExtractor(IDataExtractor dataExtractor) {
this.dataExtractor = dataExtractor;
}

// Injects the template DataExtractorContext; extractions work on a copy of it
// (see context.copy() in extractNodeIdentityFor).
public void setContext(DataExtractorContext context) {
this.context = context;
}
Expand All @@ -283,12 +298,15 @@ class ExtractStreamHandler implements IExtractListener {

IOutgoingTransport transport;

IDataExtractor dataExtractor;

DataExtractorContext context;

BufferedWriter writer;

/**
 * @param dataExtractor the protocol-specific extractor used to stream data
 * @param transport the transport whose writer receives the extracted data
 */
ExtractStreamHandler(IDataExtractor dataExtractor, IOutgoingTransport transport) throws Exception {
    this.transport = transport;
    this.dataExtractor = dataExtractor;
}

public void dataExtracted(Data data) throws Exception {
Expand Down Expand Up @@ -319,4 +337,8 @@ public void setTablePrefix(String tablePrefix) {
this.tablePrefix = tablePrefix;
}

// BeanFactoryAware callback: stores the factory that getDataExtractor() uses
// to look up the protocol-specific extractor bean at runtime.
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}

}
31 changes: 30 additions & 1 deletion symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -249,7 +249,6 @@
<bean id="dataExtractorService"
class="org.jumpmind.symmetric.service.impl.DataExtractorService">
<property name="outgoingBatchService" ref="outgoingBatchService" />
<property name="dataExtractor" ref="dataExtractor" />
<property name="configurationService" ref="configurationService" />
<property name="dbDialect" ref="dbDialect" />
<property name="tablePrefix" value="${sync.table.prefix}" />
Expand Down Expand Up @@ -553,6 +552,36 @@
</property>
</bean>

<!-- Legacy (protocol 1.0) CSV extractor bean, returned by
     DataExtractorService.getDataExtractor() for nodes reporting version
     1.0/1.1. The dictionary maps data event type codes to stream commands. -->
<bean id="dataExtractor10" class="org.jumpmind.symmetric.extract.csv.CsvExtractor10">
<property name="runtimeConfiguration" ref="runtimeConfiguration" />
<property name="dictionary">
<map>
<entry key="I">
<bean class="org.jumpmind.symmetric.extract.csv.StreamInsertDataCommand"></bean>
</entry>
<entry key="U">
<bean class="org.jumpmind.symmetric.extract.csv.StreamUpdateDataCommand"></bean>
</entry>
<entry key="D">
<bean class="org.jumpmind.symmetric.extract.csv.StreamDeleteDataCommand"></bean>
</entry>
<entry key="V">
<bean class="org.jumpmind.symmetric.extract.csv.StreamValidateDataCommand"></bean>
</entry>
<entry key="R">
<bean class="org.jumpmind.symmetric.extract.csv.StreamReloadDataCommand">
<property name="dataExtractorService" ref="dataExtractorService" />
<property name="configurationService" ref="configurationService" />
<property name="nodeService" ref="nodeService" />
</bean>
</entry>
<entry key="S">
<bean class="org.jumpmind.symmetric.extract.csv.StreamSQLDataCommand"></bean>
</entry>
</map>
</property>
</bean>

<bean id="outgoingBatchHistoryService"
class="org.jumpmind.symmetric.service.impl.OutgoingBatchHistoryService" scope="singleton">
<property name="jdbcTemplate" ref="jdbcTemplate" />
Expand Down

0 comments on commit 69b9dc3

Please sign in to comment.