From 40814842e299c0f37f6f935f38051d108078a5cc Mon Sep 17 00:00:00 2001 From: Fernando Velasquez Date: Wed, 1 Dec 2021 13:31:41 -0500 Subject: [PATCH] Added logic to ensure batches are submitted to the engine every 1k records by default. This is configurable using a pipeline argument. This prevents the workers from running out of memory for large partitions. --- .../cdap/plugin/db/batch/sink/AbstractDBSink.java | 15 +++++++++++++-- .../plugin/db/batch/sink/ETLDBOutputFormat.java | 12 ++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java index f042d5ffd..dd448b24a 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java @@ -47,6 +47,7 @@ import io.cdap.plugin.db.batch.config.DatabaseSinkConfig; import io.cdap.plugin.util.DBUtils; import io.cdap.plugin.util.DriverCleanup; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.lib.db.DBConfiguration; import org.slf4j.Logger; @@ -158,8 +159,18 @@ public void prepareRun(BatchSinkContext context) { configAccessor.setTransactionIsolationLevel(dbSinkConfig.getTransactionIsolationLevel()); } - context.addOutput(Output.of(dbSinkConfig.getReferenceName(), new SinkOutputFormatProvider(ETLDBOutputFormat.class, - configAccessor.getConfiguration()))); + // Get Hadoop configuration object + Configuration configuration = configAccessor.getConfiguration(); + + // Configure batch size if specified in pipeline arguments. 
+ if (context.getArguments().has(ETLDBOutputFormat.COMMIT_BATCH_SIZE)) { + configuration.set(ETLDBOutputFormat.COMMIT_BATCH_SIZE, + context.getArguments().get(ETLDBOutputFormat.COMMIT_BATCH_SIZE)); + } + + context.addOutput(Output.of(dbSinkConfig.getReferenceName(), + new SinkOutputFormatProvider(ETLDBOutputFormat.class, + configuration))); } /** diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/ETLDBOutputFormat.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/ETLDBOutputFormat.java index 5779b8cb4..eb06e99cf 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/ETLDBOutputFormat.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/ETLDBOutputFormat.java @@ -50,6 +50,9 @@ * @param - Value passed to this class to be written. The value is ignored. */ public class ETLDBOutputFormat extends DBOutputFormat { + // Batch size before submitting a batch to the SQL engine. If set to 0, no batches will be submitted until commit. 
+ public static final String COMMIT_BATCH_SIZE = "io.cdap.plugin.db.output.commit.batch.size"; + public static final int DEFAULT_COMMIT_BATCH_SIZE = 1000; private static final Logger LOG = LoggerFactory.getLogger(ETLDBOutputFormat.class); @@ -63,6 +66,7 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOE DBConfiguration dbConf = new DBConfiguration(conf); String tableName = dbConf.getOutputTableName(); String[] fieldNames = dbConf.getOutputFieldNames(); + final int batchSize = conf.getInt(COMMIT_BATCH_SIZE, DEFAULT_COMMIT_BATCH_SIZE); if (fieldNames == null) { fieldNames = new String[dbConf.getOutputFieldCount()]; @@ -74,6 +78,7 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOE return new DBRecordWriter(connection, statement) { private boolean emptyData = true; + private long numWrittenRecords = 0; //Implementation of the close method below is the exact implementation in DBOutputFormat except that //we check if there is any data to be written and if not, we skip executeBatch call. @@ -116,6 +121,13 @@ public void write(K key, V value) { try { key.write(getStatement()); getStatement().addBatch(); + numWrittenRecords++; + + // Submit a batch to the SQL engine every batchSize records (default: 1000) + // This is done to reduce memory usage in the worker, as processed records can now be GC'd. + if (batchSize > 0 && numWrittenRecords % batchSize == 0) { + getStatement().executeBatch(); + } } catch (SQLException e) { LOG.warn("Failed to write value to database", e); }