diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/CsvExtractor14.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/CsvExtractor14.java new file mode 100644 index 0000000000..5f6a4a0266 --- /dev/null +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/CsvExtractor14.java @@ -0,0 +1,121 @@ +/* + * SymmetricDS is an open source database synchronization solution. + * + * Copyright (C) Chris Henson + * Copyright (C) Andrew Wilcox + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + */ + +package org.jumpmind.symmetric.extract.csv; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.util.Map; + +import org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.common.csv.CsvConstants; +import org.jumpmind.symmetric.db.IDbDialect; +import org.jumpmind.symmetric.extract.DataExtractorContext; +import org.jumpmind.symmetric.extract.IDataExtractor; +import org.jumpmind.symmetric.model.Data; +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.OutgoingBatch; +import org.jumpmind.symmetric.service.INodeService; +import org.jumpmind.symmetric.service.IParameterService; + +public class CsvExtractor14 implements IDataExtractor { + + private Map dictionary = null; + + private IParameterService parameterService; + + private IDbDialect dbDialect; + + private INodeService nodeService; + + public void init(BufferedWriter writer, DataExtractorContext context) throws IOException { + Node nodeIdentity = nodeService.findIdentity(); + String nodeId = (nodeIdentity == null) ? parameterService.getString(ParameterConstants.EXTERNAL_ID) + : nodeIdentity.getNodeId(); + Util.write(writer, CsvConstants.NODEID, AbstractStreamDataCommand.DELIMITER, nodeId); + writer.newLine(); + } + + public void begin(OutgoingBatch batch, BufferedWriter writer) throws IOException { + Util.write(writer, CsvConstants.BATCH, AbstractStreamDataCommand.DELIMITER, Long.toString(batch.getBatchId())); + writer.newLine(); + Util.write(writer, CsvConstants.BINARY, AbstractStreamDataCommand.DELIMITER, dbDialect.getBinaryEncoding() + .name()); + writer.newLine(); + } + + public void commit(OutgoingBatch batch, BufferedWriter writer) throws IOException { + Util.write(writer, CsvConstants.COMMIT, AbstractStreamDataCommand.DELIMITER, Long.toString(batch.getBatchId())); + writer.newLine(); + } + + public void write(BufferedWriter writer, Data data, DataExtractorContext context) throws IOException { + preprocessTable(data, writer, context); + dictionary.get(data.getEventType().getCode()).execute(writer, data, context); + } + + /** + * Writes the table metadata out to a stream only if it hasn't already been + * written out before + * + * @param tableName + * @param out + */ + public void preprocessTable(Data data, BufferedWriter out, DataExtractorContext context) throws IOException { + + if (data.getAudit() == null) { + throw new RuntimeException("Missing trigger_hist for table " + data.getTableName() + + ": try running syncTriggers() or restarting SymmetricDS"); + } + String auditKey = Integer.toString(data.getAudit().getTriggerHistoryId()).intern(); + if (!context.getAuditRecordsWritten().contains(auditKey)) { + Util.write(out, CsvConstants.TABLE, ", ", data.getTableName()); + out.newLine(); + Util.write(out, CsvConstants.KEYS, ", ", data.getAudit().getPkColumnNames()); + out.newLine(); + Util.write(out, CsvConstants.COLUMNS, ", ", data.getAudit().getColumnNames()); + out.newLine(); + context.getAuditRecordsWritten().add(auditKey); + } else if (!context.isLastTable(data.getTableName())) { + Util.write(out, CsvConstants.TABLE, ", ", data.getTableName()); + out.newLine(); + } + + context.setLastTableName(data.getTableName()); + } + + public void setDictionary(Map dictionary) { + this.dictionary = dictionary; + } + + public void setDbDialect(IDbDialect dbDialect) { + this.dbDialect = dbDialect; + } + + public void setParameterService(IParameterService parameterService) { + this.parameterService = parameterService; + } + + public void setNodeService(INodeService nodeService) { + this.nodeService = nodeService; + } + +}