diff --git a/PgBulkInsert/pom.xml b/PgBulkInsert/pom.xml index 3c8c341..eebfc3d 100644 --- a/PgBulkInsert/pom.xml +++ b/PgBulkInsert/pom.xml @@ -53,7 +53,7 @@ UTF-8 - 9.4-1206-jdbc42 + 42.2.2 @@ -157,4 +157,4 @@ - \ No newline at end of file + diff --git a/PgBulkInsert/src/main/java/de/bytefish/pgbulkinsert/PgBulkInsert.java b/PgBulkInsert/src/main/java/de/bytefish/pgbulkinsert/PgBulkInsert.java index c5389d1..c538ab9 100644 --- a/PgBulkInsert/src/main/java/de/bytefish/pgbulkinsert/PgBulkInsert.java +++ b/PgBulkInsert/src/main/java/de/bytefish/pgbulkinsert/PgBulkInsert.java @@ -20,7 +20,6 @@ import org.postgresql.PGConnection; import org.postgresql.copy.CopyIn; import org.postgresql.copy.CopyManager; -import org.postgresql.copy.PGCopyOutputStream; import java.math.BigDecimal; import java.net.Inet4Address; @@ -33,23 +32,33 @@ import java.util.stream.Stream; public abstract class PgBulkInsert implements IPgBulkInsert { + + private static final int DEFAULT_BUFFER_SIZE = 65536; private IValueHandlerProvider provider; private TableDefinition table; private List> columns; + + private final int bufferSize; public PgBulkInsert(String schemaName, String tableName) { - this(new ValueHandlerProvider(), schemaName, tableName); + this(new ValueHandlerProvider(), schemaName, tableName, DEFAULT_BUFFER_SIZE); + } + + public PgBulkInsert(String schemaName, String tableName, int bufferSize) + { + this(new ValueHandlerProvider(), schemaName, tableName, bufferSize); } - public PgBulkInsert(IValueHandlerProvider provider, String schemaName, String tableName) + public PgBulkInsert(IValueHandlerProvider provider, String schemaName, String tableName, int bufferSize) { this.provider = provider; this.table = new TableDefinition(schemaName, tableName); this.columns = new ArrayList<>(); + this.bufferSize = bufferSize; } public void saveAll(PGConnection connection, Stream entities) throws SQLException { @@ -57,10 +66,10 @@ public void saveAll(PGConnection connection, Stream entities) throws SQ CopyManager cpManager = connection.getCopyAPI(); CopyIn copyIn = cpManager.copyIn(getCopyCommand()); - try (PgBinaryWriter bw = new PgBinaryWriter()) { + try (PgBinaryWriter bw = new PgBinaryWriter(bufferSize)) { // Wrap the CopyOutputStream in our own Writer: - bw.open(new PGCopyOutputStream(copyIn)); + bw.open(copyIn); // Insert Each Column: entities.forEach(entity -> this.saveEntity(bw, entity)); diff --git a/PgBulkInsert/src/main/java/de/bytefish/pgbulkinsert/pgsql/PgBinaryWriter.java b/PgBulkInsert/src/main/java/de/bytefish/pgbulkinsert/pgsql/PgBinaryWriter.java index 4649972..fa4501c 100644 --- a/PgBulkInsert/src/main/java/de/bytefish/pgbulkinsert/pgsql/PgBinaryWriter.java +++ b/PgBulkInsert/src/main/java/de/bytefish/pgbulkinsert/pgsql/PgBinaryWriter.java @@ -8,17 +8,22 @@ import java.io.BufferedOutputStream; import java.io.DataOutputStream; -import java.io.OutputStream; + +import org.postgresql.copy.CopyIn; +import org.postgresql.copy.PGCopyOutputStream; public class PgBinaryWriter implements AutoCloseable { private transient DataOutputStream buffer; + + private final int bufferSize; - public PgBinaryWriter() { + public PgBinaryWriter(int bufferSize) { + this.bufferSize = bufferSize; } - public void open(final OutputStream out) { - buffer = new DataOutputStream(new BufferedOutputStream(out)); + public void open(final CopyIn copyIn) { + buffer = new DataOutputStream(new BufferedOutputStream(new PGCopyOutputStream(copyIn, 1), bufferSize)); writeHeader(); }