From 291b2611d93bd2085e1c8291cf3f8dfaec26ffa4 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Wed, 23 May 2018 07:22:08 -0400 Subject: [PATCH] 0003573: TopSpeed (TPS) Router --- symmetric-assemble/common.gradle | 3 +- .../route/AbstractFileParsingRouter.java | 116 +++++++---- .../jumpmind/symmetric/route/CSVRouter.java | 2 +- .../jumpmind/symmetric/route/DBFRouter.java | 2 +- .../jumpmind/symmetric/route/TPSRouter.java | 182 ++++++++++++++++++ .../symmetric/service/impl/RouterService.java | 4 +- 6 files changed, 263 insertions(+), 46 deletions(-) create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/route/TPSRouter.java diff --git a/symmetric-assemble/common.gradle b/symmetric-assemble/common.gradle index b36345416d..aee43b7b4a 100644 --- a/symmetric-assemble/common.gradle +++ b/symmetric-assemble/common.gradle @@ -233,7 +233,8 @@ subprojects { subproject -> // javax.resource needed by jaybird provided "org.apache.geronimo.specs:geronimo-j2ee-connector_1.6_spec:1.0" provided "com.datastax.cassandra:cassandra-driver-core:3.1.4" - + provided "com.datastax.cassandra:cassandra-driver-core:3.1.4" + provided "nl.cad:tps-parse:1.0.15-SNAPSHOT" testCompile fileTree(dir: System.getProperty("user.home") + '/.symmetricds/lib', include: '*.jar') testCompile "junit:junit:$junitVersion" diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractFileParsingRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractFileParsingRouter.java index 84e6a8886c..231cf8c206 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractFileParsingRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractFileParsingRouter.java @@ -1,6 +1,7 @@ package org.jumpmind.symmetric.route; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -27,9 +28,11 @@ import org.jumpmind.symmetric.model.TriggerRouter; import org.jumpmind.symmetric.service.IContextService; +import nl.cad.tpsparse.tps.NotATopSpeedFileException; + public abstract class AbstractFileParsingRouter extends AbstractDataRouter { - public abstract List parse(File file, int lineNumber); + public abstract List parse(File file, int lineNumber, int tableId); public abstract String getColumnNames(); public abstract ISymmetricEngine getEngine(); @@ -79,56 +82,85 @@ public Set routeToNodes(SimpleRouterContext context, DataMetaData dataMe } if (triggerId != null) { - String baseDir = getEngine().getFileSyncService().getFileTrigger(triggerId).getBaseDir(); - File file = createSourceFile(baseDir, relativeDir, fileName); - - - Integer lineNumber = contextService.getString(filePath) == null ? 0 : new Integer(contextService.getString(filePath)); - - List dataRows = parse(file, lineNumber); - String columnNames = getColumnNames(); - - String nodeList = buildNodeList(nodes); - String externalData = new StringBuilder(EXTERNAL_DATA_TRIGGER_KEY) - .append("=") - .append(triggerId) - .append(",") - .append(EXTERNAL_DATA_ROUTER_KEY) - .append("=") - .append(dataMetaData.getRouter().getRouterId()) - .append(",") - .append(EXTERNAL_DATA_FILE_DATA_ID) - .append("=") - .append(dataMetaData.getData().getDataId()).toString(); - - for (String row : dataRows) { - Data data = new Data(); + try { + String baseDir = getEngine().getFileSyncService().getFileTrigger(triggerId).getBaseDir(); + File file = createSourceFile(baseDir, relativeDir, fileName); - data.setChannelId(channelId); - data.setDataEventType(DataEventType.INSERT); - data.setRowData(row); - data.setTableName(targetTableName); - data.setNodeList(nodeList); - data.setTriggerHistory(getTriggerHistory(targetTableName, columnNames)); - data.setExternalData(externalData); - data.setDataId(getEngine().getDataService().insertData(data)); - lineNumber++; - } - if (!dataRows.isEmpty()) { - try { - contextService.save(filePath, lineNumber.toString()); - deleteFileIfNecessary(dataMetaData); - } - catch (Exception e) { - e.printStackTrace(); + String nodeList = buildNodeList(nodes); + String externalData = new StringBuilder(EXTERNAL_DATA_TRIGGER_KEY) + .append("=") + .append(triggerId) + .append(",") + .append(EXTERNAL_DATA_ROUTER_KEY) + .append("=") + .append(dataMetaData.getRouter().getRouterId()) + .append(",") + .append(EXTERNAL_DATA_FILE_DATA_ID) + .append("=") + .append(dataMetaData.getData().getDataId()).toString(); + + + Map tableNames = getTableNames(getTargetTableName(targetTableName, fileName), file); + int tableIndex=0; + for (Map.Entry tableEntry : tableNames.entrySet()) { + String contextId = filePath + "[" + tableEntry.getValue() + "]"; + Integer lineNumber = contextService.getString(contextId) == null ? 0 : new Integer(contextService.getString(contextId)); + + List dataRows = parse(file, lineNumber, tableEntry.getKey()); + String columnNames = getColumnNames(); + + for (String row : dataRows) { + Data data = new Data(); + + data.setChannelId(channelId); + data.setDataEventType(DataEventType.INSERT); + data.setRowData(row); + data.setTableName(tableEntry.getValue()); + data.setNodeList(nodeList); + data.setTriggerHistory(getTriggerHistory(tableEntry.getValue(), columnNames)); + data.setExternalData(externalData); + data.setDataId(getEngine().getDataService().insertData(data)); + lineNumber++; + } + if (!dataRows.isEmpty()) { + try { + contextService.save(contextId, lineNumber.toString()); + if ((tableNames.size() - 1) == tableIndex) { + deleteFileIfNecessary(dataMetaData); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + log.info("Finished parsing file[table] " + fileName + "[" + tableEntry.getValue() + "]"); + tableIndex++; } + } catch (IOException ioe) { + log.error("Unable to load file", ioe); + } catch (NotATopSpeedFileException ntsf) { + log.error("The file " + fileName + " is not a valid TopSpeed file.", ntsf); } + } } return new HashSet(); } + + public Map getTableNames(String tableName, File file) throws IOException { + Map tableNames = new HashMap(); + tableNames.put(1, (String) tableName); + return tableNames; + } + public String getTargetTableName(String targetTableName, String fileName) { + if (targetTableName == null) { + targetTableName = fileName.substring(0, fileName.indexOf(".")); + } + return targetTableName; + } + public String buildNodeList(Set nodes) { StringBuffer sb = new StringBuffer(); for (Node n : nodes) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/CSVRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/CSVRouter.java index 5e402b3f2e..07b7290899 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/CSVRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/CSVRouter.java @@ -24,7 +24,7 @@ public ISymmetricEngine getEngine() { } @Override - public List parse(File file, int lineNumber) { + public List parse(File file, int lineNumber, int tableIndex) { List rows = new ArrayList(); int currentLine = 1; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DBFRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DBFRouter.java index d6897b3474..afd5176850 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DBFRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DBFRouter.java @@ -27,7 +27,7 @@ public ISymmetricEngine getEngine() { } @Override - public List parse(File file, int lineNumber) { + public List parse(File file, int lineNumber, int tableIndex) { List rows = new ArrayList(); InputStream fileInputStream = null; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/TPSRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/TPSRouter.java new file mode 100644 index 0000000000..9dd83d9b19 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/TPSRouter.java @@ -0,0 +1,182 @@ +package org.jumpmind.symmetric.route; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.jumpmind.extension.IBuiltInExtensionPoint; +import org.jumpmind.symmetric.ISymmetricEngine; +import org.jumpmind.symmetric.csv.CsvWriter; + +import nl.cad.tpsparse.tps.TpsFile; +import nl.cad.tpsparse.tps.record.DataRecord; +import nl.cad.tpsparse.tps.record.FieldDefinitionRecord; +import nl.cad.tpsparse.tps.record.TableDefinitionRecord; +import nl.cad.tpsparse.tps.record.TableNameRecord; + +public class TPSRouter extends AbstractFileParsingRouter implements IDataRouter, IBuiltInExtensionPoint { + + private ISymmetricEngine engine; + + private List fields = new ArrayList(); + + private TpsFile tpsFile; + + public TPSRouter(ISymmetricEngine engine) { + this.engine = engine; + } + + @Override + public ISymmetricEngine getEngine() { + return this.engine; + } + + @Override + public List parse(File file, int lineNumber, int tableId) { + List rows = new ArrayList(); + TableDefinitionRecord table = tpsFile.getTableDefinitions(false).get(tableId); + fields.clear(); + + if (table != null && table.getFields() != null) { + for (FieldDefinitionRecord field : table.getFields()) { + fields.add(field.getFieldNameNoTable()); + } + + int currentLine = 1; + for (DataRecord rec : tpsFile.getDataRecords(tableId, table, false)) { + if (currentLine > lineNumber) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + CsvWriter writer = new CsvWriter(new OutputStreamWriter(out), ','); + writer.setEscapeMode(CsvWriter.ESCAPE_MODE_BACKSLASH); + writer.setTextQualifier('\"'); + writer.setUseTextQualifier(true); + writer.setForceQualifier(true); + + try { + int fieldPosition = 1; + for (Object val : rec.getValues()) { + String value = val.toString(); + if (val instanceof Object[]) { + if (((Object[]) val).length > 0) { + int position = 0; + boolean multipleValues = false; + for (Object elem : (Object[]) val) { + // Take only first value in array for now + if (position == 0) { + value = elem.toString(); + } else if (elem instanceof Integer) { + if (((Integer) elem) > 0) { + multipleValues = true; + } + } else if (elem instanceof String) { + if (((String) elem).length() > 0) { + multipleValues = true; + } + } else if (elem instanceof Short) { + if (((Short) elem) > 0) { + multipleValues = true; + } + } else { + log.warn("Unchecked array type in TPS parsing " + elem.getClass()); + } + position++; + } + if (multipleValues) { + log.debug("Line number " + currentLine + " in file " + file.getName() + ", field number " + fieldPosition + " contains array with multiple values"); + } + } else { + value = ""; + } + + } + writer.write(removeIllegalCharacters(value), true); + fieldPosition++; + } + } + catch (IOException ioe) { + log.info("Unable to create row data while parsing TPS file", ioe); + } + catch (Exception e) { + log.info("parse error."); + } + + writer.close(); + rows.add(out.toString()); + } + currentLine++; + } + } + + return rows; + } + + protected String removeIllegalCharacters(String formattedData) { + StringBuilder buff = new StringBuilder(formattedData.length()); + for (char c : formattedData.toCharArray()) { + if (c >= 0 && c < 31) { + if (c == '\n' || c == '\t' || c == '\r') { + buff.append(c); + } + } else { + if (c != 127) { + buff.append(c); + } + } + + } + return buff.toString(); + } + + protected String encode(byte[] byteData) { + StringBuilder sb = new StringBuilder(); + for (byte b : byteData) { + int i = b & 0xff; + if (i >= 0 && i <= 15) { + sb.append("\\X0").append(Integer.toString(i, 16)); + } else if ((i >= 16 && i <= 31) || i == 127) { + sb.append("\\X").append(Integer.toString(i, 16)); + } else { + sb.append(Character.toChars(i)); + } + } + return sb.toString(); + } + @Override + public String getColumnNames() { + StringBuffer columns = new StringBuffer(); + try { + for (int i = 0; i < fields.size(); i++) { + if (i > 0) { columns.append(","); } + columns.append(fields.get(i)); + } + } + catch (Exception e) { + log.error("Unable to read column names for TPS file ", e); + } + return columns.toString(); + } + + @Override + public Map getTableNames(String tableName, File file) throws IOException { + tpsFile = new TpsFile(file); + Map tableNames = new HashMap(); + int tableNumber = 0; + for (TableNameRecord tableNameRecord : ((TpsFile) tpsFile).getTableNameRecords()) { + String tableStr = tableNameRecord.toString(); + String parsedName = tableStr.substring(tableStr.indexOf("(") + 1, tableStr.indexOf(",")); + if (!parsedName.startsWith("UNNAMED")) { + tableNames.put(tableNameRecord.getTableNumber(), tableName + "_" + parsedName); + } + tableNumber = tableNameRecord.getTableNumber(); + } + if (tableNames.size() == 0) { + tableNames.put(tableNumber, tableName); + } + return tableNames; + } +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index 88982b22c9..9922ab33c3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -49,9 +49,9 @@ import org.jumpmind.symmetric.common.ContextConstants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.io.data.DataEventType; -import org.jumpmind.symmetric.model.AbstractBatch.Status; import org.jumpmind.symmetric.load.DefaultReloadGenerator; import org.jumpmind.symmetric.load.IReloadGenerator; +import org.jumpmind.symmetric.model.AbstractBatch.Status; import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.Data; import org.jumpmind.symmetric.model.DataGap; @@ -92,6 +92,7 @@ import org.jumpmind.symmetric.route.NonTransactionalBatchAlgorithm; import org.jumpmind.symmetric.route.SimpleRouterContext; import org.jumpmind.symmetric.route.SubSelectDataRouter; +import org.jumpmind.symmetric.route.TPSRouter; import org.jumpmind.symmetric.route.TransactionalBatchAlgorithm; import org.jumpmind.symmetric.service.ClusterConstants; import org.jumpmind.symmetric.service.IConfigurationService; @@ -148,6 +149,7 @@ public RouterService(ISymmetricEngine engine) { engine.getSymmetricDialect())); extensionService.addExtensionPoint(FileSyncDataRouter.ROUTER_TYPE, new FileSyncDataRouter(engine)); extensionService.addExtensionPoint("dbf", new DBFRouter(engine)); + extensionService.addExtensionPoint("tps", new TPSRouter(engine)); extensionService.addExtensionPoint("csv", new CSVRouter(engine)); extensionService.addExtensionPoint(DefaultReloadGenerator.NAME, new DefaultReloadGenerator(engine));