0003581: Cassandra support as a load only node
jumpmind-josh committed May 25, 2018
1 parent 9550abe commit 47042f6
Showing 14 changed files with 887 additions and 10 deletions.
1 change: 0 additions & 1 deletion symmetric-assemble/common.gradle
@@ -233,7 +233,6 @@ subprojects { subproject ->
// javax.resource needed by jaybird
provided "org.apache.geronimo.specs:geronimo-j2ee-connector_1.6_spec:1.0"
provided "com.datastax.cassandra:cassandra-driver-core:3.1.4"
provided "com.datastax.cassandra:cassandra-driver-core:3.1.4"
provided "nl.cad:tps-parse:1.0.15-SNAPSHOT"

testCompile fileTree(dir: System.getProperty("user.home") + '/.symmetricds/lib', include: '*.jar')
@@ -45,6 +45,7 @@
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.JdbcDatabasePlatformFactory;
import org.jumpmind.db.platform.cassandra.CassandraPlatform;
import org.jumpmind.db.platform.generic.GenericJdbcDatabasePlatform;
import org.jumpmind.db.sql.JdbcSqlTemplate;
import org.jumpmind.db.sql.LogSqlBuilder;
@@ -311,6 +312,12 @@ public static IDatabasePlatform createDatabasePlatform(ApplicationContext spring
DataSource dataSource, boolean waitOnAvailableDatabase, boolean isLoadOnly) {
log.info("Initializing connection to database");
if (dataSource == null) {
if (isLoadOnly) {
String dbUrl = properties.get(BasicDataSourcePropertyConstants.DB_POOL_URL);
if (dbUrl != null && dbUrl.startsWith("cassandra://")) {
return new CassandraPlatform(createSqlTemplateSettings(properties), dbUrl.substring(12));
}
}
String jndiName = properties.getProperty(ParameterConstants.DB_JNDI_NAME);
if (StringUtils.isNotBlank(jndiName)) {
try {
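For context, the routing above keys entirely off the db.url prefix: when the engine is load only and the URL starts with cassandra://, the JDBC/JNDI setup is skipped and the remainder of the URL is treated as the cluster contact point. A minimal sketch of driving the new platform directly follows; the contact point is a placeholder, and the default-constructed SqlTemplateSettings and a reachable node are assumptions, not part of this commit.

// Hypothetical sketch mirroring the cassandra:// handling above (imports as in the factory class).
String dbUrl = "cassandra://127.0.0.1";                              // placeholder load-only target
if (dbUrl != null && dbUrl.startsWith("cassandra://")) {
	String contactPoint = dbUrl.substring("cassandra://".length()); // equivalent to substring(12)
	// The constructor connects to the cluster and caches keyspace/table metadata up front.
	IDatabasePlatform platform = new CassandraPlatform(new SqlTemplateSettings(), contactPoint);
}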
@@ -31,13 +31,14 @@
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.CassandraDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.Conflict.PingBack;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultTransformWriterConflictResolver;
import org.jumpmind.symmetric.io.data.writer.DynamicDefaultDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.DynamicDefaultDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.service.IParameterService;
@@ -66,6 +67,27 @@ public IDataWriter getDataWriter(final String sourceNodeId,
TransformWriter transformWriter,
List<IDatabaseWriterFilter> filters, List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

if (symmetricDialect.getTargetPlatform().getClass().getSimpleName().equals("CassandraPlatform")) {
try {

				// TODO Evaluate whether the ConflictResolver will work for Cassandra and, if so, remove the duplicate code.
return new CassandraDatabaseWriter(symmetricDialect.getPlatform(),
symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix(),
new DefaultTransformWriterConflictResolver(transformWriter),
buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings,
resolvedData));

} catch (Exception e) {
				log.warn(
						"Failed to create the Cassandra database writer. Check that all of the required jars have been added.");
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(e);
}
}
}
DynamicDefaultDatabaseWriter writer = new DynamicDefaultDatabaseWriter(symmetricDialect.getPlatform(),
symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix(),
new DefaultTransformWriterConflictResolver(transformWriter) {
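A note on the writer selection above: the target platform is matched by simple class name rather than an instanceof check, and any failure constructing the CassandraDatabaseWriter is logged and rethrown; the warning points at missing driver jars, which common.gradle declares only as provided.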
@@ -54,5 +54,6 @@ private DatabaseNamesConstants() {
public final static String TIBERO = "tibero";
public final static String RAIMA = "raima";
public final static String TERADATA = "teradata";
public final static String CASSANDRA = "cassandra";

}
@@ -0,0 +1,16 @@
package org.jumpmind.db.platform.cassandra;

import org.jumpmind.db.model.Column;
import org.jumpmind.db.platform.DatabaseInfo;
import org.jumpmind.db.sql.DmlStatement;

public class CassandraDMLStatement extends DmlStatement {

public CassandraDMLStatement(DmlType type, String catalogName, String schemaName, String tableName,
Column[] keysColumns, Column[] columns, boolean[] nullKeyValues, DatabaseInfo databaseInfo,
boolean useQuotedIdentifiers, String textColumnExpression) {
super(type, catalogName, schemaName, tableName, keysColumns, columns, nullKeyValues, databaseInfo, useQuotedIdentifiers,
textColumnExpression);
}

}
@@ -0,0 +1,13 @@
package org.jumpmind.db.platform.cassandra;

import org.jumpmind.db.platform.AbstractDdlBuilder;
import org.jumpmind.db.platform.DatabaseNamesConstants;

public class CassandraDdlBuilder extends AbstractDdlBuilder {

public CassandraDdlBuilder() {
super(DatabaseNamesConstants.CASSANDRA);
}


}
@@ -0,0 +1,74 @@
package org.jumpmind.db.platform.cassandra;

import java.util.List;
import java.util.Map;

import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.model.Trigger;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.IDdlReader;

public class CassandraDdlReader implements IDdlReader {
protected CassandraPlatform platform;

public CassandraDdlReader(IDatabasePlatform platform) {
this.platform = (CassandraPlatform) platform;
}

@Override
public Database readTables(String catalog, String schema, String[] tableTypes) {
// TODO Auto-generated method stub
return null;
}

@Override
public Table readTable(String catalog, String schema, String tableName) {
		// Tables are served from the metadata cache built by CassandraPlatform at startup;
		// Cassandra has no catalogs, so the keyspace arrives as the schema when catalog is null.
		Map<String, Table> tables = platform.getMetaData()
				.get(catalog == null ? schema : catalog);
		return tables.get(tableName.toLowerCase());
}

@Override
public List<String> getTableTypes() {
// TODO Auto-generated method stub
return null;
}

@Override
public List<String> getCatalogNames() {
// TODO Auto-generated method stub
return null;
}

@Override
public List<String> getSchemaNames(String catalog) {
// TODO Auto-generated method stub
return null;
}

@Override
public List<String> getTableNames(String catalog, String schema, String[] tableTypes) {
// TODO Auto-generated method stub
return null;
}

@Override
public List<String> getColumnNames(String catalog, String schema, String tableName) {
// TODO Auto-generated method stub
return null;
}

@Override
public List<Trigger> getTriggers(String catalog, String schema, String tableName) {
// TODO Auto-generated method stub
return null;
}

@Override
public Trigger getTriggerFor(Table table, String name) {
// TODO Auto-generated method stub
return null;
}

}
@@ -0,0 +1,179 @@
package org.jumpmind.db.platform.cassandra;

import java.sql.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.AbstractDatabasePlatform;
import org.jumpmind.db.platform.IDdlBuilder;
import org.jumpmind.db.platform.IDdlReader;
import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.SqlTemplateSettings;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;

public class CassandraPlatform extends AbstractDatabasePlatform {

Map<String, Map<String, Table>> metaData = new HashMap<String, Map<String, Table>>();

protected Session session;

protected Cluster cluster;

public CassandraPlatform(SqlTemplateSettings settings, String contactPoint) {
super(settings);

cluster = Cluster.builder().addContactPoint(contactPoint).build();
this.session = cluster.connect();

buildMetaData();
}

@Override
public String getName() {
return "cassandra";
}

@Override
public String getDefaultSchema() {
return null;
}

@Override
public String getDefaultCatalog() {
return null;
}

@Override
public <T> T getDataSource() {
return null;
}

@Override
public boolean isLob(int type) {
return false;
}

@Override
public IDdlBuilder getDdlBuilder() {
return new CassandraDdlBuilder();
}

@Override
public IDdlReader getDdlReader() {
return new CassandraDdlReader(this);
}

@Override
public ISqlTemplate getSqlTemplate() {
return new CassandraSqlTemplate();
}

@Override
public ISqlTemplate getSqlTemplateDirty() {
return new CassandraSqlTemplate();
}

public Session getSession() {
return session;
}

public void setSession(Session session) {
this.session = session;
}


public Map<String, Map<String, Table>> getMetaData() {
return metaData;
}

public void setMetaData(Map<String, Map<String, Table>> metaData) {
this.metaData = metaData;
}

	protected void buildMetaData() {
		// Cache table metadata for every keyspace up front so the DDL reader can resolve tables without extra round trips.
		for (KeyspaceMetadata keyspaceMeta : cluster.getMetadata().getKeyspaces()) {
			metaData.put(keyspaceMeta.getName(), new HashMap<String, Table>());
			for (TableMetadata tableMeta : keyspaceMeta.getTables()) {
				Table table = new Table();
				table.setName(tableMeta.getName());
				table.setSchema(keyspaceMeta.getName());
				List<ColumnMetadata> pkColumns = tableMeta.getPrimaryKey();

				for (ColumnMetadata columnMeta : tableMeta.getColumns()) {
					Column column = new Column();
					column.setName(columnMeta.getName());
					column.setMappedTypeCode(getMappedTypeCode(columnMeta.getType().getName().name()));
					if (columnMeta.getType().getTypeArguments() != null) {
						// Record the element types of collection columns (list, set, map) in the description.
						StringBuilder types = new StringBuilder();
						for (DataType dt : columnMeta.getType().getTypeArguments()) {
							if (types.length() > 0) {
								types.append(",");
							}
							types.append(dt.getName().name());
						}
						column.setDescription(types.toString());
					}
					if (pkColumns.contains(columnMeta)) {
						column.setPrimaryKey(true);
					}
					table.addColumn(column);
				}
				metaData.get(keyspaceMeta.getName()).put(table.getName(), table);
			}
		}
	}

	protected int getMappedTypeCode(String dataType) {
		/*
		 * Unsupported types: ASCII(1), BLOB(3), COUNTER(5), INET(16), VARINT(14),
		 * TIMEUUID(15), CUSTOM(0), UDT(48, ProtocolVersion.V3), TUPLE(49, ProtocolVersion.V3)
		 */
		if (dataType.equals(DataType.Name.INT.name())) {
			return Types.INTEGER;
		} else if (dataType.equals(DataType.Name.BIGINT.name())) {
			return Types.BIGINT;
		} else if (dataType.equals(DataType.Name.SMALLINT.name())) {
			return Types.SMALLINT;
		} else if (dataType.equals(DataType.Name.TINYINT.name())) {
			return Types.TINYINT;
		} else if (dataType.equals(DataType.Name.BOOLEAN.name())) {
			return Types.BOOLEAN;
		} else if (dataType.equals(DataType.Name.DECIMAL.name())) {
			return Types.DECIMAL;
		} else if (dataType.equals(DataType.Name.DOUBLE.name())) {
			return Types.DOUBLE;
		} else if (dataType.equals(DataType.Name.FLOAT.name())) {
			return Types.FLOAT;
		} else if (dataType.equals(DataType.Name.TIMESTAMP.name())) {
			return Types.TIMESTAMP;
		} else if (dataType.equals(DataType.Name.DATE.name())) {
			return Types.DATE;
		} else if (dataType.equals(DataType.Name.TIME.name())) {
			return Types.TIME;
		} else if (dataType.equals(DataType.Name.VARCHAR.name()) || dataType.equals(DataType.Name.TEXT.name())
				|| dataType.equals(DataType.Name.UUID.name())) {
			return Types.VARCHAR;
		} else if (dataType.equals(DataType.Name.LIST.name())) {
			return Types.STRUCT;
		} else if (dataType.equals(DataType.Name.SET.name())) {
			return Types.REF;
		} else if (dataType.equals(DataType.Name.MAP.name())) {
			return Types.OTHER;
		}
		// Anything unrecognized falls back to VARCHAR.
		return Types.VARCHAR;
	}

}
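As a rough usage sketch of the platform above: the metadata cached by buildMetaData() can be read back through the DDL reader, and each column carries the JDBC type code produced by getMappedTypeCode(). The contact point, keyspace, and table names are placeholders, and a reachable cluster, a default-constructed SqlTemplateSettings, and the usual getters on Table and Column are assumed; none of this is part of the commit.

// Hypothetical sketch; placeholder names, not part of the commit.
// Assumed imports: org.jumpmind.db.model.Column, org.jumpmind.db.model.Table,
// org.jumpmind.db.sql.SqlTemplateSettings, org.jumpmind.db.platform.cassandra.CassandraPlatform.
CassandraPlatform platform = new CassandraPlatform(new SqlTemplateSettings(), "127.0.0.1");
// Cassandra has no catalogs, so the keyspace is passed as the schema; table names are cached lower case.
Table table = platform.getDdlReader().readTable(null, "my_keyspace", "my_table");
for (Column column : table.getColumns()) {
	// Collection element types (list/set/map) were recorded in the column description.
	System.out.println(column.getName() + " -> " + column.getMappedTypeCode()
			+ (column.getDescription() != null ? " <" + column.getDescription() + ">" : ""));
}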
