Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.8' into 3.9
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 31, 2017
2 parents d67dfdf + cf772e2 commit 2c8d28e
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 12 deletions.
Expand Up @@ -28,13 +28,11 @@
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.FileSnapshot;
import org.jumpmind.symmetric.model.FileTrigger;
import org.jumpmind.symmetric.model.FileSnapshot.LastEventType;
import org.jumpmind.symmetric.model.ProcessInfo.Status;
import org.jumpmind.symmetric.model.FileTriggerRouter;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.FileSnapshot;
import org.jumpmind.symmetric.model.FileSnapshot.LastEventType;
import org.jumpmind.symmetric.model.FileTriggerRouter;
Expand Down
Expand Up @@ -31,15 +31,16 @@ public List<String> parse(File file, int lineNumber) {
List<String> rows = new ArrayList<String>();

InputStream fileInputStream = null;
int currentLine = 1;
try {
boolean validateHeader = engine.getParameterService()
.is(ParameterConstants.DBF_ROUTER_VALIDATE_HEADER, true);

fileInputStream = Files.newInputStream(file.toPath(), StandardOpenOption.READ);
dbfReader = new DBFReader(fileInputStream, validateHeader);
int currentLine = 1;

while (dbfReader.hasNextRecord()) {
StringBuffer row = new StringBuffer();
StringBuilder row = new StringBuilder();
Object[] record = dbfReader.nextRecord();
if (currentLine > lineNumber) {
for (int i = 0; i < record.length; i++) {
Expand All @@ -53,7 +54,7 @@ public List<String> parse(File file, int lineNumber) {
}
}
catch (Exception e) {
log.error("Unable to parse DBF file " + file.getName(), e);
log.error("Unable to parse DBF file " + file.getName() + " line number " + currentLine, e);
}
finally {
if (fileInputStream != null) {
Expand Down
Expand Up @@ -3,13 +3,19 @@
import java.io.*;
import java.nio.charset.Charset;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DBFReader {

static final Logger log = LoggerFactory.getLogger(DBFReader.class);

private DataInputStream stream;
private DBFField fields[];
private byte nextRecord[];
private int nFieldCount;
private boolean validate;
private int lineNumber;

public DBFReader(String s, boolean validate) throws DBFException {
stream = null;
Expand Down Expand Up @@ -153,13 +159,19 @@ public boolean hasNextRecord() {
public Object[] nextRecord() throws DBFException {
if (!hasNextRecord())
throw new DBFException("No more records available.");
lineNumber++;
Object aobj[] = new Object[nFieldCount];
int i = 1;
for (int j = 0; j < aobj.length; j++) {
int k = fields[j].getLength();
StringBuffer stringbuffer = new StringBuffer(k);
stringbuffer.append(new String(nextRecord, i, k));
aobj[j] = fields[j].parse(stringbuffer.toString());
try {
aobj[j] = fields[j].parse(stringbuffer.toString());
} catch (DBFException e) {
log.error("Failed to parse field " + (j+1) + " on line " + lineNumber + " with that had a value of " + stringbuffer);
throw e;
}
i += fields[j].getLength();
}

Expand Down
Expand Up @@ -35,6 +35,7 @@
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringEscapeUtils;
Expand All @@ -54,6 +55,7 @@
import org.jumpmind.db.sql.mapper.NumberMapper;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
Expand Down Expand Up @@ -534,11 +536,12 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa

}

@SuppressWarnings("unchecked")
protected Map<String, TableReloadRequest> convertReloadListToMap(List<TableReloadRequest> reloadRequests) {
if (reloadRequests == null) {
return null;
}
Map<String, TableReloadRequest> reloadMap = new HashMap<String, TableReloadRequest>();
Map<String, TableReloadRequest> reloadMap = new CaseInsensitiveMap();
for (TableReloadRequest item : reloadRequests) {
reloadMap.put(item.getIdentifier(), item);
}
Expand Down Expand Up @@ -639,7 +642,12 @@ private void insertSqlEventsPriorToReload(Node targetNode, String nodeIdRecord,
List<TriggerRouter> triggerRouters = triggerRoutersByHistoryId.get(triggerHistory
.getTriggerHistoryId());
for (TriggerRouter triggerRouter : triggerRouters) {
TableReloadRequest currentRequest = reloadRequests.get(triggerRouter.getTriggerId() + triggerRouter.getRouterId());
String key = triggerRouter.getTriggerId() + triggerRouter.getRouterId();
TableReloadRequest currentRequest = reloadRequests.get(key);
if (currentRequest == null) {
throw new SymmetricException("Could not locate table reload request for key '" + key +
"'. Available requests are: " + reloadRequests.keySet());
}
beforeSql = currentRequest.getBeforeCustomSql();

if (isNotBlank(beforeSql)) {
Expand Down
Expand Up @@ -329,15 +329,17 @@ public void processTableRequestLoads(Node source, ProcessInfo processInfo) {
else {
NodeSecurity targetNodeSecurity = engine.getNodeService().findNodeSecurity(load.getTargetNodeId());

boolean registered = targetNodeSecurity.getRegistrationTime() != null
|| targetNodeSecurity.getNodeId().equals(targetNodeSecurity.getCreatedAtNodeId());
boolean registered = targetNodeSecurity != null && (targetNodeSecurity.getRegistrationTime() != null
|| targetNodeSecurity.getNodeId().equals(targetNodeSecurity.getCreatedAtNodeId()));
if (registered) {
// Make loads unique to the target and create time
String key = load.getTargetNodeId() + "::" + load.getCreateTime().toString();
if (!requestsSplitByLoad.containsKey(key)) {
requestsSplitByLoad.put(key, new ArrayList<TableReloadRequest>());
}
requestsSplitByLoad.get(key).add(load);
} else {
log.warn("There was a load queued up for '{}', but the node is not registered. It is being ignored", load.getTargetNodeId());
}
}
}
Expand Down

0 comments on commit 2c8d28e

Please sign in to comment.