changes for the "C" create event for creating tables (was DDL event)
erilong committed Dec 5, 2007
1 parent a5b7a98 commit 71c6743
Showing 8 changed files with 82 additions and 43 deletions.
@@ -47,6 +47,6 @@ public class CsvConstants {
 
     public static final String SQL = "sql";
 
-    public static final String DDL = "ddl";
+    public static final String CREATE = "create";
 
 }
@@ -0,0 +1,42 @@
+package org.jumpmind.symmetric.common.csv;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+
+import com.csvreader.CsvReader;
+import com.csvreader.CsvWriter;
+
+public class CsvUtil {
+
+    public static String[] tokenizeCsvData(String csvData) {
+        String[] tokens = null;
+        if (csvData != null) {
+            InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(csvData.getBytes()));
+            CsvReader csvReader = new CsvReader(reader);
+            csvReader.setEscapeMode(CsvReader.ESCAPE_MODE_BACKSLASH);
+            try {
+                if (csvReader.readRecord()) {
+                    tokens = csvReader.getValues();
+                }
+            } catch (IOException e) {
+            }
+        }
+        return tokens;
+    }
+
+    public static String escapeCsvData(String data) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        CsvWriter writer = new CsvWriter(new OutputStreamWriter(out), ',');
+        writer.setEscapeMode(CsvWriter.ESCAPE_MODE_BACKSLASH);
+        try {
+            writer.write(data);
+            writer.close();
+            out.close();
+        } catch (IOException e) {
+        }
+        return out.toString();
+    }
+}
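For context, a quick round-trip sketch of the new helper (not part of this commit): a payload escaped with escapeCsvData should come back intact from tokenizeCsvData, even when it contains the CSV delimiter or quotes, which is what the DataService and data-loader changes below rely on.

import org.jumpmind.symmetric.common.csv.CsvUtil;

public class CsvUtilRoundTrip {
    public static void main(String[] args) {
        // A payload that contains commas and quotes, so it cannot be embedded
        // in a CSV line as-is.
        String payload = "insert into item (name) values ('a, \"b\"')";

        String escaped = CsvUtil.escapeCsvData(payload);    // one CSV-safe field
        String[] tokens = CsvUtil.tokenizeCsvData(escaped);  // parse the field back

        // If escaping is symmetric, tokens[0] equals the original payload.
        System.out.println(payload.equals(tokens[0]));
    }
}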
@@ -7,10 +7,10 @@
 import org.jumpmind.symmetric.extract.DataExtractorContext;
 import org.jumpmind.symmetric.model.Data;
 
-public class StreamDDLDataCommand extends AbstractStreamDataCommand {
+public class StreamCreateDataCommand extends AbstractStreamDataCommand {
 
     public void execute(BufferedWriter writer, Data data, DataExtractorContext context) throws IOException {
-        Util.write(writer, CsvConstants.DDL, DELIMITER, data.getRowData());
+        Util.write(writer, CsvConstants.CREATE, DELIMITER, data.getRowData());
         writer.newLine();
     }
 }
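A sketch of the line this command now emits (illustration only, not from the commit; the delimiter is assumed to be a comma, the table XML is invented, and CsvConstants is assumed to live alongside CsvUtil in org.jumpmind.symmetric.common.csv):

import org.jumpmind.symmetric.common.csv.CsvConstants;
import org.jumpmind.symmetric.common.csv.CsvUtil;

public class CreateLineSketch {
    public static void main(String[] args) {
        // Row data for a create event: the table-creation XML, CSV-escaped by DataService.
        String rowData = CsvUtil.escapeCsvData("<table name=\"ITEM\"/>");

        // Keyword, delimiter, payload; before this commit the keyword was "ddl".
        String line = CsvConstants.CREATE + "," + rowData;
        System.out.println(line);   // prints the "create" keyword followed by the escaped XML
    }
}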
@@ -134,11 +134,11 @@ public void load() throws IOException {
                 break;
             } else if (tokens[0].equals(CsvConstants.SQL)) {
                 if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
-                    runSql(csvReader.getRawRecord());
+                    runSql(tokens[1]);
                 }
-            } else if (tokens[0].equals(CsvConstants.DDL)) {
-                if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
-                    runDdl(csvReader.getRawRecord());
+            } else if (tokens[0].equals(CsvConstants.CREATE)) {
+                if (!context.isSkipping()) {
+                    runDdl(tokens[1]);
                 }
             } else if (tokens[0].equals(CsvConstants.BINARY)) {
                 try {
@@ -324,10 +324,11 @@ public void close() {
         if (csvReader != null) {
             csvReader.close();
         }
-
-        Table[] tables = context.getAllTablesProcessed();
-        for (Table table : tables) {
-            dbDialect.cleanupAfterInserts(table);
+        if (context != null) {
+            Table[] tables = context.getAllTablesProcessed();
+            for (Table table : tables) {
+                dbDialect.cleanupAfterInserts(table);
+            }
         }
     }
 
@@ -58,7 +58,7 @@ public enum DataEventType implements ICoded {
     /**
      * An event that indicates that the data payload is a table creation.
      */
-    DDL("DDL");
+    CREATE("C");
 
     private String code;
 
@@ -81,8 +81,8 @@ public static DataEventType getEventType(String s) {
             return DataEventType.RELOAD;
         } else if (s.equals("S")) {
             return DataEventType.SQL;
-        } else if (s.equals("DDL")) {
-            return DataEventType.DDL;
+        } else if (s.equals("C")) {
+            return DataEventType.CREATE;
         }
         return null;
     }
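A hypothetical sanity check of the new code mapping, assuming DataEventType lives in org.jumpmind.symmetric.model: "C" now resolves to CREATE, while the old "DDL" string no longer matches anything and falls through to null.

import org.jumpmind.symmetric.model.DataEventType;

public class EventCodeCheck {
    public static void main(String[] args) {
        // The new single-character code maps to the renamed enum constant.
        System.out.println(DataEventType.getEventType("C") == DataEventType.CREATE);  // true

        // The old "DDL" code is gone; unrecognized codes fall through to null.
        System.out.println(DataEventType.getEventType("DDL"));  // null after this change
    }
}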
@@ -25,6 +25,12 @@ public interface IDataService {
 
     public void insertDataEvent(Data data, String nodeId);
 
+    public void insertPurgeEvent(Node targetNode, Trigger trigger);
+
+    public void insertSqlEvent(Node targetNode, Trigger trigger, String sql);
+
+    public void insertCreateEvent(Node targetNode, Trigger trigger, String xml);
+
     public Data createData(String tableName);
 
     public Data createData(String tableName, String whereClause);
@@ -20,10 +20,8 @@
 
 package org.jumpmind.symmetric.service.impl;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -36,6 +34,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jumpmind.symmetric.common.Constants;
+import org.jumpmind.symmetric.common.csv.CsvUtil;
 import org.jumpmind.symmetric.db.IDbDialect;
 import org.jumpmind.symmetric.load.IReloadListener;
 import org.jumpmind.symmetric.model.Data;
@@ -51,7 +50,6 @@
 import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.PreparedStatementCallback;
 
-import com.csvreader.CsvReader;
 import com.csvreader.CsvWriter;
 
 public class DataService extends AbstractService implements IDataService {
@@ -86,22 +84,22 @@ public void insertReloadEvent(final Node targetNode, final Trigger trigger) {
         insertDataEvent(data, targetNode.getNodeId());
     }
 
-    public void createPurgeEvent(final Node targetNode, final Trigger trigger) {
+    public void insertPurgeEvent(final Node targetNode, final Trigger trigger) {
         String sql = dbDialect.createPurgeSqlFor(targetNode, trigger);
-        createSqlEvent(targetNode, trigger, sql);
+        insertSqlEvent(targetNode, trigger, sql);
     }
 
-    public void createSqlEvent(final Node targetNode, final Trigger trigger, String sql) {
+    public void insertSqlEvent(final Node targetNode, final Trigger trigger, String sql) {
         TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
-        Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.SQL, sql, null,
-                history);
+        Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.SQL,
+                CsvUtil.escapeCsvData(sql), null, history);
         insertDataEvent(data, targetNode.getNodeId());
     }
 
-    public void createTableEvent(final Node targetNode, final Trigger trigger, String xml) {
+    public void insertCreateEvent(final Node targetNode, final Trigger trigger, String xml) {
         TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
-        Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.DDL, xml, null,
-                history);
+        Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.CREATE,
+                CsvUtil.escapeCsvData(xml), null, history);
         insertDataEvent(data, targetNode.getNodeId());
     }
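The reason the payload now goes through CsvUtil.escapeCsvData: the loader splits each line with tokenizeCsvData and takes tokens[1] as the whole payload (see the load() change above), so an unescaped statement containing commas would be chopped into extra fields. A minimal sketch under that assumption:

import org.jumpmind.symmetric.common.csv.CsvUtil;

public class EscapedPayloadSketch {
    public static void main(String[] args) {
        String sql = "delete from item where id in (1, 2, 3)";

        // Unescaped, the commas inside the statement look like CSV field separators.
        String[] broken = CsvUtil.tokenizeCsvData("sql," + sql);
        System.out.println(broken.length);   // more than 2: the statement is split apart

        // Escaped first, the whole statement survives as the single field after the keyword.
        String[] ok = CsvUtil.tokenizeCsvData("sql," + CsvUtil.escapeCsvData(sql));
        System.out.println(ok[1]);            // the original delete statement, intact
    }
}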

@@ -155,13 +153,13 @@ public String reloadNode(String nodeId) {
         if (createFirstForReload) {
             for (Trigger trigger : triggers) {
                 String xml = dbDialect.getCreateTableXML(trigger);
-                createTableEvent(targetNode, trigger, xml);
+                insertCreateEvent(targetNode, trigger, xml);
             }
         }
         if (deleteFirstForReload) {
             for (ListIterator<Trigger> iterator = triggers.listIterator(triggers.size()); iterator.hasPrevious();) {
                 Trigger trigger = iterator.previous();
-                createPurgeEvent(targetNode, trigger);
+                insertPurgeEvent(targetNode, trigger);
             }
 
             outgoingBatchService.buildOutgoingBatches(nodeId);
@@ -220,16 +218,16 @@ public Data createData(String tableName, String whereClause) {
 
     public Map<String, String> getRowDataAsMap(Data data) {
         Map<String, String> map = new HashMap<String, String>();
-        String[] columnNames = tokenizeCsvData(data.getAudit().getColumnNames());
-        String[] columnData = tokenizeCsvData(data.getRowData());
+        String[] columnNames = CsvUtil.tokenizeCsvData(data.getAudit().getColumnNames());
+        String[] columnData = CsvUtil.tokenizeCsvData(data.getRowData());
         for (int i = 0; i < columnNames.length; i++) {
             map.put(columnNames[i].toLowerCase(), columnData[i]);
         }
         return map;
     }
 
     public void setRowDataFromMap(Data data, Map<String, String> map) {
-        String[] columnNames = tokenizeCsvData(data.getAudit().getColumnNames());
+        String[] columnNames = CsvUtil.tokenizeCsvData(data.getAudit().getColumnNames());
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         CsvWriter writer = new CsvWriter(new OutputStreamWriter(out), ',');
         writer.setEscapeMode(CsvWriter.ESCAPE_MODE_BACKSLASH);
@@ -243,20 +241,9 @@ public void setRowDataFromMap(Data data, Map<String, String> map) {
         data.setRowData(out.toString());
     }
 
+    @Deprecated
     public String[] tokenizeCsvData(String csvData) {
-        String[] tokens = null;
-        if (csvData != null) {
-            InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(csvData.getBytes()));
-            CsvReader csvReader = new CsvReader(reader);
-            csvReader.setEscapeMode(CsvReader.ESCAPE_MODE_BACKSLASH);
-            try {
-                if (csvReader.readRecord()) {
-                    tokens = csvReader.getValues();
-                }
-            } catch (IOException e) {
-            }
-        }
-        return tokens;
+        return CsvUtil.tokenizeCsvData(csvData);
     }
 
     public void setReloadListeners(List<IReloadListener> listeners) {
symmetric/src/main/resources/symmetric-services.xml: 3 changes (3 additions, 0 deletions)
@@ -541,6 +541,9 @@
             <entry key="S">
                 <bean class="org.jumpmind.symmetric.extract.csv.StreamSQLDataCommand"></bean>
             </entry>
+            <entry key="C">
+                <bean class="org.jumpmind.symmetric.extract.csv.StreamCreateDataCommand"></bean>
+            </entry>
         </map>
     </property>
 </bean>
