Skip to content

Commit

Permalink
Merge branch '3.11' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.11
  • Loading branch information
erilong committed Jan 17, 2020
2 parents 550a104 + b3d5282 commit f264f10
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 12 deletions.
Expand Up @@ -168,4 +168,16 @@ public long insertWithGeneratedKey(String sql, String column, String sequenceNam
return sqlTemplate.insertWithGeneratedKey(database, sql, column, sequenceName, args, null);
}

@Override
public boolean isAllowInsertIntoAutoIncrement() {
// TODO Auto-generated method stub
return false;
}

@Override
public void clearBatch() {
// TODO Auto-generated method stub

}

}
Expand Up @@ -43,6 +43,8 @@ public void start(Batch batch) {
super.start(batch);
if (isFallBackToDefault()) {
getTransaction().setInBatchMode(false);
getTransaction().clearBatch();

log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId() + " using DEFAULT loader");
} else {
log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId() + " using BULK loader");
Expand Down
@@ -1,6 +1,8 @@
package org.jumpmind.symmetric.io;

import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.DmlStatement;
import org.jumpmind.db.sql.JdbcSqlTemplate;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.symmetric.common.ContextConstants;
Expand All @@ -10,6 +12,8 @@

public class JdbcBatchBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

private DmlStatement previousDmlStatement;

public JdbcBatchBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform,
String tablePrefix, DatabaseWriterSettings writerSettings) {
super(symmetricPlatform, targetPlatform, tablePrefix, writerSettings);
Expand Down Expand Up @@ -47,11 +51,11 @@ protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useC
protected void bulkWrite(CsvData data) {
writeDefault(data);
}

@Override
public void end(Batch batch, boolean inError) {
super.end(batch, inError);
protected void prepare() {
getTransaction().flush();
super.prepare();
}

}

Expand Up @@ -33,6 +33,7 @@
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.properties.TypedProperties;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
Expand All @@ -55,23 +56,32 @@ public MailService(IParameterService parameterService, ISymmetricDialect symmetr
super(parameterService, symmetricDialect);
}

public String sendEmail(String subject, String text, String recipients) {
return sendEmail(subject, text, recipients, getJavaMailProperties(),
public String sendEmail(String subject, String text, String toRecipients) {
return sendEmail(subject, text, toRecipients, null, null);
}

public String sendEmail(String subject, String text, String toRecipients, String ccRecipients, String bccRecipients) {
return sendEmail(subject, text, toRecipients, ccRecipients, bccRecipients, getJavaMailProperties(),
parameterService.getString(ParameterConstants.SMTP_TRANSPORT, "smtp"),
parameterService.is(ParameterConstants.SMTP_USE_AUTH, false),
parameterService.getString(ParameterConstants.SMTP_USER),
parameterService.getString(ParameterConstants.SMTP_PASSWORD));
}

public String sendEmail(String subject, String text, String recipients, TypedProperties prop) {
return sendEmail(subject, text, recipients, getJavaMailProperties(prop),
public String sendEmail(String subject, String text, String toRecipients, TypedProperties prop) {
return sendEmail(subject, text, toRecipients, null, null, prop);
}

public String sendEmail(String subject, String text, String toRecipients, String ccRecipients, String bccRecipients, TypedProperties prop) {
return sendEmail(subject, text, toRecipients, ccRecipients, bccRecipients, getJavaMailProperties(prop),
prop.get(ParameterConstants.SMTP_TRANSPORT, "smtp"),
prop.is(ParameterConstants.SMTP_USE_AUTH, false),
prop.get(ParameterConstants.SMTP_USER), prop.get(ParameterConstants.SMTP_PASSWORD));
}

protected String sendEmail(String subject, String text, String recipients, Properties prop,
String transportType, boolean useAuth, String user, String password) {
protected String sendEmail(String subject, String text, String toRecipients, String ccRecipients, String bccRecipients,
Properties prop, String transportType, boolean useAuth, String user, String password)
{
Session session = Session.getInstance(prop);
ByteArrayOutputStream ba = null;
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -102,7 +112,15 @@ protected String sendEmail(String subject, String text, String recipients, Prope
try {
MimeMessage message = new MimeMessage(session);
message.setSentDate(new Date());
message.setRecipients(RecipientType.TO, recipients);
if(StringUtils.isNotEmpty(toRecipients)) {
message.setRecipients(RecipientType.TO, toRecipients);
}
if(StringUtils.isNotEmpty(ccRecipients)) {
message.setRecipients(RecipientType.CC, ccRecipients);
}
if(StringUtils.isNotEmpty(bccRecipients)) {
message.setRecipients(RecipientType.BCC, bccRecipients);
}
message.setSubject(subject);
message.setText(text);
message.setFrom(new InternetAddress(prop.getProperty(JAVAMAIL_FROM)));
Expand Down
Expand Up @@ -144,4 +144,16 @@ public long insertWithGeneratedKey(String sql, String column, String sequenceNam
return 0;
}

@Override
public boolean isAllowInsertIntoAutoIncrement() {
// TODO Auto-generated method stub
return false;
}

@Override
public void clearBatch() {
// TODO Auto-generated method stub

}

}
Expand Up @@ -75,8 +75,12 @@ public interface ISqlTransaction {
* have been marked as auto increment. This is specific to SQL Server.
*/
public void allowInsertIntoAutoIncrementColumns(boolean value, Table table, String quote, String catalogSeparator, String schemaSeparator);


public boolean isAllowInsertIntoAutoIncrement();

public long insertWithGeneratedKey(String sql, String column, String sequenceName,
Object[] args, int[] types);

public void clearBatch();

}
Expand Up @@ -143,12 +143,19 @@ public boolean start(Table table) {
@Override
public void end(Table table) {
super.end(table);
if (this.transaction.isAllowInsertIntoAutoIncrement()) {
// SQL Server using JDBC Batch loading requires a flush before turning off the identity insert.
this.transaction.flush();
}
allowInsertIntoAutoIncrementColumns(false, this.targetTable);
}

@Override
public void end(Batch batch, boolean inError) {
this.currentDmlStatement = null;
if (inError) {
allowInsertIntoAutoIncrementColumns(false, targetTable);
}
super.end(batch, inError);
}

Expand Down
Expand Up @@ -26,6 +26,8 @@

public class MsSqlJdbcSqlTransaction extends JdbcSqlTransaction {

private boolean allowsAutoIncrement;

public MsSqlJdbcSqlTransaction(JdbcSqlTemplate sqltemplate) {
super(sqltemplate);
}
Expand All @@ -36,11 +38,19 @@ public void allowInsertIntoAutoIncrementColumns(boolean allow, Table table, Stri
if (allow) {
execute(String.format("SET IDENTITY_INSERT %s ON",
table.getQualifiedTableName(quote, catalogSeparator, schemaSeparator)));
this.allowsAutoIncrement = true;
} else {
execute(String.format("SET IDENTITY_INSERT %s OFF",
table.getQualifiedTableName(quote, catalogSeparator, schemaSeparator)));
this.allowsAutoIncrement = false;
}
}
}

@Override
public boolean isAllowInsertIntoAutoIncrement() {
return this.allowsAutoIncrement;
}


}
Expand Up @@ -539,5 +539,19 @@ public LogSqlBuilder getLogSqlBuilder() {
public void setLogSqlBuilder(LogSqlBuilder logSqlBuilder) {
this.logSqlBuilder = logSqlBuilder;
}

public void clearBatch() {
if (this.inBatchMode && pstmt != null) {
try {
pstmt.clearBatch();
} catch (SQLException e) {
log.warn("Unable to clear batch mode for transaction. ", e);
}
}
}

@Override
public boolean isAllowInsertIntoAutoIncrement() {
return false;
}
}

0 comments on commit f264f10

Please sign in to comment.