Skip to content

Commit

Permalink
0003573: TopSpeed (TPS) Router
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed May 23, 2018
1 parent 5ffcafb commit 291b261
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 46 deletions.
3 changes: 2 additions & 1 deletion symmetric-assemble/common.gradle
Expand Up @@ -233,7 +233,8 @@ subprojects { subproject ->
// javax.resource needed by jaybird
provided "org.apache.geronimo.specs:geronimo-j2ee-connector_1.6_spec:1.0"
provided "com.datastax.cassandra:cassandra-driver-core:3.1.4"

provided "com.datastax.cassandra:cassandra-driver-core:3.1.4"
provided "nl.cad:tps-parse:1.0.15-SNAPSHOT"

testCompile fileTree(dir: System.getProperty("user.home") + '/.symmetricds/lib', include: '*.jar')
testCompile "junit:junit:$junitVersion"
Expand Down
@@ -1,6 +1,7 @@
package org.jumpmind.symmetric.route;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -27,9 +28,11 @@
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.service.IContextService;

import nl.cad.tpsparse.tps.NotATopSpeedFileException;

public abstract class AbstractFileParsingRouter extends AbstractDataRouter {

public abstract List<String> parse(File file, int lineNumber);
public abstract List<String> parse(File file, int lineNumber, int tableId);
public abstract String getColumnNames();

public abstract ISymmetricEngine getEngine();
Expand Down Expand Up @@ -79,56 +82,85 @@ public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMe
}

if (triggerId != null) {
String baseDir = getEngine().getFileSyncService().getFileTrigger(triggerId).getBaseDir();
File file = createSourceFile(baseDir, relativeDir, fileName);


Integer lineNumber = contextService.getString(filePath) == null ? 0 : new Integer(contextService.getString(filePath));

List<String> dataRows = parse(file, lineNumber);
String columnNames = getColumnNames();

String nodeList = buildNodeList(nodes);
String externalData = new StringBuilder(EXTERNAL_DATA_TRIGGER_KEY)
.append("=")
.append(triggerId)
.append(",")
.append(EXTERNAL_DATA_ROUTER_KEY)
.append("=")
.append(dataMetaData.getRouter().getRouterId())
.append(",")
.append(EXTERNAL_DATA_FILE_DATA_ID)
.append("=")
.append(dataMetaData.getData().getDataId()).toString();

for (String row : dataRows) {
Data data = new Data();
try {
String baseDir = getEngine().getFileSyncService().getFileTrigger(triggerId).getBaseDir();
File file = createSourceFile(baseDir, relativeDir, fileName);

data.setChannelId(channelId);
data.setDataEventType(DataEventType.INSERT);
data.setRowData(row);
data.setTableName(targetTableName);
data.setNodeList(nodeList);
data.setTriggerHistory(getTriggerHistory(targetTableName, columnNames));
data.setExternalData(externalData);
data.setDataId(getEngine().getDataService().insertData(data));
lineNumber++;
}
if (!dataRows.isEmpty()) {
try {
contextService.save(filePath, lineNumber.toString());
deleteFileIfNecessary(dataMetaData);
}
catch (Exception e) {
e.printStackTrace();
String nodeList = buildNodeList(nodes);
String externalData = new StringBuilder(EXTERNAL_DATA_TRIGGER_KEY)
.append("=")
.append(triggerId)
.append(",")
.append(EXTERNAL_DATA_ROUTER_KEY)
.append("=")
.append(dataMetaData.getRouter().getRouterId())
.append(",")
.append(EXTERNAL_DATA_FILE_DATA_ID)
.append("=")
.append(dataMetaData.getData().getDataId()).toString();


Map<Integer, String> tableNames = getTableNames(getTargetTableName(targetTableName, fileName), file);
int tableIndex=0;
for (Map.Entry<Integer, String> tableEntry : tableNames.entrySet()) {
String contextId = filePath + "[" + tableEntry.getValue() + "]";
Integer lineNumber = contextService.getString(contextId) == null ? 0 : new Integer(contextService.getString(contextId));

List<String> dataRows = parse(file, lineNumber, tableEntry.getKey());
String columnNames = getColumnNames();

for (String row : dataRows) {
Data data = new Data();

data.setChannelId(channelId);
data.setDataEventType(DataEventType.INSERT);
data.setRowData(row);
data.setTableName(tableEntry.getValue());
data.setNodeList(nodeList);
data.setTriggerHistory(getTriggerHistory(tableEntry.getValue(), columnNames));
data.setExternalData(externalData);
data.setDataId(getEngine().getDataService().insertData(data));
lineNumber++;
}
if (!dataRows.isEmpty()) {
try {
contextService.save(contextId, lineNumber.toString());
if ((tableNames.size() - 1) == tableIndex) {
deleteFileIfNecessary(dataMetaData);
}
}
catch (Exception e) {
e.printStackTrace();
}
}
log.info("Finished parsing file[table] " + fileName + "[" + tableEntry.getValue() + "]");
tableIndex++;
}
} catch (IOException ioe) {
log.error("Unable to load file", ioe);
} catch (NotATopSpeedFileException ntsf) {
log.error("The file " + fileName + " is not a valid TopSpeed file.", ntsf);
}

}
}
return new HashSet<String>();

}

public Map<Integer, String> getTableNames(String tableName, File file) throws IOException {
Map<Integer, String> tableNames = new HashMap<Integer, String>();
tableNames.put(1, (String) tableName);
return tableNames;
}

public String getTargetTableName(String targetTableName, String fileName) {
if (targetTableName == null) {
targetTableName = fileName.substring(0, fileName.indexOf("."));
}
return targetTableName;
}

public String buildNodeList(Set<Node> nodes) {
StringBuffer sb = new StringBuffer();
for (Node n : nodes) {
Expand Down
Expand Up @@ -24,7 +24,7 @@ public ISymmetricEngine getEngine() {
}

@Override
public List<String> parse(File file, int lineNumber) {
public List<String> parse(File file, int lineNumber, int tableIndex) {
List<String> rows = new ArrayList<String>();
int currentLine = 1;

Expand Down
Expand Up @@ -27,7 +27,7 @@ public ISymmetricEngine getEngine() {
}

@Override
public List<String> parse(File file, int lineNumber) {
public List<String> parse(File file, int lineNumber, int tableIndex) {
List<String> rows = new ArrayList<String>();

InputStream fileInputStream = null;
Expand Down
@@ -0,0 +1,182 @@
package org.jumpmind.symmetric.route;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.csv.CsvWriter;

import nl.cad.tpsparse.tps.TpsFile;
import nl.cad.tpsparse.tps.record.DataRecord;
import nl.cad.tpsparse.tps.record.FieldDefinitionRecord;
import nl.cad.tpsparse.tps.record.TableDefinitionRecord;
import nl.cad.tpsparse.tps.record.TableNameRecord;

public class TPSRouter extends AbstractFileParsingRouter implements IDataRouter, IBuiltInExtensionPoint {

private ISymmetricEngine engine;

private List<String> fields = new ArrayList<String>();

private TpsFile tpsFile;

public TPSRouter(ISymmetricEngine engine) {
this.engine = engine;
}

@Override
public ISymmetricEngine getEngine() {
return this.engine;
}

@Override
public List<String> parse(File file, int lineNumber, int tableId) {
List<String> rows = new ArrayList<String>();
TableDefinitionRecord table = tpsFile.getTableDefinitions(false).get(tableId);
fields.clear();

if (table != null && table.getFields() != null) {
for (FieldDefinitionRecord field : table.getFields()) {
fields.add(field.getFieldNameNoTable());
}

int currentLine = 1;
for (DataRecord rec : tpsFile.getDataRecords(tableId, table, false)) {
if (currentLine > lineNumber) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
CsvWriter writer = new CsvWriter(new OutputStreamWriter(out), ',');
writer.setEscapeMode(CsvWriter.ESCAPE_MODE_BACKSLASH);
writer.setTextQualifier('\"');
writer.setUseTextQualifier(true);
writer.setForceQualifier(true);

try {
int fieldPosition = 1;
for (Object val : rec.getValues()) {
String value = val.toString();
if (val instanceof Object[]) {
if (((Object[]) val).length > 0) {
int position = 0;
boolean multipleValues = false;
for (Object elem : (Object[]) val) {
// Take only first value in array for now
if (position == 0) {
value = elem.toString();
} else if (elem instanceof Integer) {
if (((Integer) elem) > 0) {
multipleValues = true;
}
} else if (elem instanceof String) {
if (((String) elem).length() > 0) {
multipleValues = true;
}
} else if (elem instanceof Short) {
if (((Short) elem) > 0) {
multipleValues = true;
}
} else {
log.warn("Unchecked array type in TPS parsing " + elem.getClass());
}
position++;
}
if (multipleValues) {
log.debug("Line number " + currentLine + " in file " + file.getName() + ", field number " + fieldPosition + " contains array with multiple values");
}
} else {
value = "";
}

}
writer.write(removeIllegalCharacters(value), true);
fieldPosition++;
}
}
catch (IOException ioe) {
log.info("Unable to create row data while parsing TPS file", ioe);
}
catch (Exception e) {
log.info("parse error.");
}

writer.close();
rows.add(out.toString());
}
currentLine++;
}
}

return rows;
}

protected String removeIllegalCharacters(String formattedData) {
StringBuilder buff = new StringBuilder(formattedData.length());
for (char c : formattedData.toCharArray()) {
if (c >= 0 && c < 31) {
if (c == '\n' || c == '\t' || c == '\r') {
buff.append(c);
}
} else {
if (c != 127) {
buff.append(c);
}
}

}
return buff.toString();
}

protected String encode(byte[] byteData) {
StringBuilder sb = new StringBuilder();
for (byte b : byteData) {
int i = b & 0xff;
if (i >= 0 && i <= 15) {
sb.append("\\X0").append(Integer.toString(i, 16));
} else if ((i >= 16 && i <= 31) || i == 127) {
sb.append("\\X").append(Integer.toString(i, 16));
} else {
sb.append(Character.toChars(i));
}
}
return sb.toString();
}
@Override
public String getColumnNames() {
StringBuffer columns = new StringBuffer();
try {
for (int i = 0; i < fields.size(); i++) {
if (i > 0) { columns.append(","); }
columns.append(fields.get(i));
}
}
catch (Exception e) {
log.error("Unable to read column names for TPS file ", e);
}
return columns.toString();
}

@Override
public Map<Integer, String> getTableNames(String tableName, File file) throws IOException {
tpsFile = new TpsFile(file);
Map<Integer,String> tableNames = new HashMap<Integer, String>();
int tableNumber = 0;
for (TableNameRecord tableNameRecord : ((TpsFile) tpsFile).getTableNameRecords()) {
String tableStr = tableNameRecord.toString();
String parsedName = tableStr.substring(tableStr.indexOf("(") + 1, tableStr.indexOf(","));
if (!parsedName.startsWith("UNNAMED")) {
tableNames.put(tableNameRecord.getTableNumber(), tableName + "_" + parsedName);
}
tableNumber = tableNameRecord.getTableNumber();
}
if (tableNames.size() == 0) {
tableNames.put(tableNumber, tableName);
}
return tableNames;
}
}
Expand Up @@ -49,9 +49,9 @@
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.load.DefaultReloadGenerator;
import org.jumpmind.symmetric.load.IReloadGenerator;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataGap;
Expand Down Expand Up @@ -92,6 +92,7 @@
import org.jumpmind.symmetric.route.NonTransactionalBatchAlgorithm;
import org.jumpmind.symmetric.route.SimpleRouterContext;
import org.jumpmind.symmetric.route.SubSelectDataRouter;
import org.jumpmind.symmetric.route.TPSRouter;
import org.jumpmind.symmetric.route.TransactionalBatchAlgorithm;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.jumpmind.symmetric.service.IConfigurationService;
Expand Down Expand Up @@ -148,6 +149,7 @@ public RouterService(ISymmetricEngine engine) {
engine.getSymmetricDialect()));
extensionService.addExtensionPoint(FileSyncDataRouter.ROUTER_TYPE, new FileSyncDataRouter(engine));
extensionService.addExtensionPoint("dbf", new DBFRouter(engine));
extensionService.addExtensionPoint("tps", new TPSRouter(engine));

extensionService.addExtensionPoint("csv", new CSVRouter(engine));
extensionService.addExtensionPoint(DefaultReloadGenerator.NAME, new DefaultReloadGenerator(engine));
Expand Down

0 comments on commit 291b261

Please sign in to comment.