From ae87fc60d84a1a69a6b80e0939dc03c4af608647 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Wed, 20 Jun 2018 09:30:43 -0400 Subject: [PATCH] 0003609: Kafka support as a load only node --- .../symmetric/ClientSymmetricEngine.java | 24 +- .../symmetric/common/ParameterConstants.java | 8 + .../load/DefaultDataLoaderFactory.java | 27 +- .../symmetric/load/KafkaWriterFilter.java | 261 ++++++++++++++++++ .../db/platform/DatabaseNamesConstants.java | 1 + .../cassandra/CassandraSqlTemplate.java | 182 +----------- .../db/platform/kafka/KafkaDdlBuilder.java | 12 + .../db/platform/kafka/KafkaDdlReader.java | 63 +++++ .../db/platform/kafka/KafkaPlatform.java | 51 ++++ .../db/platform/kafka/KafkaSqlTemplate.java | 12 + .../db/sql/AbstractJavaDriverSqlTemplate.java | 151 ++++++++++ .../symmetric/io/data/writer/KafkaWriter.java | 29 ++ .../symmetric/web/SymmetricEngineHolder.java | 4 +- 13 files changed, 637 insertions(+), 188 deletions(-) create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java create mode 100644 symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaDdlBuilder.java create mode 100644 symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaDdlReader.java create mode 100644 symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaPlatform.java create mode 100644 symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaSqlTemplate.java create mode 100644 symmetric-db/src/main/java/org/jumpmind/db/sql/AbstractJavaDriverSqlTemplate.java create mode 100644 symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java index 2d075e3adf..25b2b6f057 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java +++ 
b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java @@ -47,6 +47,7 @@ import org.jumpmind.db.platform.JdbcDatabasePlatformFactory; import org.jumpmind.db.platform.cassandra.CassandraPlatform; import org.jumpmind.db.platform.generic.GenericJdbcDatabasePlatform; +import org.jumpmind.db.platform.kafka.KafkaPlatform; import org.jumpmind.db.sql.JdbcSqlTemplate; import org.jumpmind.db.sql.LogSqlBuilder; import org.jumpmind.db.sql.SqlTemplateSettings; @@ -64,6 +65,7 @@ import org.jumpmind.symmetric.io.stage.IStagingManager; import org.jumpmind.symmetric.job.IJobManager; import org.jumpmind.symmetric.job.JobManager; +import org.jumpmind.symmetric.load.KafkaWriterFilter; import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IExtensionService; @@ -96,8 +98,6 @@ public class ClientSymmetricEngine extends AbstractSymmetricEngine { public static final String PROPERTIES_FACTORY_CLASS_NAME = "properties.factory.class.name"; - public static final String LOAD_ONLY_PROPERTY_PREFIX = "target."; - protected File propertiesFile; protected Properties properties; @@ -314,8 +314,11 @@ public static IDatabasePlatform createDatabasePlatform(ApplicationContext spring if (dataSource == null) { if (isLoadOnly) { String dbUrl = properties.get(BasicDataSourcePropertyConstants.DB_POOL_URL); + String dbDriver = properties.get(BasicDataSourcePropertyConstants.DB_POOL_DRIVER); if (dbUrl != null && dbUrl.startsWith("cassandra://")) { return new CassandraPlatform(createSqlTemplateSettings(properties), dbUrl.substring(12)); + } else if (dbDriver != null && dbDriver.contains("kafka")) { + return new KafkaPlatform(createSqlTemplateSettings(properties)); } } String jndiName = properties.getProperty(ParameterConstants.DB_JNDI_NAME); @@ -498,7 +501,7 @@ protected void checkLoadOnly() { TypedProperties properties = new TypedProperties(); for (String prop : 
BasicDataSourcePropertyConstants.allProps ) { - properties.put(prop, parameterService.getString(LOAD_ONLY_PROPERTY_PREFIX + prop)); + properties.put(prop, parameterService.getString(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + prop)); } String[] sqlTemplateProperties = new String[] { @@ -512,7 +515,18 @@ protected void checkLoadOnly() { ParameterConstants.LOG_SQL_PARAMETERS_INLINE }; for (String prop : sqlTemplateProperties) { - properties.put(prop, parameterService.getString(LOAD_ONLY_PROPERTY_PREFIX + prop)); + properties.put(prop, parameterService.getString(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + prop)); + } + + String[] kafkaProperties = new String[] { + ParameterConstants.KAFKA_PRODUCER, + ParameterConstants.KAFKA_MESSAGE_BY, + ParameterConstants.KAFKA_TOPIC_BY, + ParameterConstants.KAFKA_FORMAT + }; + + for (String prop : kafkaProperties) { + properties.put(prop, parameterService.getString(prop)); } IDatabasePlatform targetPlatform = createDatabasePlatform(null, properties, null, true, true); @@ -532,7 +546,7 @@ protected void checkLoadOnly() { } ((GenericJdbcDatabasePlatform) targetPlatform).setName(name); } - targetPlatform.getDatabaseInfo().setNotNullColumnsSupported(parameterService.is(LOAD_ONLY_PROPERTY_PREFIX + ParameterConstants.CREATE_TABLE_NOT_NULL_COLUMNS, true)); + targetPlatform.getDatabaseInfo().setNotNullColumnsSupported(parameterService.is(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + ParameterConstants.CREATE_TABLE_NOT_NULL_COLUMNS, true)); } getSymmetricDialect().setTargetPlatform(targetPlatform); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index 1a03333e08..d1688ed5b2 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -439,6 +439,14 @@ private 
ParameterConstants() { public final static String MYSQL_TINYINT_DDL_TO_BOOLEAN = "mysql.tinyint.ddl.to.boolean"; + public static final String LOAD_ONLY_PROPERTY_PREFIX = "target."; + + public final static String KAFKA_PRODUCER = "kafka.producer"; + public final static String KAFKA_FORMAT = "kafka.format"; + public final static String KAFKA_MESSAGE_BY = "kafka.message.by"; + public final static String KAFKA_TOPIC_BY = "kafka.topic.by"; + + public static Map getParameterMetaData() { return parameterMetaData; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java index ce5ec1334f..c05aea9bbe 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java @@ -20,6 +20,7 @@ */ package org.jumpmind.symmetric.load; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +40,7 @@ import org.jumpmind.symmetric.io.data.writer.DynamicDefaultDatabaseWriter; import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler; import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter; +import org.jumpmind.symmetric.io.data.writer.KafkaWriter; import org.jumpmind.symmetric.io.data.writer.ResolvedData; import org.jumpmind.symmetric.io.data.writer.TransformWriter; import org.jumpmind.symmetric.service.IParameterService; @@ -80,7 +82,30 @@ public IDataWriter getDataWriter(final String sourceNodeId, } catch (Exception e) { log.warn( - "Failed to create the cassandra database writer. Check to see if all of the required jars have been added"); + "Failed to create the cassandra database writer. 
Check to see if all of the required jars have been added", e); + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new RuntimeException(e); + } + } + } + if (symmetricDialect.getTargetPlatform().getClass().getSimpleName().equals("KafkaPlatform")) { + try { + if (filters == null) { + filters = new ArrayList(); + } + filters.add(new KafkaWriterFilter(this.parameterService)); + + return new KafkaWriter(symmetricDialect.getPlatform(), + symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix(), + new DefaultTransformWriterConflictResolver(transformWriter), + buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, + resolvedData)); + + } catch (Exception e) { + log.warn( + "Failed to create the kafka writer.", e); if (e instanceof RuntimeException) { throw (RuntimeException) e; } else { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java new file mode 100644 index 0000000000..a73abbcaf3 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java @@ -0,0 +1,261 @@ +package org.jumpmind.symmetric.load; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.jumpmind.db.model.Table; +import org.jumpmind.db.util.BasicDataSourcePropertyConstants; +import 
org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.io.data.CsvData; +import org.jumpmind.symmetric.io.data.DataContext; +import org.jumpmind.symmetric.io.data.DataEventType; +import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter; +import org.jumpmind.symmetric.service.IParameterService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaWriterFilter implements IDatabaseWriterFilter { + protected final String KAFKA_TEXT_CACHE = "KAFKA_TEXT_CACHE" + this.hashCode(); + + protected Map> kafkaDataMap = new HashMap>(); + protected String kafkaDataKey; + + private final Logger log = LoggerFactory.getLogger(IDatabaseWriterFilter.class); + + private String url; + + private String producer; + + private String outputFormat; + + private String topicBy; + + private String messageBy; + + public final static String KAFKA_FORMAT_XML = "XML"; + public final static String KAFKA_FORMAT_JSON = "JSON"; + public final static String KAFKA_FORMAT_AVRO = "AVRO"; + public final static String KAFKA_FORMAT_CSV = "CSV"; + + public final static String KAFKA_MESSAGE_BY_BATCH = "BATCH"; + public final static String KAFKA_MESSAGE_BY_ROW = "ROW"; + + public final static String KAFKA_TOPIC_BY_TABLE = "TABLE"; + public final static String KAFKA_TOPIC_BY_CHANNEL = "CHANNEL"; + + public final static String AVRO_CDC_SCHEMA = "{" + + "\"type\":\"record\"," + + "\"name\":\"cdc\"," + + "\"fields\":[" + + " { \"name\":\"table\", \"type\":\"string\" }," + + " { \"name\":\"eventType\", \"type\":\"string\" }," + + " { \"name\":\"data\", \"type\":{" + + " \"type\":\"array\", \"items\":{" + + " \"name\":\"column\"," + + " \"type\":\"record\"," + + " \"fields\":[" + + " {\"name\":\"name\", \"type\":\"string\"}," + + " {\"name\":\"value\", \"type\":\"string\"} ] }}}]}"; + + + + Schema.Parser parser = new Schema.Parser(); + Schema schema = null; + + public KafkaWriterFilter(IParameterService parameterService) { + log.info(AVRO_CDC_SCHEMA); + 
schema = parser.parse(AVRO_CDC_SCHEMA); + this.url = parameterService.getString(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + BasicDataSourcePropertyConstants.DB_POOL_URL); + if (url == null) { + throw new RuntimeException( + "Kafka not configured properly, verify you have set the endpoint to kafka with the following property : " + + ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + BasicDataSourcePropertyConstants.DB_POOL_URL); + } + + this.producer = parameterService.getString(ParameterConstants.KAFKA_PRODUCER, "SymmetricDS"); + this.outputFormat = parameterService.getString(ParameterConstants.KAFKA_FORMAT, KAFKA_FORMAT_JSON); + this.topicBy = parameterService.getString(ParameterConstants.KAFKA_TOPIC_BY, KAFKA_TOPIC_BY_CHANNEL); + this.messageBy = parameterService.getString(ParameterConstants.KAFKA_MESSAGE_BY, KAFKA_MESSAGE_BY_BATCH); + } + + public boolean beforeWrite(DataContext context, Table table, CsvData data) { + if (table.getNameLowerCase().startsWith("sym_")) { + return true; + } else { + log.debug("Processing table " + table + " for Kafka"); + + String[] rowData = data.getParsedData(CsvData.ROW_DATA); + if (data.getDataEventType() == DataEventType.DELETE) { + rowData = data.getParsedData(CsvData.OLD_DATA); + } + + StringBuffer kafkaText = new StringBuffer(); + + if (topicBy.equals(KAFKA_TOPIC_BY_CHANNEL)) { + kafkaDataKey = context.getBatch().getChannelId(); + } else { + kafkaDataKey = table.getNameLowerCase(); + } + + if (kafkaDataMap.get(kafkaDataKey) == null) { + kafkaDataMap.put(kafkaDataKey, new ArrayList()); + } + List kafkaDataList = kafkaDataMap.get(kafkaDataKey); + + + if (outputFormat.equals(KAFKA_FORMAT_JSON)) { + kafkaText.append("{\"").append(table.getName()).append("\": {") + .append("\"eventType\": \"" + data.getDataEventType() + "\",").append("\"data\": { "); + for (int i = 0; i < table.getColumnNames().length; i++) { + kafkaText.append("\"" + table.getColumnNames()[i] + "\": \"" + rowData[i]); + if (i + 1 < table.getColumnNames().length)
{ + kafkaText.append("\","); + } + } + kafkaText.append(" } } }"); + } else if (outputFormat.equals(KAFKA_FORMAT_CSV)) { + kafkaText.append("\nTABLE").append(",").append(table.getName()).append(",").append("EVENT").append(",") + .append(data.getDataEventType()).append(","); + + for (int i = 0; i < table.getColumnNames().length; i++) { + kafkaText.append(table.getColumnNames()[i]).append(",").append(rowData[i]); + if (i + 1 < table.getColumnNames().length) { + kafkaText.append(","); + } + } + } else if (outputFormat.equals(KAFKA_FORMAT_XML)) { + kafkaText.append(""); + for (int i = 0; i < table.getColumnNames().length; i++) { + kafkaText.append("") + .append(rowData[i]) + .append(""); + } + kafkaText.append(""); + } else if (outputFormat.equals(KAFKA_FORMAT_AVRO)) { + + GenericData.Record avroRecord = new GenericData.Record(schema); + avroRecord.put("table", table.getName()); + avroRecord.put("eventType", data.getDataEventType().toString()); + + Collection dataCollection = new ArrayList(); + + for (int i = 0; i < table.getColumnNames().length; i++) { + + GenericRecord columnRecord = new GenericData.Record(schema.getField("data").schema().getElementType()); + + columnRecord.put("name", table.getColumnNames()[i]); + columnRecord.put("value", rowData[i]); + + dataCollection.add(columnRecord); + } + avroRecord.put("data", dataCollection); + try { + kafkaText.append(datumToByteArray(schema, avroRecord)); + } catch (IOException ioe) { + throw new RuntimeException("Unable to convert row data to an Avro record", ioe); + } + } + kafkaDataList.add(kafkaText.toString()); + } + return false; + } + + public void afterWrite(DataContext context, Table table, CsvData data) { + } + + public boolean handlesMissingTable(DataContext context, Table table) { + return true; + } + + public void earlyCommit(DataContext context) { + } + + public void batchComplete(DataContext context) { + if (!context.getBatch().getChannelId().equals("heartbeat") + && 
!context.getBatch().getChannelId().equals("config")) { + String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + + context.getBatch().getBatchId(); + log.info("Processing batch " + batchFileName + " for Kafka"); + try { + if (kafkaDataMap.size() > 0) { + StringBuffer kafkaText = new StringBuffer(); + for (Map.Entry> entry : kafkaDataMap.entrySet()) { + for (String row : entry.getValue()) { + if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) { + sendKafkaMessage(row, entry.getKey()); + } else { + kafkaText.append(row); + } + } + if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) { + sendKafkaMessage(kafkaText.toString(), entry.getKey()); + } + } + kafkaDataMap = new HashMap>(); + } else { + log.info("No text found to write to kafka"); + } + } catch (Exception e) { + log.warn("Unable to write batch to Kafka " + batchFileName, e); + e.printStackTrace(); + } finally { + context.put(KAFKA_TEXT_CACHE, new HashMap>()); + } + } + } + + public void batchCommitted(DataContext context) { + } + + public void batchRolledback(DataContext context) { + } + + public void sendKafkaMessage(String kafkaText, String topic) { + Map configs = new HashMap(); + + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url); + configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + configs.put(ProducerConfig.CLIENT_ID_CONFIG, this.producer); + + KafkaProducer producer = new KafkaProducer(configs); + + producer.send(new ProducerRecord(topic, kafkaText)); + log.debug("Data to be sent to Kafka-" + kafkaText); + + producer.close(); + } + + public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException { + GenericDatumWriter writer = new GenericDatumWriter(schema); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + try { + Encoder e = 
EncoderFactory.get().binaryEncoder(os, null); + writer.write(datum, e); + e.flush(); + byte[] byteData = os.toByteArray(); + return byteData; + } finally { + os.close(); + } + } +} diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java index d2110c3593..a0aa48c7c9 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java @@ -55,5 +55,6 @@ private DatabaseNamesConstants() { public final static String RAIMA = "raima"; public final static String TERADATA = "teradata"; public final static String CASSANDRA = "cassandra"; + public final static String KAFKA = "kafka"; } diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraSqlTemplate.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraSqlTemplate.java index e91f3c1af1..f740cc67c4 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraSqlTemplate.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraSqlTemplate.java @@ -1,190 +1,12 @@ package org.jumpmind.db.platform.cassandra; -import java.util.Map; -import java.util.Set; +import org.jumpmind.db.sql.AbstractJavaDriverSqlTemplate; -import org.jumpmind.db.sql.AbstractSqlTemplate; -import org.jumpmind.db.sql.ISqlReadCursor; -import org.jumpmind.db.sql.ISqlResultsListener; -import org.jumpmind.db.sql.ISqlRowMapper; -import org.jumpmind.db.sql.ISqlStatementSource; -import org.jumpmind.db.sql.ISqlTransaction; - -public class CassandraSqlTemplate extends AbstractSqlTemplate { - - @Override - public byte[] queryForBlob(String sql, Object... args) { - // TODO Auto-generated method stub - return null; - } - - @Override - public byte[] queryForBlob(String sql, int jdbcTypeCode, String jdbcTypeName, Object... 
args) { - // TODO Auto-generated method stub - return null; - } - - @Override - public String queryForClob(String sql, Object... args) { - // TODO Auto-generated method stub - return null; - } - - @Override - public String queryForClob(String sql, int jdbcTypeCode, String jdbcTypeName, Object... args) { - // TODO Auto-generated method stub - return null; - } - - @Override - public T queryForObject(String sql, Class clazz, Object... params) { - // TODO Auto-generated method stub - return null; - } - - @Override - public Map queryForMap(String sql, Object... params) { - // TODO Auto-generated method stub - return null; - } - - @Override - public ISqlReadCursor queryForCursor(String sql, ISqlRowMapper mapper, Object[] params, int[] types) { - // TODO Auto-generated method stub - return null; - } - - @Override - public int update(boolean autoCommit, boolean failOnError, int commitRate, ISqlResultsListener listener, - String... sql) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int update(boolean autoCommit, boolean failOnError, boolean failOnDrops, boolean failOnSequenceCreate, - int commitRate, ISqlResultsListener listener, ISqlStatementSource source) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int update(boolean autoCommit, boolean failOnError, int commitRate, String... 
sql) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int update(String sql, Object[] values, int[] types) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public void testConnection() { - // TODO Auto-generated method stub - - } - - @Override - public boolean isUniqueKeyViolation(Throwable ex) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean isForeignKeyViolation(Throwable ex) { - // TODO Auto-generated method stub - return false; - } - - @Override - public ISqlTransaction startSqlTransaction() { - return new CassandraSqlTransaction(); - } - - @Override - public ISqlTransaction startSqlTransaction(boolean autoCommit) { - // TODO Auto-generated method stub - return null; - } - - @Override - public int getDatabaseMajorVersion() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int getDatabaseMinorVersion() { - // TODO Auto-generated method stub - return 0; - } +public class CassandraSqlTemplate extends AbstractJavaDriverSqlTemplate { @Override public String getDatabaseProductName() { return "cassandra"; } - @Override - public String getDatabaseProductVersion() { - // TODO Auto-generated method stub - return null; - } - - @Override - public String getDriverName() { - // TODO Auto-generated method stub - return null; - } - - @Override - public String getDriverVersion() { - // TODO Auto-generated method stub - return null; - } - - @Override - public Set getSqlKeywords() { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean supportsGetGeneratedKeys() { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean isStoresUpperCaseIdentifiers() { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean isStoresLowerCaseIdentifiers() { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean isStoresMixedCaseQuotedIdentifiers() { - 
// TODO Auto-generated method stub - return false; - } - - @Override - public long insertWithGeneratedKey(String sql, String column, String sequenceName, Object[] args, int[] types) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public boolean isDataTruncationViolation(Throwable ex) { - // TODO Auto-generated method stub - return false; - } - - } diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaDdlBuilder.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaDdlBuilder.java new file mode 100644 index 0000000000..d42b7c8c50 --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaDdlBuilder.java @@ -0,0 +1,12 @@ +package org.jumpmind.db.platform.kafka; + +import org.jumpmind.db.platform.AbstractDdlBuilder; +import org.jumpmind.db.platform.DatabaseNamesConstants; + +public class KafkaDdlBuilder extends AbstractDdlBuilder { + + public KafkaDdlBuilder() { + super(DatabaseNamesConstants.KAFKA); + } + +} diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaDdlReader.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaDdlReader.java new file mode 100644 index 0000000000..b9d8dce58e --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaDdlReader.java @@ -0,0 +1,63 @@ +package org.jumpmind.db.platform.kafka; + +import java.util.List; + +import org.jumpmind.db.model.Database; +import org.jumpmind.db.model.Table; +import org.jumpmind.db.model.Trigger; +import org.jumpmind.db.platform.IDatabasePlatform; +import org.jumpmind.db.platform.IDdlReader; + +public class KafkaDdlReader implements IDdlReader { + protected KafkaPlatform platform; + + public KafkaDdlReader(IDatabasePlatform platform) { + this.platform = (KafkaPlatform) platform; + } + + @Override + public Database readTables(String catalog, String schema, String[] tableTypes) { + return null; + } + + @Override + public Table readTable(String catalog, String 
schema, String tableName) { + return null; + } + + @Override + public List getTableTypes() { + return null; + } + + @Override + public List getCatalogNames() { + return null; + } + + @Override + public List getSchemaNames(String catalog) { + return null; + } + + @Override + public List getTableNames(String catalog, String schema, String[] tableTypes) { + return null; + } + + @Override + public List getColumnNames(String catalog, String schema, String tableName) { + return null; + } + + @Override + public List getTriggers(String catalog, String schema, String tableName) { + return null; + } + + @Override + public Trigger getTriggerFor(Table table, String name) { + return null; + } + +} diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaPlatform.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaPlatform.java new file mode 100644 index 0000000000..6c26179c17 --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaPlatform.java @@ -0,0 +1,51 @@ +package org.jumpmind.db.platform.kafka; + +import org.jumpmind.db.platform.AbstractDatabasePlatform; +import org.jumpmind.db.sql.ISqlTemplate; +import org.jumpmind.db.sql.SqlTemplateSettings; + +public class KafkaPlatform extends AbstractDatabasePlatform { + + public KafkaPlatform(SqlTemplateSettings settings) { + super(settings); + super.ddlBuilder = new KafkaDdlBuilder(); + super.ddlReader = new KafkaDdlReader(this); + } + + @Override + public String getName() { + return "kafka"; + } + + @Override + public String getDefaultSchema() { + return null; + } + + @Override + public String getDefaultCatalog() { + return null; + } + + @Override + public T getDataSource() { + return null; + } + + @Override + public boolean isLob(int type) { + return false; + } + + @Override + public ISqlTemplate getSqlTemplate() { + return new KafkaSqlTemplate(); + } + + @Override + public ISqlTemplate getSqlTemplateDirty() { + return new KafkaSqlTemplate(); + } + + +} diff --git 
a/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaSqlTemplate.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaSqlTemplate.java new file mode 100644 index 0000000000..35cacb2499 --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaSqlTemplate.java @@ -0,0 +1,12 @@ +package org.jumpmind.db.platform.kafka; + +import org.jumpmind.db.sql.AbstractJavaDriverSqlTemplate; + +public class KafkaSqlTemplate extends AbstractJavaDriverSqlTemplate{ + + @Override + public String getDatabaseProductName() { + return "kafka"; + } + +} diff --git a/symmetric-db/src/main/java/org/jumpmind/db/sql/AbstractJavaDriverSqlTemplate.java b/symmetric-db/src/main/java/org/jumpmind/db/sql/AbstractJavaDriverSqlTemplate.java new file mode 100644 index 0000000000..83a971016d --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/sql/AbstractJavaDriverSqlTemplate.java @@ -0,0 +1,151 @@ +package org.jumpmind.db.sql; + +import java.util.Map; +import java.util.Set; + +public abstract class AbstractJavaDriverSqlTemplate extends AbstractSqlTemplate { + + public abstract String getDatabaseProductName(); + + @Override + public byte[] queryForBlob(String sql, Object... args) { + return null; + } + + @Override + public byte[] queryForBlob(String sql, int jdbcTypeCode, String jdbcTypeName, Object... args) { + return null; + } + + @Override + public String queryForClob(String sql, Object... args) { + return null; + } + + @Override + public String queryForClob(String sql, int jdbcTypeCode, String jdbcTypeName, Object... args) { + return null; + } + + @Override + public T queryForObject(String sql, Class clazz, Object... params) { + return null; + } + + @Override + public Map queryForMap(String sql, Object... 
params) { + return null; + } + + @Override + public ISqlReadCursor queryForCursor(String sql, ISqlRowMapper mapper, Object[] params, int[] types) { + return null; + } + + @Override + public int update(boolean autoCommit, boolean failOnError, int commitRate, ISqlResultsListener listener, + String... sql) { + return 0; + } + + @Override + public int update(boolean autoCommit, boolean failOnError, boolean failOnDrops, boolean failOnSequenceCreate, + int commitRate, ISqlResultsListener listener, ISqlStatementSource source) { + return 0; + } + + @Override + public int update(boolean autoCommit, boolean failOnError, int commitRate, String... sql) { + return 0; + } + + @Override + public int update(String sql, Object[] values, int[] types) { + return 0; + } + + @Override + public void testConnection() { + } + + @Override + public boolean isUniqueKeyViolation(Throwable ex) { + return false; + } + + @Override + public boolean isDataTruncationViolation(Throwable ex) { + return false; + } + + @Override + public boolean isForeignKeyViolation(Throwable ex) { + return false; + } + + @Override + public ISqlTransaction startSqlTransaction() { + return null; + } + + @Override + public ISqlTransaction startSqlTransaction(boolean autoCommit) { + return null; + } + + @Override + public int getDatabaseMajorVersion() { + return 0; + } + + @Override + public int getDatabaseMinorVersion() { + return 0; + } + + @Override + public String getDatabaseProductVersion() { + return null; + } + + @Override + public String getDriverName() { + return null; + } + + @Override + public String getDriverVersion() { + return null; + } + + @Override + public Set getSqlKeywords() { + return null; + } + + @Override + public boolean supportsGetGeneratedKeys() { + return false; + } + + @Override + public boolean isStoresUpperCaseIdentifiers() { + return false; + } + + @Override + public boolean isStoresLowerCaseIdentifiers() { + return false; + } + + @Override + public boolean 
isStoresMixedCaseQuotedIdentifiers() { + return false; + } + + @Override + public long insertWithGeneratedKey(String sql, String column, String sequenceName, Object[] args, int[] types) { + return 0; + } + +} diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java new file mode 100644 index 0000000000..51c64abc5e --- /dev/null +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java @@ -0,0 +1,29 @@ +package org.jumpmind.symmetric.io.data.writer; + +import org.jumpmind.db.model.Table; +import org.jumpmind.db.platform.IDatabasePlatform; +import org.jumpmind.symmetric.io.data.CsvData; + +public class KafkaWriter extends DynamicDefaultDatabaseWriter { + + public KafkaWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform, String prefix, + IDatabaseWriterConflictResolver conflictResolver, DatabaseWriterSettings settings) { + super(symmetricPlatform, targetPlatform, prefix, conflictResolver, settings); + } + + /* + @Override + protected Table lookupTableAtTarget(Table sourceTable) { + return sourceTable; + } + + + @Override + protected void logFailureDetails(Throwable e, CsvData data, boolean logLastDmlDetails) { + } + + @Override + protected void allowInsertIntoAutoIncrementColumns(boolean value, Table table) { + } + */ +} diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/SymmetricEngineHolder.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/SymmetricEngineHolder.java index 3b709c1a9a..b81dcc4911 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/SymmetricEngineHolder.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/SymmetricEngineHolder.java @@ -325,12 +325,12 @@ public ISymmetricEngine install(Properties passedInProperties) throws Exception } } - String loadOnlyPassword = 
properties.getProperty(ClientSymmetricEngine.LOAD_ONLY_PROPERTY_PREFIX + BasicDataSourcePropertyConstants.DB_POOL_PASSWORD); + String loadOnlyPassword = properties.getProperty(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + BasicDataSourcePropertyConstants.DB_POOL_PASSWORD); if (StringUtils.isNotBlank(loadOnlyPassword) && !loadOnlyPassword.startsWith(SecurityConstants.PREFIX_ENC)) { try { ISecurityService service = SecurityServiceFactory.create(SecurityServiceType.CLIENT, properties); - properties.setProperty(ClientSymmetricEngine.LOAD_ONLY_PROPERTY_PREFIX + BasicDataSourcePropertyConstants.DB_POOL_PASSWORD, + properties.setProperty(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + BasicDataSourcePropertyConstants.DB_POOL_PASSWORD, SecurityConstants.PREFIX_ENC + service.encrypt(loadOnlyPassword)); } catch (Exception ex) { log.warn("Could not encrypt load only password", ex);