diff --git a/symmetric-assemble/common.gradle b/symmetric-assemble/common.gradle index aee43b7b4a..ef2a73026d 100644 --- a/symmetric-assemble/common.gradle +++ b/symmetric-assemble/common.gradle @@ -233,7 +233,6 @@ subprojects { subproject -> // javax.resource needed by jaybird provided "org.apache.geronimo.specs:geronimo-j2ee-connector_1.6_spec:1.0" provided "com.datastax.cassandra:cassandra-driver-core:3.1.4" - provided "com.datastax.cassandra:cassandra-driver-core:3.1.4" provided "nl.cad:tps-parse:1.0.15-SNAPSHOT" testCompile fileTree(dir: System.getProperty("user.home") + '/.symmetricds/lib', include: '*.jar') diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java index 1cc59115d7..2d075e3adf 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java @@ -45,6 +45,7 @@ import org.jumpmind.db.platform.DatabaseNamesConstants; import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.db.platform.JdbcDatabasePlatformFactory; +import org.jumpmind.db.platform.cassandra.CassandraPlatform; import org.jumpmind.db.platform.generic.GenericJdbcDatabasePlatform; import org.jumpmind.db.sql.JdbcSqlTemplate; import org.jumpmind.db.sql.LogSqlBuilder; @@ -311,6 +312,12 @@ public static IDatabasePlatform createDatabasePlatform(ApplicationContext spring DataSource dataSource, boolean waitOnAvailableDatabase, boolean isLoadOnly) { log.info("Initializing connection to database"); if (dataSource == null) { + if (isLoadOnly) { + String dbUrl = properties.get(BasicDataSourcePropertyConstants.DB_POOL_URL); + if (dbUrl != null && dbUrl.startsWith("cassandra://")) { + return new CassandraPlatform(createSqlTemplateSettings(properties), dbUrl.substring(12)); + } + } String jndiName = properties.getProperty(ParameterConstants.DB_JNDI_NAME); if (StringUtils.isNotBlank(jndiName)) { try { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java index a9f632c43c..ce5ec1334f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java @@ -31,13 +31,14 @@ import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.io.data.IDataWriter; +import org.jumpmind.symmetric.io.data.writer.CassandraDatabaseWriter; import org.jumpmind.symmetric.io.data.writer.Conflict; import org.jumpmind.symmetric.io.data.writer.Conflict.PingBack; import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings; import org.jumpmind.symmetric.io.data.writer.DefaultTransformWriterConflictResolver; +import org.jumpmind.symmetric.io.data.writer.DynamicDefaultDatabaseWriter; import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler; import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter; -import org.jumpmind.symmetric.io.data.writer.DynamicDefaultDatabaseWriter; import org.jumpmind.symmetric.io.data.writer.ResolvedData; import org.jumpmind.symmetric.io.data.writer.TransformWriter; import org.jumpmind.symmetric.service.IParameterService; @@ -66,6 +67,27 @@ public IDataWriter getDataWriter(final String sourceNodeId, TransformWriter transformWriter, List filters, List errorHandlers, List conflictSettings, List resolvedData) { + + if (symmetricDialect.getTargetPlatform().getClass().getSimpleName().equals("CassandraPlatform")) { + try { + + // TODO Evalute if ConflictResolver will work for Cassandra and if so remove duplicate code. + return new CassandraDatabaseWriter(symmetricDialect.getPlatform(), + symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix(), + new DefaultTransformWriterConflictResolver(transformWriter), + buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, + resolvedData)); + + } catch (Exception e) { + log.warn( + "Failed to create the cassandra database writer. Check to see if all of the required jars have been added"); + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new RuntimeException(e); + } + } + } DynamicDefaultDatabaseWriter writer = new DynamicDefaultDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix(), new DefaultTransformWriterConflictResolver(transformWriter) { diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java index de3ea4bd32..d2110c3593 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java @@ -54,5 +54,6 @@ private DatabaseNamesConstants() { public final static String TIBERO = "tibero"; public final static String RAIMA = "raima"; public final static String TERADATA = "teradata"; + public final static String CASSANDRA = "cassandra"; } diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraDMLStatement.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraDMLStatement.java new file mode 100644 index 0000000000..58e997ad15 --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraDMLStatement.java @@ -0,0 +1,16 @@ +package org.jumpmind.db.platform.cassandra; + +import org.jumpmind.db.model.Column; +import org.jumpmind.db.platform.DatabaseInfo; +import org.jumpmind.db.sql.DmlStatement; + +public class CassandraDMLStatement extends DmlStatement { + + public CassandraDMLStatement(DmlType type, String catalogName, String schemaName, String tableName, + Column[] keysColumns, Column[] columns, boolean[] nullKeyValues, DatabaseInfo databaseInfo, + boolean useQuotedIdentifiers, String textColumnExpression) { + super(type, catalogName, schemaName, tableName, keysColumns, columns, nullKeyValues, databaseInfo, useQuotedIdentifiers, + textColumnExpression); + } + +} diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraDdlBuilder.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraDdlBuilder.java new file mode 100644 index 0000000000..8a0f3f0b96 --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraDdlBuilder.java @@ -0,0 +1,13 @@ +package org.jumpmind.db.platform.cassandra; + +import org.jumpmind.db.platform.AbstractDdlBuilder; +import org.jumpmind.db.platform.DatabaseNamesConstants; + +public class CassandraDdlBuilder extends AbstractDdlBuilder { + + public CassandraDdlBuilder() { + super(DatabaseNamesConstants.CASSANDRA); + } + + +} diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraDdlReader.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraDdlReader.java new file mode 100644 index 0000000000..17f7503a4f --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraDdlReader.java @@ -0,0 +1,74 @@ +package org.jumpmind.db.platform.cassandra; + +import java.util.List; +import java.util.Map; + +import org.jumpmind.db.model.Database; +import org.jumpmind.db.model.Table; +import org.jumpmind.db.model.Trigger; +import org.jumpmind.db.platform.IDatabasePlatform; +import org.jumpmind.db.platform.IDdlReader; + +public class CassandraDdlReader implements IDdlReader { + protected CassandraPlatform platform; + + public CassandraDdlReader(IDatabasePlatform platform) { + this.platform = (CassandraPlatform) platform; + } + + @Override + public Database readTables(String catalog, String schema, String[] tableTypes) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Table readTable(String catalog, String schema, String tableName) { + Map tables = platform.getMetaData() + .get(catalog == null ? schema : catalog); + return tables.get(tableName.toLowerCase()); + } + + @Override + public List getTableTypes() { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getCatalogNames() { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getSchemaNames(String catalog) { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getTableNames(String catalog, String schema, String[] tableTypes) { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getColumnNames(String catalog, String schema, String tableName) { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getTriggers(String catalog, String schema, String tableName) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Trigger getTriggerFor(Table table, String name) { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraPlatform.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraPlatform.java new file mode 100644 index 0000000000..6e00dbf672 --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraPlatform.java @@ -0,0 +1,179 @@ +package org.jumpmind.db.platform.cassandra; + +import java.sql.Types; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.jumpmind.db.model.Column; +import org.jumpmind.db.model.Table; +import org.jumpmind.db.platform.AbstractDatabasePlatform; +import org.jumpmind.db.platform.IDdlBuilder; +import org.jumpmind.db.platform.IDdlReader; +import org.jumpmind.db.sql.ISqlTemplate; +import org.jumpmind.db.sql.SqlTemplateSettings; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; + +public class CassandraPlatform extends AbstractDatabasePlatform { + + Map> metaData = new HashMap>(); + + protected Session session; + + protected Cluster cluster; + + public CassandraPlatform(SqlTemplateSettings settings, String contactPoint) { + super(settings); + + cluster = Cluster.builder().addContactPoint(contactPoint).build(); + this.session = cluster.connect(); + + buildMetaData(); + } + + @Override + public String getName() { + return "cassandra"; + } + + @Override + public String getDefaultSchema() { + return null; + } + + @Override + public String getDefaultCatalog() { + return null; + } + + @Override + public T getDataSource() { + return null; + } + + @Override + public boolean isLob(int type) { + return false; + } + + @Override + public IDdlBuilder getDdlBuilder() { + return new CassandraDdlBuilder(); + } + + @Override + public IDdlReader getDdlReader() { + return new CassandraDdlReader(this); + } + + @Override + public ISqlTemplate getSqlTemplate() { + return new CassandraSqlTemplate(); + } + + @Override + public ISqlTemplate getSqlTemplateDirty() { + return new CassandraSqlTemplate(); + } + + public Session getSession() { + return session; + } + + public void setSession(Session session) { + this.session = session; + } + + + public Map> getMetaData() { + return metaData; + } + + public void setMetaData(Map> metaData) { + this.metaData = metaData; + } + + protected void buildMetaData() { + for (KeyspaceMetadata keystoreMeta : cluster.getMetadata().getKeyspaces()) { + metaData.put(keystoreMeta.getName(), new HashMap()); + for (TableMetadata tableMeta : keystoreMeta.getTables()) { + Table table = new Table(); + table.setName(tableMeta.getName()); + table.setSchema(keystoreMeta.getName()); + List pkColumns = tableMeta.getPrimaryKey(); + + for (ColumnMetadata columnMeta : tableMeta.getColumns()) { + Column column = new Column(); + column.setName(columnMeta.getName()); + column.setMappedTypeCode(getMappedTypeCode(columnMeta.getType().getName().name())); + if (columnMeta.getType().getTypeArguments() != null) { + StringBuffer types = new StringBuffer(); + for (DataType dt : columnMeta.getType().getTypeArguments()) { + if (types.length() > 0) { + types.append(","); + } + types.append(dt.getName().name()); + column.setDescription(types.toString()); + } + } + for (ColumnMetadata pkMeta : pkColumns) { + if (pkMeta.equals(columnMeta)) { + column.setPrimaryKey(true); + } + } + table.addColumn(column); + } + metaData.get(keystoreMeta.getName()).put(table.getName(), table); + } + } + } + + protected int getMappedTypeCode(String dataType) { + /* + * Unsupported Types ================= ASCII(1), BLOB(3), COUNTER(5), INET(16), + * VARINT(14), TIMEUUID(15), CUSTOM(0), UDT(48, + * ProtocolVersion.V3), TUPLE(49, ProtocolVersion.V3), + */ + if (dataType == DataType.Name.INT.name()) { + return Types.INTEGER; + } else if (dataType == DataType.Name.BIGINT.name()) { + return Types.BIGINT; + } else if (dataType == DataType.Name.SMALLINT.name()) { + return Types.SMALLINT; + } else if (dataType == DataType.Name.TINYINT.name()) { + return Types.TINYINT; + } else if (dataType == DataType.Name.BOOLEAN.name()) { + return Types.BOOLEAN; + } else if (dataType == DataType.Name.DECIMAL.name()) { + return Types.DECIMAL; + } else if (dataType == DataType.Name.DOUBLE.name()) { + return Types.DOUBLE; + } else if (dataType == DataType.Name.FLOAT.name()) { + return Types.FLOAT; + } else if (dataType == DataType.Name.TIMESTAMP.name()) { + return Types.TIMESTAMP; + } else if (dataType == DataType.Name.DATE.name()) { + return Types.DATE; + } else if (dataType == DataType.Name.TIME.name()) { + return Types.TIME; + } else if (dataType == DataType.Name.VARCHAR.name() || dataType == DataType.Name.TEXT.name() + || dataType == DataType.Name.UUID.name()) { + return Types.VARCHAR; + } else if (dataType == DataType.Name.LIST.name()) { + return Types.STRUCT; + } else if (dataType == DataType.Name.SET.name()) { + return Types.REF; + } else if (dataType == DataType.Name.MAP.name()) { + return Types.OTHER; + } + return Types.VARCHAR; + + } + +} diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraSqlTemplate.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraSqlTemplate.java new file mode 100644 index 0000000000..e91f3c1af1 --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraSqlTemplate.java @@ -0,0 +1,190 @@ +package org.jumpmind.db.platform.cassandra; + +import java.util.Map; +import java.util.Set; + +import org.jumpmind.db.sql.AbstractSqlTemplate; +import org.jumpmind.db.sql.ISqlReadCursor; +import org.jumpmind.db.sql.ISqlResultsListener; +import org.jumpmind.db.sql.ISqlRowMapper; +import org.jumpmind.db.sql.ISqlStatementSource; +import org.jumpmind.db.sql.ISqlTransaction; + +public class CassandraSqlTemplate extends AbstractSqlTemplate { + + @Override + public byte[] queryForBlob(String sql, Object... args) { + // TODO Auto-generated method stub + return null; + } + + @Override + public byte[] queryForBlob(String sql, int jdbcTypeCode, String jdbcTypeName, Object... args) { + // TODO Auto-generated method stub + return null; + } + + @Override + public String queryForClob(String sql, Object... args) { + // TODO Auto-generated method stub + return null; + } + + @Override + public String queryForClob(String sql, int jdbcTypeCode, String jdbcTypeName, Object... args) { + // TODO Auto-generated method stub + return null; + } + + @Override + public T queryForObject(String sql, Class clazz, Object... params) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Map queryForMap(String sql, Object... params) { + // TODO Auto-generated method stub + return null; + } + + @Override + public ISqlReadCursor queryForCursor(String sql, ISqlRowMapper mapper, Object[] params, int[] types) { + // TODO Auto-generated method stub + return null; + } + + @Override + public int update(boolean autoCommit, boolean failOnError, int commitRate, ISqlResultsListener listener, + String... sql) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int update(boolean autoCommit, boolean failOnError, boolean failOnDrops, boolean failOnSequenceCreate, + int commitRate, ISqlResultsListener listener, ISqlStatementSource source) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int update(boolean autoCommit, boolean failOnError, int commitRate, String... sql) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int update(String sql, Object[] values, int[] types) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void testConnection() { + // TODO Auto-generated method stub + + } + + @Override + public boolean isUniqueKeyViolation(Throwable ex) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isForeignKeyViolation(Throwable ex) { + // TODO Auto-generated method stub + return false; + } + + @Override + public ISqlTransaction startSqlTransaction() { + return new CassandraSqlTransaction(); + } + + @Override + public ISqlTransaction startSqlTransaction(boolean autoCommit) { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getDatabaseMajorVersion() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int getDatabaseMinorVersion() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public String getDatabaseProductName() { + return "cassandra"; + } + + @Override + public String getDatabaseProductVersion() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getDriverName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getDriverVersion() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Set getSqlKeywords() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean supportsGetGeneratedKeys() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isStoresUpperCaseIdentifiers() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isStoresLowerCaseIdentifiers() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isStoresMixedCaseQuotedIdentifiers() { + // TODO Auto-generated method stub + return false; + } + + @Override + public long insertWithGeneratedKey(String sql, String column, String sequenceName, Object[] args, int[] types) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public boolean isDataTruncationViolation(Throwable ex) { + // TODO Auto-generated method stub + return false; + } + + +} diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraSqlTransaction.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraSqlTransaction.java new file mode 100644 index 0000000000..6465caec29 --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraSqlTransaction.java @@ -0,0 +1,147 @@ +package org.jumpmind.db.platform.cassandra; + +import java.util.List; +import java.util.Map; + +import org.jumpmind.db.model.Table; +import org.jumpmind.db.sql.ISqlRowMapper; +import org.jumpmind.db.sql.ISqlTransaction; +import org.jumpmind.db.sql.ISqlTransactionListener; +import org.jumpmind.db.sql.Row; + +public class CassandraSqlTransaction implements ISqlTransaction { + + @Override + public void addSqlTransactionListener(ISqlTransactionListener listener) { + // TODO Auto-generated method stub + + } + + @Override + public boolean isInBatchMode() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void setInBatchMode(boolean batchMode) { + // TODO Auto-generated method stub + + } + + @Override + public T queryForObject(String sql, Class clazz, Object... args) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Row queryForRow(String sql, Object... args) { + // TODO Auto-generated method stub + return null; + } + + @Override + public int queryForInt(String sql, Object... args) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long queryForLong(String sql, Object... args) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int execute(String sql) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int prepareAndExecute(String sql, Object[] args, int[] types) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int prepareAndExecute(String sql, Object... args) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int prepareAndExecute(String sql, Map args) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public List query(String sql, ISqlRowMapper mapper, Map namedParams) { + // TODO Auto-generated method stub + return null; + } + + @Override + public List query(String sql, ISqlRowMapper mapper, Object[] args, int[] types) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void commit() { + // TODO Auto-generated method stub + + } + + @Override + public void rollback() { + // TODO Auto-generated method stub + + } + + @Override + public void close() { + // TODO Auto-generated method stub + + } + + @Override + public void prepare(String sql) { + // TODO Auto-generated method stub + + } + + @Override + public int addRow(T marker, Object[] values, int[] types) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int flush() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public List getUnflushedMarkers(boolean clear) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void allowInsertIntoAutoIncrementColumns(boolean value, Table table, String quote, String catalogSeparator, + String schemaSeparator) { + // TODO Auto-generated method stub + + } + + @Override + public long insertWithGeneratedKey(String sql, String column, String sequenceName, Object[] args, int[] types) { + // TODO Auto-generated method stub + return 0; + } + +} diff --git a/symmetric-io/build.gradle b/symmetric-io/build.gradle index f4d6522e7d..fd626573a1 100644 --- a/symmetric-io/build.gradle +++ b/symmetric-io/build.gradle @@ -7,6 +7,9 @@ apply from: symAssembleDir + '/common.gradle' compile project(":symmetric-db") compile "org.apache-extras.beanshell:bsh:$bshVersion" compile "net.sourceforge.jeval:jeval:0.9.4" + provided "com.fasterxml.jackson.core:jackson-core:2.9.5" + provided "com.fasterxml.jackson.core:jackson-databind:2.9.5" + testCompile project(path: ':symmetric-util', configuration: 'testArtifacts') testCompile project(":symmetric-jdbc") testCompile project(path: ':symmetric-jdbc', configuration: 'testArtifacts') diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/CassandraDatabaseWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/CassandraDatabaseWriter.java new file mode 100644 index 0000000000..2b3f4d4b2c --- /dev/null +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/CassandraDatabaseWriter.java @@ -0,0 +1,215 @@ +package org.jumpmind.symmetric.io.data.writer; + +import java.math.BigDecimal; +import java.sql.Types; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.jumpmind.db.model.Column; +import org.jumpmind.db.model.Table; +import org.jumpmind.db.model.TypeMap; +import org.jumpmind.db.platform.IDatabasePlatform; +import org.jumpmind.db.platform.cassandra.CassandraPlatform; +import org.jumpmind.symmetric.io.data.CsvData; +import org.jumpmind.symmetric.io.data.DataEventType; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class CassandraDatabaseWriter extends DynamicDefaultDatabaseWriter { + + protected Session session; + + Map> metaData = new HashMap>(); + + PreparedStatement pstmt; + + public CassandraDatabaseWriter(IDatabasePlatform symmetricPlatform, + IDatabasePlatform targetPlatform,String prefix, + IDatabaseWriterConflictResolver conflictResolver, DatabaseWriterSettings settings) { + + super(symmetricPlatform, targetPlatform, prefix, conflictResolver, settings); + this.metaData = ((CassandraPlatform) targetPlatform).getMetaData(); + this.session = ((CassandraPlatform) targetPlatform).getSession(); + } + + @Override + protected void prepare() { + if (isSymmetricTable(this.targetTable != null ? this.targetTable.getName() : "")) { + super.prepare(); + } else { + pstmt = session.prepare(currentDmlStatement.getSql()); + } + } + + @Override + protected void prepare(String sql, CsvData data) { + if (isSymmetricTable(this.targetTable != null ? this.targetTable.getName() : "") && !data.getDataEventType().equals(DataEventType.SQL)) { + super.prepare(sql, data); + } else { + pstmt = session.prepare(sql); + } + } + + @Override + public int prepareAndExecute(String sql) { + return session.execute(sql).wasApplied() ? 1 : 0; + } + + @Override + protected int execute(CsvData data, String[] values) { + if (isSymmetricTable(this.targetTable != null ? this.targetTable.getName() : "")) { + return super.execute(data, values); + } + BoundStatement bstmt = pstmt.bind(); + currentDmlValues = getPlatform().getObjectValues(batch.getBinaryEncoding(), values, + currentDmlStatement.getMetaData(), false, writerSettings.isFitToColumn()); + if (log.isDebugEnabled()) { + log.debug("Submitting data [{}] with types [{}]", + dmlValuesToString(currentDmlValues, this.currentDmlStatement.getTypes()), + TypeMap.getJdbcTypeDescriptions(this.currentDmlStatement.getTypes())); + } + + bindVariables(bstmt, this.currentDmlStatement.getColumns(), this.currentDmlStatement.getTypes(), values); + return session.execute(bstmt).wasApplied() ? 1 : 0; + } + + @Override + protected Table lookupTableAtTarget(Table sourceTable) { + if (sourceTable != null && isSymmetricTable(sourceTable.getName())) { + return super.lookupTableAtTarget(sourceTable); + } + String keyspace = sourceTable.getCatalog() == null ? sourceTable.getSchema() : sourceTable.getCatalog(); + Map tables = metaData.get(keyspace); + Table returnTable = tables == null ? sourceTable : tables.get(sourceTable.getName()); + + // ADD target table param is missing do not error + if (returnTable == null) { + throw new RuntimeException("Unable to find Cassandra target table " + sourceTable.getName() + " in keyspace " + keyspace); + } + return returnTable; + } + + @Override + protected boolean create(CsvData data) { + return false; + } + + @Override + protected void logFailureDetails(Throwable e, CsvData data, boolean logLastDmlDetails) { + } + + @Override + protected void allowInsertIntoAutoIncrementColumns(boolean value, Table table) { + } + + protected void bindVariables(BoundStatement bstmt, Column[] columns, int[] types, String[] values) { + // TODO data time mappings + + int i = 0; + for (int type : types) { + if (Types.INTEGER == type) { + bstmt.setInt(i, Integer.parseInt(values[i])); + } else if (Types.VARCHAR == type) { + bstmt.setString(i, values[i]); + } else if (Types.BOOLEAN == type) { + bstmt.setBool(i, Boolean.parseBoolean(values[i])); + } else if (Types.DECIMAL == type) { + bstmt.setDecimal(i, new BigDecimal(values[i])); + } else if (Types.DOUBLE == type) { + bstmt.setDouble(i, Double.parseDouble(values[i])); + } else if (Types.FLOAT == type) { + bstmt.setFloat(i, Float.parseFloat(values[i])); + } else if (Types.STRUCT == type) { + bstmt.setList(i, parseList(columns[i], values[i])); + } else if (Types.REF == type) { + bstmt.setSet(i, parseSet(columns[i], values[i])); + } else if (Types.OTHER == type) { + bstmt.setMap(i, parseMap(columns[i], values[i])); + } + + i++; + } + } + + protected List parseList(Column c, String val) { + try { + if (c.getDescription() != null) { + if (c.getDescription().toLowerCase().equals("text") || c.getDescription().toLowerCase().equals("varchar")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("int")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("bigint")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("smallint")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("tinyint")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("double")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("decimal")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("float")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } + } + return new ObjectMapper().readValue(val, new TypeReference>(){}); + + } catch (Exception e) { + throw new RuntimeException("Unable to convert value to list, value=" + val,e); + } + } + + protected Set parseSet(Column c, String val) { + try { + if (c.getDescription() != null) { + if (c.getDescription().toLowerCase().equals("text") || c.getDescription().toLowerCase().equals("varchar")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("int")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("bigint")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("smallint")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("tinyint")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("double")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("decimal")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if (c.getDescription().toLowerCase().equals("float")) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } + } + return new ObjectMapper().readValue(val, new TypeReference>(){}); + + } catch (Exception e) { + throw new RuntimeException("Unable to convert value to set, value=" + val,e); + } + } + + protected Map parseMap(Column c, String val) { + try { + if (c.getDescription() != null) { + // TODO find dynamic way to create map types based on column types + String[] parts = c.getDescription().split(","); + if (parts[0].equals(DataType.Name.INT.name()) && + (parts[1].equals(DataType.Name.TEXT.name()) || parts[1].equals(DataType.Name.VARCHAR.name()))) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } else if ((parts[0].equals(DataType.Name.TEXT.name()) || parts[0].equals(DataType.Name.VARCHAR.name())) && + (parts[1].equals(DataType.Name.TEXT.name()) || parts[1].equals(DataType.Name.VARCHAR.name()))) { + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } + } + return new ObjectMapper().readValue(val, new TypeReference>(){}); + } catch (Exception e) { + throw new RuntimeException("Unable to convert value to map, expecting JSON, value=" + val,e); + } + } +} diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriter.java index 68b77f645f..17bc6d2198 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriter.java @@ -194,7 +194,7 @@ protected LoadStatus insert(CsvData data) { if (log.isDebugEnabled()) { log.debug("Preparing dml: " + this.currentDmlStatement.getSql()); } - getTransaction().prepare(this.currentDmlStatement.getSql()); + prepare(); } try { Conflict conflict = writerSettings.pickConflict(this.targetTable, batch); @@ -317,7 +317,7 @@ protected LoadStatus delete(CsvData data, boolean useConflictDetection) { if (log.isDebugEnabled()) { log.debug("Preparing dml: " + this.currentDmlStatement.getSql()); } - getTransaction().prepare(this.currentDmlStatement.getSql()); + prepare(); } try { lookupDataMap = lookupDataMap == null ? getLookupDataMap(data, conflict) : lookupDataMap; @@ -473,8 +473,7 @@ protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useC if (log.isDebugEnabled()) { log.debug("Preparing dml: " + this.currentDmlStatement.getSql()); } - getTransaction().prepare(this.currentDmlStatement.getSql()); - + prepare(); } rowData = (String[]) changedColumnValueList @@ -566,11 +565,11 @@ protected boolean sql(CsvData data) { for (String sql : sqlStatements) { try { sql = preprocessSqlStatement(sql); - getTransaction().prepare(sql); + prepare(sql, data); if (log.isDebugEnabled()) { log.debug("About to run: {}", sql); } - count += getTransaction().prepareAndExecute(sql); + count += prepareAndExecute(sql); if (log.isDebugEnabled()) { log.debug("{} rows updated when running: {}", count, sql); } @@ -878,6 +877,14 @@ protected boolean doesColumnNeedUpdated(int targetColumnIndex, Column column, Cs return needsUpdated; } + protected void prepare() { + getTransaction().prepare(this.currentDmlStatement.getSql()); + } + + protected void prepare(String sql, CsvData data) { + getTransaction().prepare(sql); + } + protected int execute(CsvData data, String[] values) { currentDmlValues = getPlatform().getObjectValues(batch.getBinaryEncoding(), values, currentDmlStatement.getMetaData(), false, writerSettings.isFitToColumn()); @@ -936,6 +943,10 @@ public DatabaseWriterSettings getWriterSettings() { return writerSettings; } + public int prepareAndExecute(String sql) { + return getTransaction().prepareAndExecute(sql); + } + protected String getCurData(ISqlTransaction transaction) { String curVal = null; if (writerSettings.isSaveCurrentValueOnError()) { diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DynamicDefaultDatabaseWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DynamicDefaultDatabaseWriter.java index d489c22e2a..dfc7d01c99 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DynamicDefaultDatabaseWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DynamicDefaultDatabaseWriter.java @@ -35,8 +35,8 @@ public DynamicDefaultDatabaseWriter(IDatabasePlatform symmetricPlatform, IDataba this.targetPlatform = targetPlatform; } - private boolean isSymmetricTable(String tableName) { - return tableName.startsWith(this.tablePrefix); + protected boolean isSymmetricTable(String tableName) { + return tableName.toUpperCase().startsWith(this.tablePrefix.toUpperCase()); } public boolean isLoadOnly() {