Skip to content

Commit

Permalink
Merge branch '3.10' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.10
  • Loading branch information
jumpmind-josh committed Aug 19, 2019
2 parents e86d7b1 + f47bb27 commit c250645
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 49 deletions.
2 changes: 1 addition & 1 deletion symmetric-assemble/common.gradle
Expand Up @@ -221,7 +221,7 @@ subprojects { subproject ->
voltDbVersion = '8.4.1'
bouncyCastleVersion = '1.59'
animalSnifferVersion = '1.10'
jnaVersion = '4.2.0'
jnaVersion = '4.5.0'
jettyVersion = project.property('jetty.version')
nuodbVersion = '3.3.1'
tiberoVersion = '6'
Expand Down
Expand Up @@ -19,4 +19,4 @@
-- under the License.
--

insert into test_db_import_1 (id,string_value,string_required_value,char_value,char_required_value,date_value,time_value,boolean_value,integer_value,decimal_value,double_value) values (5,'a\b\\c\\\d','junk','j','j',{d '1989-09-21'},{ts '1997-11-06 08:02:33.324'},0,1,1,1);
insert into test_db_import_1 (id,string_value,string_required_value,char_value,char_required_value,date_value,time_value,boolean_value,integer_value,decimal_value,double_value) values (5,'a\b\\c\\\d','junk','j','j',{d '1989-09-21'},{ts '1997-11-06 08:02:33.323'},0,1,1,1);
Expand Up @@ -2973,26 +2973,26 @@ public Data mapRow(Row row) {
String csvRow = null;
if (selectedAsCsv) {
csvRow = row.stringValue();
int commaCount = StringUtils.countMatches(csvRow, ",");
if (commaCount < expectedCommaCount) {
throw new SymmetricException(
"The extracted row data did not have the expected (%d) number of columns (actual=%s): %s. The initial load sql was: %s",
expectedCommaCount, commaCount, csvRow, initialLoadSql);
}
} else if (objectValuesWillNeedEscaped) {
csvRow = platform.getCsvStringValue(
symmetricDialect.getBinaryEncoding(), sourceTable.getColumns(),
row, isColumnPositionUsingTemplate);
} else {
csvRow = row.csvValue();
}
int commaCount = StringUtils.countMatches(csvRow, ",");
if (expectedCommaCount <= commaCount) {
Data data = new Data(0, null, csvRow, DataEventType.INSERT, triggerHistory
.getSourceTableName(), null, triggerHistory, batch.getChannelId(),
null, null);
data.putAttribute(Data.ATTRIBUTE_ROUTER_ID, triggerRouter.getRouter()
.getRouterId());
return data;
} else {
throw new SymmetricException(
"The extracted row data did not have the expected (%d) number of columns (actual=%s): %s. The initial load sql was: %s",
expectedCommaCount, commaCount, csvRow, initialLoadSql);
}

Data data = new Data(0, null, csvRow, DataEventType.INSERT, triggerHistory
.getSourceTableName(), null, triggerHistory, batch.getChannelId(),
null, null);
data.putAttribute(Data.ATTRIBUTE_ROUTER_ID, triggerRouter.getRouter()
.getRouterId());
return data;
}
});
}
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.jumpmind.db.platform.AbstractDatabasePlatform;
import org.jumpmind.db.platform.mssql.MsSql2008DatabasePlatform;
import org.jumpmind.symmetric.TestConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.csv.CsvWriter;
Expand Down Expand Up @@ -307,7 +308,7 @@ public void test05SkippingResentBatch() throws Exception {
getNextBatchId();
for (long i = 0; i < 7; i++) {
batchId--;
testSimple(CsvConstants.INSERT, values, values);
testSimple(CsvConstants.INSERT, values, massageExpectectedResultsForDialect(values));
assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
IncomingBatch.Status.OK, "Wrong status");
IncomingBatch batch = getIncomingBatchService().findIncomingBatch(batchId,
Expand All @@ -324,14 +325,27 @@ public void test05SkippingResentBatch() throws Exception {
Thread.sleep(10);
}
}

private String[] massageExpectectedResultsForDialect(String[] values) {
if(values[5] != null && getSymmetricEngine().getDatabasePlatform() instanceof MsSql2008DatabasePlatform) {
// No time portion for a date field
values[5] = values[5].replaceFirst(" \\d\\d:\\d\\d:\\d\\d\\.000", "");
}
if(values[6] != null && getSymmetricEngine().getDatabasePlatform() instanceof MsSql2008DatabasePlatform) {
if(values[6].length() == 23) {
values[6] = values[6] + "0000";
}
}
return values;
}

@Test
public void test06ErrorWhileSkip() throws Exception {
Level old = setLoggingLevelForTest(Level.OFF);
String[] values = { getNextId(), "string2", "string not null2", "char2", "char not null2",
"2007-01-02 00:00:00.000", "2007-02-03 04:05:06.000", "0", "47", "67.89", "0.474" };

testSimple(CsvConstants.INSERT, values, values);
testSimple(CsvConstants.INSERT, values, massageExpectectedResultsForDialect(values));
assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
IncomingBatch.Status.OK, "Wrong status");
IncomingBatch batch = getIncomingBatchService().findIncomingBatch(batchId,
Expand Down Expand Up @@ -473,7 +487,7 @@ public void test09ErrorThenSuccessBatch() throws Exception {
values[1] = "A smaller string that will succeed";
values[5] = "2007-01-02 00:00:00.000";
values[9] = "67.89";
testSimple(CsvConstants.INSERT, values, values);
testSimple(CsvConstants.INSERT, values, massageExpectectedResultsForDialect(values));
assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
IncomingBatch.Status.OK, "Wrong status. " + printDatabase());
IncomingBatch batch = getIncomingBatchService().findIncomingBatch(batchId,
Expand Down Expand Up @@ -517,7 +531,7 @@ public void test10MultipleBatch() throws Exception {

writer.close();
load(out);
assertTestTableEquals(values[0], values);
assertTestTableEquals(values[0], massageExpectectedResultsForDialect(values));
assertTestTableEquals(values2[0], null);

assertEquals(
Expand Down
3 changes: 2 additions & 1 deletion symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java
Expand Up @@ -265,8 +265,9 @@ protected void checkForColumn(String columnName) {
}

final private java.util.Date getDate(String value, String[] pattern) {
int spaceIndex = value.lastIndexOf(" ");
int fractionIndex = value.lastIndexOf(".");
if (fractionIndex > 0 && value.substring(fractionIndex, value.length()).length() > 3) {
if (spaceIndex > 0 && fractionIndex > 0 && value.substring(fractionIndex, value.length()).length() > 3) {
return Timestamp.valueOf(value);
} else {
return FormatUtils.parseDate(value, pattern);
Expand Down
Expand Up @@ -511,6 +511,7 @@ public void testColumnNotExisting() throws Exception {
String[] values = { getNextId(), "testColumnNotExisting", "string not null", "char",
"i do not exist!", "char not null", "2007-01-02 00:00:00.000",
"2007-02-03 04:05:06.000", "0", "47", "67.89", "-0.0747" };
massageExpectectedResultsForDialect2(values);
List<String> valuesAsList = new ArrayList<String>(Arrays.asList(values));
valuesAsList.remove(4);
String[] expectedValues = valuesAsList.toArray(new String[valuesAsList.size()]);
Expand Down Expand Up @@ -626,16 +627,27 @@ public void testBenchmark() throws Exception {
private String[] massageExpectectedResultsForDialect(String[] values) {
RoundingMode mode = RoundingMode.DOWN;

if (values[5] != null
if(values[5] != null && platform instanceof MsSql2008DatabasePlatform) {
// No time portion for a date field
values[5] = values[5].replaceFirst(" \\d\\d:\\d\\d:\\d\\d\\.000", "");
} else if (values[5] != null
&& (!(platform instanceof OracleDatabasePlatform
|| platform instanceof TiberoDatabasePlatform
|| platform instanceof MsSql2000DatabasePlatform
|| platform instanceof MsSql2005DatabasePlatform
|| platform instanceof MsSql2008DatabasePlatform
||
// Only SqlServer 2000 and 2005 should not be mangled. 2008 now uses Date and Time data types.
(
(platform instanceof MsSql2000DatabasePlatform || platform instanceof MsSql2005DatabasePlatform
) && ! (platform instanceof MsSql2008DatabasePlatform)
)
|| platform instanceof AseDatabasePlatform
|| platform instanceof SqlAnywhereDatabasePlatform))) {
values[5] = values[5].replaceFirst(" \\d\\d:\\d\\d:\\d\\d\\.?0?", " 00:00:00.0");
}
if(values[6] != null && platform instanceof MsSql2008DatabasePlatform) {
if(values[6].length() == 23) {
values[6] = values[6] + "0000";
}
}
if (values[10] != null) {
values[10] = values[10].replace(',', '.');
}
Expand All @@ -657,4 +669,16 @@ private String[] massageExpectectedResultsForDialect(String[] values) {
return values;
}

private String[] massageExpectectedResultsForDialect2(String[] values) {
if(values[6] != null && platform instanceof MsSql2008DatabasePlatform) {
// No time portion for a date field
values[6] = values[6].replaceFirst(" \\d\\d:\\d\\d:\\d\\d\\.000", "");
}
if(values[7] != null && platform instanceof MsSql2008DatabasePlatform) {
if(values[7].length() == 23) {
values[7] = values[7] + "0000";
}
}
return values;
}
}
Expand Up @@ -50,8 +50,16 @@ public class UnixService extends WrapperService {

private static final String INITD_DIR = "/etc/init.d";

private static final String SYSTEMD_INSTALL_DIR = "/lib/systemd/system";
private static final String SYSTEMD_RUNTIME_DIR = "/run/systemd/system";

private static final String INITD_SCRIPT_START = "start";
private static final String INITD_SCRIPT_STOP = "stop";

private static final String SYSTEMD_SCRIPT_START = "start";
private static final String SYSTEMD_SCRIPT_STOP = "stop";
private static final String SYSTEMD_SCRIPT_ENABLE = "enable";
private static final String SYSTEMD_SCRIPT_DISABLE = "disable";

@Override
protected boolean setWorkingDirectory(String dir) {
Expand All @@ -60,13 +68,57 @@ protected boolean setWorkingDirectory(String dir) {

@Override
public void install() {
String rcDir = getRunCommandDir();
String runFile = INITD_DIR + "/" + config.getName();

if (!isPrivileged()) {
throw new WrapperException(Constants.RC_MUST_BE_ROOT, 0, "Must be root to install");
}

System.out.println("Installing " + config.getName() + " ...");

if(isSystemdRunning()) {
installSystemd();
} else {
installInitd();
}
System.out.println("Done");
}

private boolean isSystemdRunning() {
File systemddir = new File(SYSTEMD_RUNTIME_DIR);
return systemddir.exists();
}

private void installSystemd() {
String runFile = SYSTEMD_INSTALL_DIR + "/" + config.getName() + ".service";
try(FileWriter writer = new FileWriter(runFile);
BufferedReader reader = new BufferedReader(new InputStreamReader(getClass().getResourceAsStream(
"/symmetricds.systemd"))))
{
String line = null;
while((line = reader.readLine()) != null) {
line = line.replaceAll("\\$\\{wrapper.description}", config.getDescription());
line = line.replaceAll("\\$\\{wrapper.pidfile}", getWrapperPidFile());
line = line.replaceAll("\\$\\{wrapper.home}", config.getWorkingDirectory().getAbsolutePath());
line = line.replaceAll("\\$\\{wrapper.jarfile}", config.getWrapperJarPath());
line = line.replaceAll("\\$\\{wrapper.java.command}", config.getJavaCommand());
line = line.replaceAll("\\$\\{wrapper.run.as.user}",
config.getRunAsUser() == null || config.getRunAsUser().length() == 0 ? "root" : config.getRunAsUser());
writer.write(line + "\n");
}
} catch(IOException e) {
throw new WrapperException(Constants.RC_FAIL_INSTALL, 0, "Failed while writing run file", e);
}
runServiceCommand(getSystemdCommand(SYSTEMD_SCRIPT_ENABLE, config.getName()));
}

private String getWrapperPidFile() throws IOException {
// Make location absolute (starting with / )
return (config.getWrapperPidFile() != null && config.getWrapperPidFile().startsWith("/") ? config.getWrapperPidFile() :
config.getWorkingDirectory().getCanonicalPath() + "/" + config.getWrapperPidFile());
}

private void installInitd() {
String rcDir = getRunCommandDir();
String runFile = INITD_DIR + "/" + config.getName();

try {
FileWriter writer = new FileWriter(runFile);
Expand Down Expand Up @@ -99,28 +151,42 @@ public void install() {
CLibrary.INSTANCE.symlink(runFile, rcDir + "/rc" + runLevel + ".d/K" + RUN_SEQUENCE_STOP
+ config.getName());
}
System.out.println("Done");
}

@Override
public void uninstall() {
String rcDir = getRunCommandDir();
String runFile = INITD_DIR + "/" + config.getName();

if (!isPrivileged()) {
throw new WrapperException(Constants.RC_MUST_BE_ROOT, 0, "Must be root to uninstall");
}

System.out.println("Uninstalling " + config.getName() + " ...");

if(isSystemdRunning()) {
uninstallSystemd();
} else {
uninstallInitd();
}

for (String runLevel : RUN_LEVELS_START) {
System.out.println("Done");
}

private void uninstallSystemd() {
runServiceCommand(getSystemdCommand(SYSTEMD_SCRIPT_DISABLE, config.getName()));
String runFile = SYSTEMD_INSTALL_DIR + "/" + config.getName() + ".service";
new File(runFile).delete();
}

private void uninstallInitd() {
String rcDir = getRunCommandDir();
String runFile = INITD_DIR + "/" + config.getName();
for (String runLevel : RUN_LEVELS_START) {
new File(rcDir + "/rc" + runLevel + ".d/S" + RUN_SEQUENCE_START + config.getName()).delete();
}
for (String runLevel : RUN_LEVELS_STOP) {
new File(rcDir + "/rc" + runLevel + ".d/K" + RUN_SEQUENCE_STOP + config.getName()).delete();
}
new File(runFile).delete();
System.out.println("Done");
}

protected String getRunCommandDir() {
Expand All @@ -142,7 +208,11 @@ public boolean isPrivileged() {

@Override
public boolean isInstalled() {
return new File(INITD_DIR + "/" + config.getName()).exists();
if(isSystemdRunning()) {
return new File(SYSTEMD_INSTALL_DIR + "/" + config.getName() + ".service").exists();
} else {
return new File(INITD_DIR + "/" + config.getName()).exists();
}
}

@Override
Expand Down Expand Up @@ -223,6 +293,14 @@ private ArrayList<String> getServiceCommand(String command) {
return s;
}

private ArrayList<String> getSystemdCommand(String command, String serviceName) {
ArrayList<String> s = new ArrayList<String>();
s.add("systemctl");
s.add(command);
s.add(serviceName);
return s;
}

@Override
public void start() {
if(isInstalled()) {
Expand All @@ -236,14 +314,26 @@ public void start() {
stopProcesses(true);
System.out.println("Waiting for server to start");

boolean success = true;
if(shouldRunService()) {
success = runServiceCommand(getServiceCommand(INITD_SCRIPT_START));
if(isSystemdRunning()) {
boolean success = true;
if(shouldRunService()) {
success = runServiceCommand(getSystemdCommand(SYSTEMD_SCRIPT_START, config.getName()));
} else {
super.start();
}
if (! success) {
throw new WrapperException(Constants.RC_FAIL_EXECUTION, 0, "Server did not start");
}
} else {
super.start();
}
if (! success) {
throw new WrapperException(Constants.RC_FAIL_EXECUTION, 0, "Server did not start");
boolean success = true;
if(shouldRunService()) {
success = runServiceCommand(getServiceCommand(INITD_SCRIPT_START));
} else {
super.start();
}
if (! success) {
throw new WrapperException(Constants.RC_FAIL_EXECUTION, 0, "Server did not start");
}
}
} else {
super.start();
Expand Down Expand Up @@ -334,11 +424,19 @@ protected void stopProcesses(boolean isStopAbandoned) {
}
}

if(shouldRunService()) {
runServiceCommand(getServiceCommand(INITD_SCRIPT_STOP));
} else {
super.stopProcesses(isStopAbandoned);
}
if(isSystemdRunning()) {
if(shouldRunService()) {
runServiceCommand(getSystemdCommand(SYSTEMD_SCRIPT_STOP, config.getName()));
} else {
super.stopProcesses(isStopAbandoned);
}
} else {
if(shouldRunService()) {
runServiceCommand(getServiceCommand(INITD_SCRIPT_STOP));
} else {
super.stopProcesses(isStopAbandoned);
}
}
} else {
super.stopProcesses(isStopAbandoned);
}
Expand Down

0 comments on commit c250645

Please sign in to comment.