Skip to content

Commit

Permalink
0003581: Cassandra support as a load only node
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Jun 5, 2018
1 parent d190861 commit 2f6bf1c
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 7 deletions.
Expand Up @@ -140,6 +140,7 @@ protected int getMappedTypeCode(String dataType) {
* VARINT(14), TIMEUUID(15), CUSTOM(0), UDT(48,
* ProtocolVersion.V3), TUPLE(49, ProtocolVersion.V3),
*/

if (dataType == DataType.Name.INT.name()) {
return Types.INTEGER;
} else if (dataType == DataType.Name.BIGINT.name()) {
Expand All @@ -162,9 +163,10 @@ protected int getMappedTypeCode(String dataType) {
return Types.DATE;
} else if (dataType == DataType.Name.TIME.name()) {
return Types.TIME;
} else if (dataType == DataType.Name.VARCHAR.name() || dataType == DataType.Name.TEXT.name()
|| dataType == DataType.Name.UUID.name()) {
} else if (dataType == DataType.Name.VARCHAR.name() || dataType == DataType.Name.TEXT.name()) {
return Types.VARCHAR;
} else if (dataType == DataType.Name.UUID.name()) {
return Types.JAVA_OBJECT;
} else if (dataType == DataType.Name.LIST.name()) {
return Types.STRUCT;
} else if (dataType == DataType.Name.SET.name()) {
Expand Down
Expand Up @@ -2,10 +2,16 @@

import java.math.BigDecimal;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
Expand All @@ -17,6 +23,7 @@

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -30,6 +37,10 @@ public class CassandraDatabaseWriter extends DynamicDefaultDatabaseWriter {

PreparedStatement pstmt;

SimpleDateFormat tsFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS");
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
DateTimeFormatter timeFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

public CassandraDatabaseWriter(IDatabasePlatform symmetricPlatform,
IDatabasePlatform targetPlatform,String prefix,
IDatabaseWriterConflictResolver conflictResolver, DatabaseWriterSettings settings) {
Expand All @@ -50,16 +61,31 @@ protected void prepare() {

@Override
protected void prepare(String sql, CsvData data) {
if (isSymmetricTable(this.targetTable != null ? this.targetTable.getName() : "") && !data.getDataEventType().equals(DataEventType.SQL)) {
if (isSymmetricTable(this.targetTable != null ? this.targetTable.getName() : "") && !isUserSendSql(sql, data)) {
super.prepare(sql, data);
} else {
pstmt = session.prepare(sql);
}
}

/*
* Checks if a send sql event type was for the sym_node table. If it is the send sql shoudl run against Cassandra tables otherwise it is an internal Symmetric
* send sql.
*/
protected boolean isUserSendSql(String sql, CsvData data) {
return data.getDataEventType().equals(DataEventType.SQL)
&& this.targetTable.getNameLowerCase().equals(this.getTablePrefix().toLowerCase() + "_node")
&& !sql.toLowerCase().contains("from " + this.getTablePrefix().toLowerCase() + "_node");
}

@Override
public int prepareAndExecute(String sql) {
return session.execute(sql).wasApplied() ? 1 : 0;
public int prepareAndExecute(String sql, CsvData data) {
if (isUserSendSql(sql, data)) {
return session.execute(sql).wasApplied() ? 1 : 0;
}
else {
return super.prepareAndExecute(sql, data);
}
}

@Override
Expand Down Expand Up @@ -118,6 +144,26 @@ protected void bindVariables(BoundStatement bstmt, Column[] columns, int[] types
bstmt.setInt(i, Integer.parseInt(values[i]));
} else if (Types.VARCHAR == type) {
bstmt.setString(i, values[i]);
} else if (Types.JAVA_OBJECT == type) {
bstmt.setUUID(i, UUID.fromString(values[i]));
} else if (Types.TIMESTAMP == type) {
try {
bstmt.setTimestamp(i, tsFormat.parse(values[i]));
} catch (ParseException e) {
throw new RuntimeException("Unable to bind timestamp column " + columns[i].getName() + " with value " + values[i]);
}
} else if (Types.DATE == type) {
try {
bstmt.setDate(i, LocalDate.fromMillisSinceEpoch(dateFormat.parse(values[i]).getTime()));
} catch (ParseException e) {
throw new RuntimeException("Unable to bind date column " + columns[i].getName() + " with value " + values[i]);
}
} else if (Types.TIME == type) {
try {
bstmt.setTime(i, LocalTime.parse(values[i], timeFormat).toNanoOfDay());
} catch (DateTimeParseException e) {
throw new RuntimeException("Unable to bind time column " + columns[i].getName() + " with value " + values[i]);
}
} else if (Types.BOOLEAN == type) {
bstmt.setBool(i, Boolean.parseBoolean(values[i]));
} else if (Types.DECIMAL == type) {
Expand Down
Expand Up @@ -569,7 +569,7 @@ protected boolean sql(CsvData data) {
if (log.isDebugEnabled()) {
log.debug("About to run: {}", sql);
}
count += prepareAndExecute(sql);
count += prepareAndExecute(sql, data);
if (log.isDebugEnabled()) {
log.debug("{} rows updated when running: {}", count, sql);
}
Expand Down Expand Up @@ -943,7 +943,7 @@ public DatabaseWriterSettings getWriterSettings() {
return writerSettings;
}

public int prepareAndExecute(String sql) {
public int prepareAndExecute(String sql, CsvData data) {
return getTransaction().prepareAndExecute(sql);
}

Expand Down

0 comments on commit 2f6bf1c

Please sign in to comment.