Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@
package de.bytefish.pgbulkinsert.bulkprocessor;

import de.bytefish.pgbulkinsert.bulkprocessor.handler.IBulkWriteHandler;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BulkProcessor<TEntity> implements AutoCloseable {

@Nullable
private final ScheduledThreadPoolExecutor scheduler;
@Nullable
private final ScheduledFuture<?> scheduledFuture;

private volatile boolean closed = false;
Expand All @@ -26,7 +33,7 @@ public BulkProcessor(IBulkWriteHandler<TEntity> handler, int bulkSize) {
this(handler, bulkSize, null);
}

public BulkProcessor(IBulkWriteHandler<TEntity> handler, int bulkSize, Duration flushInterval) {
public BulkProcessor(IBulkWriteHandler<TEntity> handler, int bulkSize, @Nullable Duration flushInterval) {

this.handler = handler;
this.bulkSize = bulkSize;
Expand Down Expand Up @@ -61,10 +68,8 @@ public void close() throws Exception {
closed = true;

// Quit the Scheduled FlushInterval Future:
if (this.scheduledFuture != null) {
cancel(this.scheduledFuture);
this.scheduler.shutdown();
}
Optional.ofNullable(this.scheduledFuture).ifPresent(future -> future.cancel(false));
Optional.ofNullable(this.scheduler).ifPresent(ScheduledThreadPoolExecutor::shutdown);

// Are there any entities left to write?
if (batchedEntities.size() > 0) {
Expand Down Expand Up @@ -96,13 +101,6 @@ private void write(List<TEntity> entities) {
}
}

/**
 * Attempts a non-interrupting cancellation of the given future.
 *
 * @param future the future to cancel; may be {@code null}
 * @return {@code true} if the future existed and was cancelled, {@code false}
 *         when there was nothing to cancel or cancellation failed
 */
public static boolean cancel(Future<?> future) {
    // A missing future simply counts as "nothing to cancel".
    return future != null && future.cancel(false);
}

class Flush implements Runnable {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

package de.bytefish.pgbulkinsert.bulkprocessor.handler;

import de.bytefish.pgbulkinsert.IPgBulkInsert;
import de.bytefish.pgbulkinsert.util.PostgreSqlUtils;
import org.postgresql.PGConnection;

import java.sql.Connection;
import java.util.List;
import java.util.function.Supplier;

import org.postgresql.PGConnection;

import de.bytefish.pgbulkinsert.IPgBulkInsert;
import de.bytefish.pgbulkinsert.util.PostgreSqlUtils;

public class BulkWriteHandler<TEntity> implements IBulkWriteHandler<TEntity> {

private final IPgBulkInsert<TEntity> client;
Expand All @@ -22,6 +21,7 @@ public BulkWriteHandler(IPgBulkInsert<TEntity> client, Supplier<Connection> conn
this.connectionFactory = connectionFactory;
}

@Override
public void write(List<TEntity> entities) throws Exception {
// Obtain a new Connection and execute it in a try with resources block, so it gets closed properly:
try(Connection connection = connectionFactory.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected void onSetUpInTransaction() throws Exception {
}

// Define a Custom Handler for the Unit Test, which does not close the Connection:
class CustomBulkWriteHandler<TEntity> implements IBulkWriteHandler<TEntity> {
static class CustomBulkWriteHandler<TEntity> implements IBulkWriteHandler<TEntity> {

private final IPgBulkInsert<TEntity> client;
private final Supplier<Connection> connectionFactory;
Expand All @@ -41,6 +41,7 @@ public CustomBulkWriteHandler(IPgBulkInsert<TEntity> client, Supplier<Connection
this.connectionFactory = connectionFactory;
}

@Override
public void write(List<TEntity> entities) throws Exception {
Connection connection = connectionFactory.get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.sql.SQLException;
import java.util.Collection;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

public class PgBulkInsert<TEntity> implements IPgBulkInsert<TEntity> {
Expand All @@ -34,28 +33,17 @@ public PgBulkInsert(IConfiguration configuration, AbstractMapping<TEntity> mappi
this.mapping = mapping;
}

@Override
Comment thread
jonfreedman marked this conversation as resolved.
public void saveAll(PGConnection connection, Stream<TEntity> entities) throws SQLException {

try (PgBinaryWriter bw = new PgBinaryWriter(configuration.getBufferSize())) {

// Wrap the CopyOutputStream in our own Writer:
bw.open(new PGCopyOutputStream(connection, mapping.getCopyCommand(), 1));

// Wrap the CopyOutputStream in our own Writer:
try (PgBinaryWriter bw = new PgBinaryWriter(new PGCopyOutputStream(connection, mapping.getCopyCommand(), 1), configuration.getBufferSize())) {
// Insert Each Column:
entities.forEach(entity -> saveEntitySynchonized(bw, entity));
}
}

public void saveAll(PGConnection connection, Collection<TEntity> entities) throws SQLException {

try (PgBinaryWriter bw = new PgBinaryWriter(configuration.getBufferSize())) {

// Wrap the CopyOutputStream in our own Writer:
bw.open(new PGCopyOutputStream(connection, mapping.getCopyCommand(), 1));

// Insert Each Column:
entities.forEach(entity -> saveEntity(bw, entity));
}
saveAll(connection, entities.stream());
}

private void saveEntity(PgBinaryWriter bw, TEntity entity) throws SaveEntityFailedException {
Expand All @@ -71,7 +59,7 @@ private void saveEntity(PgBinaryWriter bw, TEntity entity) throws SaveEntityFail
throw new SaveEntityFailedException(e);
}
}

private void saveEntitySynchonized(PgBinaryWriter bw, TEntity entity) throws SaveEntityFailedException {
synchronized (bw) {
saveEntity(bw, entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
import de.bytefish.pgbulkinsert.pgsql.handlers.IValueHandler;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
Expand All @@ -12,22 +13,15 @@

public class PgBinaryWriter implements AutoCloseable {

private transient DataOutputStream buffer;
private final transient DataOutputStream buffer;

private final int bufferSize;

// Convenience constructor: delegates to the sizing constructor with a
// 64 KiB (65536-byte) default buffer.
public PgBinaryWriter() {
this(65536);
}

public PgBinaryWriter(int bufferSize) {
this.bufferSize = bufferSize;
public PgBinaryWriter(final OutputStream out) {
this(out, 65536);
}

public void open(final OutputStream out) {
buffer = new DataOutputStream(new BufferedOutputStream(out, bufferSize));

writeHeader();
public PgBinaryWriter(final OutputStream out, final int bufferSize) {
buffer = new DataOutputStream(new BufferedOutputStream(out, bufferSize));
writeHeader();
}

public void startRow(int numColumns) {
Expand All @@ -43,15 +37,15 @@ public void startRow(int numColumns) {
}
}

public <TTargetType> void write(final IValueHandler<TTargetType> handler, final TTargetType value) {
public <TTargetType> void write(final IValueHandler<TTargetType> handler, @Nullable final TTargetType value) {
handler.handle(buffer, value);
}

/**
* Writes primitive boolean to the output stream
*
*
* @param value value to write
*
*
*/
public void writeBoolean(boolean value) {
try {
Expand All @@ -71,12 +65,12 @@ public void writeBoolean(boolean value) {
}
}


/**
* Writes primitive byte to the output stream
*
*
* @param value value to write
*
*
*/
public void writeByte(int value) {
try {
Expand All @@ -94,9 +88,9 @@ public void writeByte(int value) {

/**
* Writes primitive short to the output stream
*
*
* @param value value to write
*
*
*/
public void writeShort(int value) {
try {
Expand All @@ -114,9 +108,9 @@ public void writeShort(int value) {

/**
* Writes primitive integer to the output stream
*
*
* @param value value to write
*
*
*/
public void writeInt(int value) {
try {
Expand All @@ -134,9 +128,9 @@ public void writeInt(int value) {

/**
* Writes primitive long to the output stream
*
*
* @param value value to write
*
*
*/
public void writeLong(long value) {
try {
Expand All @@ -151,12 +145,12 @@ public void writeLong(long value) {
}
}
}

/**
* Writes primitive float to the output stream
*
*
* @param value value to write
*
*
*/
public void writeFloat(float value) {
try {
Expand All @@ -174,9 +168,9 @@ public void writeFloat(float value) {

/**
* Writes primitive double to the output stream
*
*
* @param value value to write
*
*
*/
public void writeDouble(double value) {
try {
Expand Down Expand Up @@ -224,7 +218,7 @@ public void close() {
}
}
}

private void writeHeader() {
try {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
package de.bytefish.pgbulkinsert.pgsql.handlers;

import de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.io.DataOutputStream;

public abstract class BaseValueHandler<T> implements IValueHandler<T> {

@Override
public void handle(DataOutputStream buffer, final T value) {
public void handle(DataOutputStream buffer, @Nullable final T value) {
try {
if (value == null) {
buffer.writeInt(-1);
Expand Down
Loading