Skip to content

Commit

Permalink
Merge branch '3.7' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.7
  • Loading branch information
Hicks, Josh committed Dec 3, 2015
2 parents 512558e + 5150257 commit e6981a6
Show file tree
Hide file tree
Showing 13 changed files with 378 additions and 46 deletions.
2 changes: 1 addition & 1 deletion symmetric-client-clib/inc/model/Node.h
Expand Up @@ -23,7 +23,7 @@

#include <stdlib.h>

#define SYM_VERSION "3.7.23"
#define SYM_VERSION "3.7.27"

typedef enum SymNodeStatus {
SYM_NODE_STATUS_DATA_LOAD_NOT_STARTED,
Expand Down
Expand Up @@ -161,7 +161,7 @@ int SymDefaultDatabaseWriter_delete(SymDefaultDatabaseWriter *this, SymCsvData *
this->dmlStatement->destroy(this->dmlStatement);
}
// TODO: pass nullKeyIndiciators
this->dmlStatement = SymDmlStatement_new(NULL, SYM_DML_TYPE_UPDATE, this->targetTable, NULL, &this->platform->databaseInfo);
this->dmlStatement = SymDmlStatement_new(NULL, SYM_DML_TYPE_DELETE, this->targetTable, NULL, &this->platform->databaseInfo);
this->sqlTransaction->prepare(this->sqlTransaction, this->dmlStatement->sql, &error);
this->isError = error != 0;
}
Expand Down
Expand Up @@ -36,8 +36,8 @@ public OracleTriggerTemplate(ISymmetricDialect symmetricDialect) {
geometryColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then to_clob('') else '\"'||replace(replace(SDO_UTIL.TO_WKTGEOMETRY($(tableAlias).\"$(columnName)\"),'\\','\\\\'),'\"','\\\"')||'\"' end";
numberColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', '\"'||cast($(tableAlias).\"$(columnName)\" as number("+symmetricDialect.getTemplateNumberPrecisionSpec()+"))||'\"')" ;
datetimeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF3')),'\"'))" ;
dateTimeWithTimeZoneColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM')),'\"'))" ;
dateTimeWithLocalTimeZoneColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char(cast($(tableAlias).\"$(columnName)\" as timestamp), 'YYYY-MM-DD HH24:MI:SS.FF')),'\"'))" ;
dateTimeWithTimeZoneColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF9 TZH:TZM')),'\"'))" ;
dateTimeWithLocalTimeZoneColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char(cast($(tableAlias).\"$(columnName)\" as timestamp), 'YYYY-MM-DD HH24:MI:SS.FF9')),'\"'))" ;
timeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'))" ;
dateColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'))" ;
clobColumnTemplate = "decode(dbms_lob.getlength($(tableAlias).\"$(columnName)\"), null, to_clob(''), '\"'||replace(replace($(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"')" ;
Expand Down
Expand Up @@ -216,7 +216,7 @@ protected void flush() {
String sql = String.format("BULK INSERT " +
this.getTargetTable().getQualifiedTableName(quote, catalogSeparator, schemaSeparator) +
" FROM '" + filename) + "'" +
" WITH ( FIELDTERMINATOR='"+StringEscapeUtils.escapeJava(fieldTerminator)+"', KEEPIDENTITY" +
" WITH (DATAFILETYPE='widechar', FIELDTERMINATOR='"+StringEscapeUtils.escapeJava(fieldTerminator)+"', KEEPIDENTITY" +
(fireTriggers ? ", FIRE_TRIGGERS" : "") + rowTerminatorString +");";
Statement stmt = c.createStatement();

Expand All @@ -242,4 +242,4 @@ protected void createStagingFile() {
table.getName() + this.getBatch().getBatchId() + ".csv");
}

}
}
Expand Up @@ -22,20 +22,22 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import oracle.jdbc.OracleTypes;
import oracle.jdbc.internal.OracleCallableStatement;
import oracle.sql.ARRAY;
import oracle.sql.ArrayDescriptor;
import java.util.TimeZone;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.AbstractDatabasePlatform;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.BulkSqlException;
import org.jumpmind.db.sql.JdbcSqlTransaction;
Expand All @@ -46,6 +48,14 @@
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

import oracle.jdbc.OracleTypes;
import oracle.jdbc.internal.OracleCallableStatement;
import oracle.sql.ARRAY;
import oracle.sql.ArrayDescriptor;
import oracle.sql.Datum;
import oracle.sql.TIMESTAMPLTZ;
import oracle.sql.TIMESTAMPTZ;

public class OracleBulkDatabaseWriter extends DefaultDatabaseWriter {

protected String procedurePrefix;
Expand Down Expand Up @@ -82,7 +92,7 @@ public void write(CsvData data) {
if (lastEventType != null && !lastEventType.equals(dataEventType)) {
flush();
}

lastEventType = dataEventType;

boolean requiresFlush = false;
Expand All @@ -93,6 +103,9 @@ public void write(CsvData data) {
if (filterBefore(data)) {
Object[] rowData = platform.getObjectValues(batch.getBinaryEncoding(), getRowData(data, CsvData.ROW_DATA),
targetTable.getColumns(), false, writerSettings.isFitToColumn());

rowData = convertObjectValues(rowData, targetTable.getColumns());

for (int i = 0; i < rowData.length; i++) {

List<Object> columnList = null;
Expand Down Expand Up @@ -128,10 +141,120 @@ public void write(CsvData data) {
if (requiresFlush) {
flush();
}

checkForEarlyCommit();
}

/**
* @param rowData
* @param columns
* @return
*/
protected Object[] convertObjectValues(Object[] values, Column[] columns) {
if (values != null) {
for (int i = 0; i < values.length; i++) {
Object value = values[i];
Column column = columns.length > i ? columns[i] : null;
if (column != null) {
values[i] = convertObjectValue(value, column);
}
}
}

return values;
}

/**
* @param value
* @param column
* @return
*/
protected Object convertObjectValue(Object value, Column column) {
if (value == null) {
return null;
}

if (column.getMappedTypeCode() == OracleTypes.TIMESTAMPTZ
|| column.getMappedTypeCode() == OracleTypes.TIMESTAMPLTZ) {
String stringValue = (String)value;
return parseTimestampTZ(column.getMappedTypeCode(), stringValue);
}

return value;
}

protected Datum parseTimestampTZ(int type, String value) {
if (value == null || StringUtils.isEmpty(value.trim())) {
return null;
}

try {
Timestamp timestamp = null;
TimeZone timezone = null;
try {
// Try something like: 2015-11-20 13:37:44.000000000
timestamp = Timestamp.valueOf(value.trim());
timezone = TimeZone.getDefault();
}
catch (Exception ex) {
log.debug("Failed to convert value to timestamp.", ex);
// Now expecting something like: 2015-11-20 13:37:44.000000000 +08:00
int split = value.lastIndexOf(" ");
String timestampString = value.substring(0, split).trim();
if (timestampString.endsWith(".")) { // Cover case where triggers would export format like "2007-01-02 03:20:10."
timestampString = timestampString.substring(0, timestampString.length()-1);
}
String timezoneString = value.substring(split).trim();

timestamp = Timestamp.valueOf(timestampString);

timezone = ((AbstractDatabasePlatform)platform).getTimeZone(timezoneString);
// Even though we provide the timezone to the Oracle driver, apparently
// the timestamp component needs to actually be in UTC.
if (type == OracleTypes.TIMESTAMPTZ) {
timestamp = toUTC(timestamp, timezone);
}
}
Calendar timezoneCalender = Calendar.getInstance();
timezoneCalender.setTimeZone(timezone);
timezoneCalender.setTime(timestamp);

JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) transaction;
Connection c = jdbcTransaction.getConnection();

Connection oracleConnection = jdbcExtractor.getNativeConnection(c);
Datum ts = null;
if (type == OracleTypes.TIMESTAMPTZ) {
ts = new TIMESTAMPTZ(oracleConnection, timestamp, timezoneCalender);
} else {
ts = new TIMESTAMPLTZ(oracleConnection, timestamp);
}
return ts;
} catch (Exception ex) {
log.info("Failed to convert '" + value + "' to TIMESTAMPTZ." );
throw platform.getSqlTemplate().translate(ex);
}
}

public Timestamp toUTC(Timestamp timestamp, TimeZone timezone) {
int nanos = timestamp.getNanos();
timestamp.setNanos(0);

SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");
dateFormat.setTimeZone(timezone);
Date date;
try {
date = dateFormat.parse(timestamp.toString());
} catch (ParseException ex) {
log.info("Failed to parse '" + timestamp + "'");
throw platform.getSqlTemplate().translate(ex);
}

Timestamp utcTimestamp = new Timestamp(date.getTime());
utcTimestamp.setNanos(nanos);
return utcTimestamp;
}

protected void flush() {
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
try {
Expand Down Expand Up @@ -191,7 +314,7 @@ protected void flush() {
failureMessage.append(". The last flushed line number of the batch was ");
failureMessage.append(statistics.get(batch).get(DataWriterStatisticConstants.LINENUMBER));
failureMessage.append("\n");

for (List<Object> row : rowArrays) {
failureMessage.append(StringUtils.abbreviate(Arrays.toString(row.toArray(new Object[row.size()])), CsvData.MAX_DATA_SIZE_TO_PRINT_TO_LOG));
failureMessage.append("\n");
Expand Down Expand Up @@ -234,12 +357,12 @@ protected String getMappedType(int typeCode) {
case Types.LONGVARCHAR:
case Types.LONGNVARCHAR:
return "varchar(4000)";
case Types.DATE:
case Types.TIME:
case OracleTypes.TIMESTAMPTZ:
return "timestamp with time zone";
return "timestamp(9) with time zone";
case OracleTypes.TIMESTAMPLTZ:
return "timestamp with local time zone";
return "timestamp(9) with local time zone";
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
return "timestamp";
case Types.NUMERIC:
Expand Down Expand Up @@ -268,10 +391,12 @@ protected String getTypeName(int typeCode) {
case Types.LONGVARCHAR:
case Types.LONGNVARCHAR:
return String.format("%s_%s_t", procedurePrefix, "varchar").toUpperCase();
case Types.DATE:
case Types.TIME:
case OracleTypes.TIMESTAMPTZ:
return String.format("%s_%s_t", procedurePrefix, "timestamptz").toUpperCase();
case OracleTypes.TIMESTAMPLTZ:
return String.format("%s_%s_t", procedurePrefix, "timestampltz").toUpperCase();
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
return String.format("%s_%s_t", procedurePrefix, "timestamp").toUpperCase();
case Types.NUMERIC:
Expand All @@ -284,7 +409,7 @@ protected String getTypeName(int typeCode) {
case Types.INTEGER:
return String.format("%s_%s_t", procedurePrefix, "integer").toUpperCase();
default:
throw new UnsupportedOperationException(Integer.toString(typeCode));
throw new UnsupportedOperationException("OracleBulkDatabaseWriter does not support type: " + Integer.toString(typeCode));

}
}
Expand Down Expand Up @@ -350,7 +475,7 @@ protected void buildBulkInsertProcedure(Table table) {
ddl.append(String.format("%s(i), \n", variable));
}
ddl.replace(ddl.length()-3, ddl.length(), ");\n");

ddl.append(String.format("exception \n"));
ddl.append(String.format(" when dml_errors then \n"));
ddl.append(String.format(" for i in 1 .. SQL%%BULK_EXCEPTIONS.count loop \n"));
Expand All @@ -361,7 +486,7 @@ protected void buildBulkInsertProcedure(Table table) {
ddl.append(String.format(" o_errors(o_errors.count) := SQL%%BULK_EXCEPTIONS(i).ERROR_INDEX; \n"));
ddl.append(String.format(" end loop; \n"));
ddl.append(String.format("end %s; ", procedureName));

if (log.isDebugEnabled()) {
log.debug(ddl.toString());
}
Expand Down

0 comments on commit e6981a6

Please sign in to comment.