From 95b0c9ff4dc9cb1fa1883de2fce4a22a8210cb86 Mon Sep 17 00:00:00 2001 From: maxwellpettit Date: Wed, 1 Nov 2017 11:16:19 -0400 Subject: [PATCH] Build repeatable stress test framework on AWS and run for 3.8 and 3.9. --- ....java => StressTestHeartbeatListener.java} | 259 ++++++++++-------- 1 file changed, 150 insertions(+), 109 deletions(-) rename symmetric-core/src/test/java/org/jumpmind/symmetric/ext/{LoadTestHeartbeatListener.java => StressTestHeartbeatListener.java} (63%) diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/ext/LoadTestHeartbeatListener.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/ext/StressTestHeartbeatListener.java similarity index 63% rename from symmetric-core/src/test/java/org/jumpmind/symmetric/ext/LoadTestHeartbeatListener.java rename to symmetric-core/src/test/java/org/jumpmind/symmetric/ext/StressTestHeartbeatListener.java index 1c4b5fd496..0904448a91 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/ext/LoadTestHeartbeatListener.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/ext/StressTestHeartbeatListener.java @@ -1,3 +1,5 @@ +package org.jumpmind.symmetric.ext; + /** * Licensed to JumpMind Inc under one or more contributor * license agreements. See the NOTICE file distributed @@ -18,7 +20,6 @@ * specific language governing permissions and limitations * under the License. */ -package org.jumpmind.symmetric.ext; import java.util.ArrayList; import java.util.HashMap; @@ -26,6 +27,8 @@ import java.util.Map; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.RandomStringUtils; + import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Table; import org.jumpmind.db.sql.Row; @@ -34,6 +37,7 @@ import org.jumpmind.symmetric.io.data.transform.TransformColumn; import org.jumpmind.symmetric.io.data.transform.TransformColumn.IncludeOnType; import org.jumpmind.symmetric.io.data.transform.TransformPoint; +import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeGroupLinkAction; import org.jumpmind.symmetric.model.Router; @@ -42,7 +46,7 @@ import org.jumpmind.symmetric.service.impl.TransformService.TransformTableNodeGroupLink; import org.jumpmind.util.AppUtils; -public class LoadTestHeartbeatListener +public class StressTestHeartbeatListener implements org.jumpmind.symmetric.ext.IHeartbeatListener, org.jumpmind.symmetric.ext.ISymmetricEngineAware { private static final String SERVER_NODE_GROUP = "server"; @@ -53,35 +57,31 @@ public class LoadTestHeartbeatListener private static final String CLIENT_TO_SERVER = "client_to_server"; - private static final String SYNC_CHANNEL = "default"; + private static final String CONTROL_CHANNEL = "stress_test_control"; + + private static final String SYNC_CHANNEL = "stress_test_sync"; - private static final String LOAD_TEST_CONTROL = "LOAD_TEST_CONTROL"; + private static final String STRESS_TEST_CONTROL = "STRESS_TEST_CONTROL"; - private static final String LOAD_TEST_STATUS = "LOAD_TEST_STATUS"; + private static final String STRESS_TEST_STATUS = "STRESS_TEST_STATUS"; - private static final String LOAD_TEST_ROW_OUTGOING = "LOAD_TEST_ROW_OUTGOING"; + private static final String STRESS_TEST_ROW_OUTGOING = "STRESS_TEST_ROW_OUTGOING"; - private static final String LOAD_TEST_ROW_INCOMING = "LOAD_TEST_ROW_INCOMING"; + private static final String STRESS_TEST_ROW_INCOMING = "STRESS_TEST_ROW_INCOMING"; - private String selectControlSql = "select * from " + LOAD_TEST_CONTROL; + private String selectControlSql = "select * from " + STRESS_TEST_CONTROL; - private String selectStatusSql = "select status from " + LOAD_TEST_STATUS + " where run_id=? and node_id = ?"; + private String selectStatusSql = "select status from " + STRESS_TEST_STATUS + " where run_id=? and node_id = ?"; - private String insertStatusSql = "insert into " + LOAD_TEST_STATUS + private String insertStatusSql = "insert into " + STRESS_TEST_STATUS + " (run_id,node_id,status,start_time) values (?,?,?,current_timestamp)"; - private String updateStatusSql = "update " + LOAD_TEST_STATUS + private String updateStatusSql = "update " + STRESS_TEST_STATUS + " set status=?, end_time=current_timestamp where run_id=? and node_id = ?"; - private String insertOutgoingSql = "insert into " + LOAD_TEST_ROW_OUTGOING - + "(row_id,node_id,run_id,insert_time) values (?,?,?,current_timestamp)"; + private String dropOutgoingSql = "drop table if exists " + STRESS_TEST_ROW_OUTGOING; - private String dropOutgoingSql = "drop table if exists " + LOAD_TEST_ROW_OUTGOING; - - private String insertIncomingSql = "insert into " + LOAD_TEST_ROW_INCOMING - + "(row_id,node_id,run_id,insert_time) values (?,?,?,current_timestamp)"; - - private String dropIncomingSql = "drop table if exists " + LOAD_TEST_ROW_INCOMING; + private String dropIncomingSql = "drop table if exists " + STRESS_TEST_ROW_INCOMING; private ISymmetricEngine engine; @@ -94,52 +94,79 @@ public class LoadTestHeartbeatListener private Map outgoingCounts = new HashMap(); /** - * Creates the LOAD_TEST_CONTROL and LOAD_TEST_STATUS tables and adds + * Creates the STRESS_TEST_CONTROL and STRESS_TEST_STATUS tables and adds * triggers */ - private void initLoadTestControl() { - createLoadTestControlTable(); - addLoadTestControlTriggers(); + private void initStressTest() { + + createChannels(); - createLoadTestStatusTable(); - addLoadTestStatusTriggers(); + engine.getDatabasePlatform().getDdlBuilder().setDelimitedIdentifierModeOn(false); + + createStressTestControlTable(); + if (!engine.getTriggerRouterService().doesTriggerExistForTable(STRESS_TEST_CONTROL)) { + addStressTestControlTriggers(); + } + + createStressTestStatusTable(); + if (!engine.getTriggerRouterService().doesTriggerExistForTable(STRESS_TEST_STATUS)) { + addStressTestStatusTriggers(); + } + + engine.syncTriggers(); initialized = true; } + private void createChannels() { + if (engine.getConfigurationService().getChannel(CONTROL_CHANNEL) == null) { + Channel control = new Channel(CONTROL_CHANNEL, 100000); + engine.getConfigurationService().saveChannel(control, false); + } + + if (engine.getConfigurationService().getChannel(SYNC_CHANNEL) == null) { + Channel sync = new Channel(SYNC_CHANNEL, 100001); + engine.getConfigurationService().saveChannel(sync, false); + } + } + /** - * Creates the LOAD_TEST_ROW_OUTGOING table, seeds initial test data, + * Creates the STRESS_TEST_ROW_OUTGOING table, seeds initial test data, * configures the table, and loads the table to the client */ - private void initLoadRowOutgoing(int payloadColumns, int runId, int initialSeedSize) { - createLoadTestRowOutgoingTable(payloadColumns); - seedOutgoing(runId, initialSeedSize); - addLoadTestRowOutgoingConfig(); + private void initStressTestRowOutgoing(int payloadColumns, int runId, int initialSeedSize) { + createStressTestRowOutgoingTable(payloadColumns); + seedOutgoing(runId, initialSeedSize, payloadColumns); + addStressTestRowOutgoingConfig(); + engine.syncTriggers(); for (Node client : engine.getNodeService().findTargetNodesFor(NodeGroupLinkAction.W)) { // Drop the table using the config channel to force drop order engine.getDataService().sendSQL(client.getNodeId(), null, null, "SYM_PARAMETER", dropOutgoingSql); - engine.getDataService().sendSchema(client.getNodeId(), null, null, LOAD_TEST_ROW_OUTGOING, true); - engine.getDataService().reloadTable(client.getNodeId(), null, null, LOAD_TEST_ROW_OUTGOING); + engine.getDataService().sendSchema(client.getNodeId(), null, null, STRESS_TEST_ROW_OUTGOING, true); + if (initialSeedSize > 0) { + engine.getDataService().reloadTable(client.getNodeId(), null, null, STRESS_TEST_ROW_OUTGOING); + } } } /** - * Creates the LOAD_TEST_ROW_INCOMING table and configures the table + * Creates the STRESS_TEST_ROW_INCOMING table and configures the table */ - private void initLoadRowIncoming(int payloadColumns) { - createLoadTestRowIncomingTable(payloadColumns); - addLoadTestRowIncomingConfig(); + private void initStressTestRowIncoming(int payloadColumns) { + createStressTestRowIncomingTable(payloadColumns); + addStressTestRowIncomingConfig(); + engine.syncTriggers(); for (Node client : engine.getNodeService().findTargetNodesFor(NodeGroupLinkAction.W)) { // Drop the table using the config channel to force drop order engine.getDataService().sendSQL(client.getNodeId(), null, null, "SYM_PARAMETER", dropIncomingSql); - engine.getDataService().sendSchema(client.getNodeId(), null, null, LOAD_TEST_ROW_INCOMING, true); + engine.getDataService().sendSchema(client.getNodeId(), null, null, STRESS_TEST_ROW_INCOMING, true); } } /** - * Creates the LOAD_TEST_CONTROL table + * Creates the STRESS_TEST_CONTROL table */ - private void createLoadTestControlTable() { + private void createStressTestControlTable() { Column runId = new Column("RUN_ID"); runId.setMappedType("INTEGER"); runId.setPrimaryKey(true); @@ -159,16 +186,15 @@ private void createLoadTestControlTable() { Column duration = new Column("DURATION_MINUTES"); duration.setMappedType("BIGINT"); - Table table = new Table(LOAD_TEST_CONTROL, runId, clientCommitSleepMs, clientCommitRows, serverCommitSleepMs, serverCommitRows, + Table table = new Table(STRESS_TEST_CONTROL, runId, clientCommitSleepMs, clientCommitRows, serverCommitSleepMs, serverCommitRows, payloadColumns, initialSeedSize, duration); - engine.getDatabasePlatform().createTables(true, true, table); } /** - * Creates the LOAD_TEST_STATUS table + * Creates the STRESS_TEST_STATUS table */ - private void createLoadTestStatusTable() { + private void createStressTestStatusTable() { Column runId = new Column("RUN_ID"); runId.setMappedType("INTEGER"); runId.setPrimaryKey(true); @@ -188,17 +214,17 @@ private void createLoadTestStatusTable() { Column endTime = new Column("END_TIME"); endTime.setMappedType("TIMESTAMP"); - Table table = new Table(LOAD_TEST_STATUS, runId, nodeId, status, startTime, endTime); + Table table = new Table(STRESS_TEST_STATUS, runId, nodeId, status, startTime, endTime); engine.getDatabasePlatform().createTables(true, true, table); } /** - * Creates the LOAD_TEST_RUN_OUTGOING table + * Creates the STRESS_TEST_RUN_OUTGOING table */ - private void createLoadTestRowOutgoingTable(int payloadColumns) { - Column[] defaultColumns = getDefaultLoadRowColumns(); - Table table = new Table(LOAD_TEST_ROW_OUTGOING, defaultColumns); + private void createStressTestRowOutgoingTable(int payloadColumns) { + Column[] defaultColumns = getDefaultStressTestRowColumns(); + Table table = new Table(STRESS_TEST_ROW_OUTGOING, defaultColumns); List payloads = getPayloadColumns(payloadColumns); table.addColumns(payloads); @@ -206,11 +232,11 @@ private void createLoadTestRowOutgoingTable(int payloadColumns) { } /** - * Creates the LOAD_TEST_RUN_INCOMING table + * Creates the STRESS_TEST_RUN_INCOMING table */ - private void createLoadTestRowIncomingTable(int payloadColumns) { - Column[] defaultColumns = getDefaultLoadRowColumns(); - Table table = new Table(LOAD_TEST_ROW_INCOMING, defaultColumns); + private void createStressTestRowIncomingTable(int payloadColumns) { + Column[] defaultColumns = getDefaultStressTestRowColumns(); + Table table = new Table(STRESS_TEST_ROW_INCOMING, defaultColumns); List payloads = getPayloadColumns(payloadColumns); table.addColumns(payloads); @@ -218,9 +244,9 @@ private void createLoadTestRowIncomingTable(int payloadColumns) { } /** - * Creates the default columns for the LOAD_TEST_ROW_* tables + * Creates the default columns for the STRESS_TEST_ROW_* tables */ - private Column[] getDefaultLoadRowColumns() { + private Column[] getDefaultStressTestRowColumns() { Column rowId = new Column("ROW_ID"); rowId.setMappedType("INTEGER"); rowId.setPrimaryKey(true); @@ -244,7 +270,7 @@ private Column[] getDefaultLoadRowColumns() { } /** - * Creates the payload columns for the LOAD_TEST_RUN_* tables + * Creates the payload columns for the STRESS_TEST_RUN_* tables */ private List getPayloadColumns(int payloadColumns) { List payloads = new ArrayList(); @@ -260,40 +286,36 @@ private List getPayloadColumns(int payloadColumns) { } /** - * Adds triggers to the LOAD_TEST_CONTROL table + * Adds triggers to the STRESS_TEST_CONTROL table */ - private void addLoadTestControlTriggers() { - addTrigger(LOAD_TEST_CONTROL, SYNC_CHANNEL, SERVER_TO_CLIENT); - addTrigger(LOAD_TEST_CONTROL, SYNC_CHANNEL, CLIENT_TO_SERVER); - engine.syncTriggers(); + private void addStressTestControlTriggers() { + addTrigger(STRESS_TEST_CONTROL, CONTROL_CHANNEL, SERVER_TO_CLIENT); + addTrigger(STRESS_TEST_CONTROL, CONTROL_CHANNEL, CLIENT_TO_SERVER); } /** - * Adds triggers to the LOAD_TEST_STATUS table + * Adds triggers to the STRESS_TEST_STATUS table */ - private void addLoadTestStatusTriggers() { - addTrigger(LOAD_TEST_STATUS, SYNC_CHANNEL, SERVER_TO_CLIENT); - addTrigger(LOAD_TEST_STATUS, SYNC_CHANNEL, CLIENT_TO_SERVER); - engine.syncTriggers(); + private void addStressTestStatusTriggers() { + addTrigger(STRESS_TEST_STATUS, CONTROL_CHANNEL, SERVER_TO_CLIENT); + addTrigger(STRESS_TEST_STATUS, CONTROL_CHANNEL, CLIENT_TO_SERVER); } /** - * Adds triggers and transforms to the LOAD_TEST_ROW_OUTGOING table + * Adds triggers and transforms to the STRESS_TEST_ROW_OUTGOING table */ - private void addLoadTestRowOutgoingConfig() { - addTrigger(LOAD_TEST_ROW_OUTGOING, SYNC_CHANNEL, SERVER_TO_CLIENT); + private void addStressTestRowOutgoingConfig() { + addTrigger(STRESS_TEST_ROW_OUTGOING, SYNC_CHANNEL, SERVER_TO_CLIENT); addOutgoingTransform(); - engine.syncTriggers(); } /** - * Adds triggers and transforms to the LOAD_TEST_ROW_INCOMING table + * Adds triggers and transforms to the STRESS_TEST_ROW_INCOMING table */ - private void addLoadTestRowIncomingConfig() { - addTrigger(LOAD_TEST_ROW_INCOMING, SYNC_CHANNEL, SERVER_TO_CLIENT); - addTrigger(LOAD_TEST_ROW_INCOMING, SYNC_CHANNEL, CLIENT_TO_SERVER); + private void addStressTestRowIncomingConfig() { + addTrigger(STRESS_TEST_ROW_INCOMING, SYNC_CHANNEL, SERVER_TO_CLIENT); + addTrigger(STRESS_TEST_ROW_INCOMING, SYNC_CHANNEL, CLIENT_TO_SERVER); addIncomingTransform(); - engine.syncTriggers(); } /** @@ -313,20 +335,19 @@ private void addTrigger(String table, String channel, String routerId) { if (router != null) { triggerRouter = new TriggerRouter(trigger, router); engine.getTriggerRouterService().saveTriggerRouter(triggerRouter); - engine.getTriggerRouterService().syncTriggers(); } } } /** - * Adds transforms to the LOAD_TEST_ROW_OUTGOING table + * Adds transforms to the STRESS_TEST_ROW_OUTGOING table */ private void addOutgoingTransform() { TransformTableNodeGroupLink outgoingTransform = new TransformTableNodeGroupLink(); outgoingTransform.setTransformPoint(TransformPoint.LOAD); - outgoingTransform.setTransformId("load_test_outgoing_transform"); - outgoingTransform.setSourceTableName(LOAD_TEST_ROW_OUTGOING); - outgoingTransform.setTargetTableName(LOAD_TEST_ROW_OUTGOING); + outgoingTransform.setTransformId("stress_test_outgoing_transform"); + outgoingTransform.setSourceTableName(STRESS_TEST_ROW_OUTGOING); + outgoingTransform.setTargetTableName(STRESS_TEST_ROW_OUTGOING); outgoingTransform.setColumnPolicy(ColumnPolicy.IMPLIED); outgoingTransform.setNodeGroupLink(engine.getConfigurationService().getNodeGroupLinkFor(SERVER_NODE_GROUP, CLIENT_NODE_GROUP, true)); @@ -341,14 +362,14 @@ private void addOutgoingTransform() { } /** - * Adds transforms to the LOAD_TEST_ROW_INCOMING table + * Adds transforms to the STRESS_TEST_ROW_INCOMING table */ private void addIncomingTransform() { TransformTableNodeGroupLink incomingTransform = new TransformTableNodeGroupLink(); incomingTransform.setTransformPoint(TransformPoint.LOAD); - incomingTransform.setTransformId("load_test_incoming_transform"); - incomingTransform.setSourceTableName(LOAD_TEST_ROW_INCOMING); - incomingTransform.setTargetTableName(LOAD_TEST_ROW_INCOMING); + incomingTransform.setTransformId("stress_test_incoming_transform"); + incomingTransform.setSourceTableName(STRESS_TEST_ROW_INCOMING); + incomingTransform.setTargetTableName(STRESS_TEST_ROW_INCOMING); incomingTransform.setColumnPolicy(ColumnPolicy.IMPLIED); incomingTransform.setNodeGroupLink(engine.getConfigurationService().getNodeGroupLinkFor(CLIENT_NODE_GROUP, SERVER_NODE_GROUP, true)); @@ -363,12 +384,12 @@ private void addIncomingTransform() { } /** - * Checks the LOAD_TEST_CONTROL table for new load test runs and starts a - * load test accordingly + * Checks the STRESS_TEST_CONTROL table for new stress test runs and starts + * a stress test accordingly */ private void runTest(final Node me) { if (!initialized) { - initLoadTestControl(); + initStressTest(); } List runs = engine.getSqlTemplate().query(selectControlSql); @@ -382,8 +403,8 @@ private void runTest(final Node me) { String status = engine.getSqlTemplate().queryForString(selectStatusSql, runId, me.getNodeId()); if (isMasterNode(me) && StringUtils.isBlank(status)) { - initLoadRowOutgoing(payloadColumns, runId, initialSeedSize); - initLoadRowIncoming(payloadColumns); + initStressTestRowOutgoing(payloadColumns, runId, initialSeedSize); + initStressTestRowIncoming(payloadColumns); long commitRows = run.getLong("SERVER_COMMIT_ROWS"); long sleepMs = run.getLong("SERVER_COMMIT_SLEEP_MS"); @@ -392,7 +413,7 @@ private void runTest(final Node me) { engine.getSqlTemplate().update(insertStatusSql, runId, client.getNodeId(), "RUNNING"); } - fillOutgoing(runId, duration, commitRows, sleepMs); + fillOutgoing(runId, duration, commitRows, sleepMs, payloadColumns); engine.getSqlTemplate().update(updateStatusSql, "COMPLETE", runId, me.getNodeId()); @@ -400,7 +421,7 @@ private void runTest(final Node me) { long commitRows = run.getLong("CLIENT_COMMIT_ROWS"); long sleepMs = run.getLong("CLIENT_COMMIT_SLEEP_MS"); - fillIncoming(runId, duration, commitRows, sleepMs); + fillIncoming(runId, duration, commitRows, sleepMs, payloadColumns); engine.getSqlTemplate().update(updateStatusSql, "COMPLETE", runId, me.getNodeId()); } @@ -412,13 +433,13 @@ private boolean isMasterNode(Node me) { } /** - * Fills the LOAD_TEST_OUTGOING table with test data + * Fills the STRESS_TEST_OUTGOING table with test data */ - private void seedOutgoing(int runId, long rows) { + private void seedOutgoing(int runId, long rows, int payloadColumns) { long currentRows = outgoingCounts.containsKey(runId) ? outgoingCounts.get(runId) : 0; - + String sql = buildInsertSql(STRESS_TEST_ROW_OUTGOING, payloadColumns); while (currentRows < rows) { - insertOutgoing(currentRows, runId); + insert(sql, currentRows, runId, payloadColumns); currentRows++; } @@ -426,16 +447,17 @@ private void seedOutgoing(int runId, long rows) { } /** - * Fills the LOAD_TEST_OUTGOING table with test data + * Fills the STRESS_TEST_OUTGOING table with test data */ - private void fillOutgoing(int runId, long duration, long commitRows, long sleepMs) { + private void fillOutgoing(int runId, long duration, long commitRows, long sleepMs, int payloadColumns) { long currentRows = outgoingCounts.containsKey(runId) ? outgoingCounts.get(runId) : 0; long startTime = System.currentTimeMillis(); long durationMs = duration * 60000; + String sql = buildInsertSql(STRESS_TEST_ROW_OUTGOING, payloadColumns); while (System.currentTimeMillis() - startTime < durationMs) { for (long commitRow = 0; commitRow < commitRows; commitRow++) { - insertOutgoing(currentRows + commitRow, runId); + insert(sql, currentRows + commitRow, runId, payloadColumns); } currentRows += commitRows; AppUtils.sleep(sleepMs); @@ -445,16 +467,17 @@ private void fillOutgoing(int runId, long duration, long commitRows, long sleepM } /** - * Fills the LOAD_TEST_INCOMING table with test data + * Fills the STRESS_TEST_INCOMING table with test data */ - private void fillIncoming(int runId, long duration, long commitRows, long sleepMs) { + private void fillIncoming(int runId, long duration, long commitRows, long sleepMs, int payloadColumns) { long currentRows = 0; long startTime = System.currentTimeMillis(); long durationMs = duration * 60000; + String sql = buildInsertSql(STRESS_TEST_ROW_INCOMING, payloadColumns); while (System.currentTimeMillis() - startTime < durationMs) { for (long commitRow = 0; commitRow < commitRows; commitRow++) { - insertIncoming(currentRows + commitRow, runId); + insert(sql, currentRows + commitRow, runId, payloadColumns); } currentRows += commitRows; AppUtils.sleep(sleepMs); @@ -462,19 +485,37 @@ private void fillIncoming(int runId, long duration, long commitRows, long sleepM } /** - * Inserts test data into LOAD_TEST_OUTGOING + * Inserts test data into STRESS_TEST_OUTGOING or STRESS_TEST_INCOMING */ - private void insertOutgoing(long rowId, int runId) { + private void insert(String sql, long rowId, int runId, int payloadColumns) { String nodeId = engine.getEngineName(); - engine.getSqlTemplate().update(insertOutgoingSql, rowId, nodeId, runId); + + Object[] values = new Object[payloadColumns + 3]; + values[0] = rowId; + values[1] = nodeId; + values[2] = runId; + + for (int c = 3; c < values.length; c++) { + values[c] = RandomStringUtils.randomAlphanumeric(100); + } + + engine.getSqlTemplate().update(sql, values); } - /** - * Inserts test data into LOAD_TEST_INCOMING - */ - private void insertIncoming(long rowId, int runId) { - String nodeId = engine.getEngineName(); - engine.getSqlTemplate().update(insertIncomingSql, rowId, nodeId, runId); + private String buildInsertSql(String table, int payloadColumns) { + StringBuilder sql = new StringBuilder("insert into " + table); + StringBuilder columns = new StringBuilder(" (row_id,node_id,run_id,insert_time"); + StringBuilder values = new StringBuilder(" values (?,?,?,current_timestamp"); + + List payloadColumnList = getPayloadColumns(payloadColumns); + for (Column column : payloadColumnList) { + columns.append(",").append(column.getName().toLowerCase()); + values.append(",?"); + } + columns.append(")"); + values.append(")"); + + return sql.append(columns).append(values).toString(); } @Override