
Expose some private methods as public #19

Closed
The-Alchemist wants to merge 1 commit into bytefish:master from The-Alchemist:patch-2

Conversation

@The-Alchemist


Many APIs only give you access to one entity at a time, even if there is some kind of stream backing it (e.g., JDBC, Kafka).

So, I was hoping to inline and modify the `PgBulkInsert::saveAll()` method and call the following directly:

```java
CopyManager cpManager = connection.getCopyAPI();
CopyIn copyIn = cpManager.copyIn(getCopyCommand());

try (PgBinaryWriter bw = new PgBinaryWriter()) {

    // Wrap the CopyOutputStream in our own Writer:
    bw.open(new PGCopyOutputStream(copyIn));

    // Insert each entity:
    entities.forEach(entity -> this.saveEntity(bw, entity));
}
```

However, two methods are private and can't be called directly.

Exposing them would allow users to control the opening and closing of a `PgBinaryWriter`, allowing usage with other APIs that provide messages one-by-one.
@bytefish
Owner

I don't think that would be the right abstraction. Have you taken a look at the BulkProcessor? https://bytefish.de/blog/pgbulkinsert_bulkprocessor/

@The-Alchemist
Author

@bytefish : Good call, I didn't know about BulkProcessor.

@bytefish
Owner

@The-Alchemist Hey, I didn't say it is useful in your use case. 😁 If it doesn't fit your needs, just let me know and maybe we can come up with something. 😎

@The-Alchemist The-Alchemist reopened this May 17, 2018
@The-Alchemist
Author

Actually, yes, @bytefish, now that I've looked at it: BulkProcessor stores all the entities in an ArrayList and closes the DB connection, two things we're trying to avoid.

It seems neither PgBulkInsert nor BulkProcessor allows sending in entities one-by-one, unless I've missed an API again.

@bytefish
Owner

You are right. I think you want to open a connection and stream the data into the Postgres database, right? That's basically what saveAll does. Maybe an existing API needs to be wrapped to return a Stream. But knowing Java 8 Streams, I'd guess that is close to impossible.

I will add a method that suits your needs, but I won't be able to before the weekend.
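For what it's worth, wrapping a pull-based API into a `Stream` is doable with the JDK's `Spliterators` helpers. A minimal, self-contained sketch (the `EntitySource` interface is a hypothetical stand-in for a one-entity-at-a-time API such as JDBC or a Kafka consumer, not anything from PgBulkInsert):

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PullToStream {

    // Hypothetical stand-in for a pull-based API that hands out
    // one entity per call and signals exhaustion with null:
    interface EntitySource<T> {
        T next();
    }

    // Wrap the pull-based source in a lazy, sequential Stream:
    static <T> Stream<T> asStream(EntitySource<T> source) {
        Iterator<T> it = new Iterator<T>() {
            private T nextEntity = source.next();

            @Override
            public boolean hasNext() { return nextEntity != null; }

            @Override
            public T next() {
                T current = nextEntity;
                nextEntity = source.next();
                return current;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        // Simulated source that yields three entities, then null:
        Iterator<String> backing = java.util.List.of("a", "b", "c").iterator();
        EntitySource<String> source = () -> backing.hasNext() ? backing.next() : null;

        String joined = asStream(source).collect(Collectors.joining(","));
        System.out.println(joined); // prints "a,b,c"
    }
}
```

The stream stays lazy, so a wrapped source could feed saveAll without materializing everything first; the awkward part is lifetime management of the underlying connection, which is what the rest of this thread is about.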

@The-Alchemist
Author

Thank you, @bytefish! For now, I've resorted to using reflection to call saveEntity() manually. Yes, I do my own connection management, and I just want to stream data to a Postgres db.

I'm not sure it's just a matter of wrapping and exposing a Stream. How would one use https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/MessageListener.html and PgBulkInsert together?
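To make the question concrete, here is a minimal model of the pattern I'm after. The `MessageListener` interface is a simplified stand-in for Spring Kafka's callback (which delivers one record at a time), and `SingleEntityWriter` is a hypothetical in-memory placeholder for a public single-entity save method, not PgBulkInsert's real API:

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerDemo {

    // Simplified stand-in for Spring Kafka's MessageListener,
    // which delivers exactly one record per callback:
    interface MessageListener<T> {
        void onMessage(T record);
    }

    // Hypothetical single-entity writer. Here it just collects rows
    // in memory instead of streaming them over a COPY connection:
    static class SingleEntityWriter<T> implements AutoCloseable {
        final List<T> written = new ArrayList<>();

        void save(T entity) { written.add(entity); }

        @Override
        public void close() { /* would flush and end the COPY here */ }
    }

    public static void main(String[] args) {
        SingleEntityWriter<String> writer = new SingleEntityWriter<>();

        // The listener forwards each incoming record directly to
        // save(...), with no intermediate ArrayList batching:
        MessageListener<String> listener = writer::save;

        listener.onMessage("event-1");
        listener.onMessage("event-2");

        System.out.println(writer.written.size()); // prints 2
    }
}
```

The point is that the callback shape gives you one entity per invocation, so the library would need a save method callable per entity, with open/close controlled separately.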

@bytefish
Owner

@The-Alchemist You are totally right. I did some quick research: it's possible to turn it into a Stream, but it is complicated. I will adjust the API and come up with something. 🤔 And if nothing works, I will just make the methods public. 😎

@The-Alchemist
Author

@bytefish : thank you, you're awesome!

bytefish added a commit that referenced this pull request May 19, 2018
Initial Refactoring to decouple Column Mapping from actual Postgres
Connection handling. Increasing Version Number to 2.0 due to breaking
API changes. Refactoring Tests.
@bytefish
Owner

bytefish commented May 19, 2018

@The-Alchemist I first decoupled the actual column mapping and the insertion methods. That leaves us with just two tiny methods in the class in question. I am now thinking about how to go further.

I am thinking about the following: PgBulkInsert implements AutoCloseable. You pass a connection into the saveAll method. I will add an additional save method for a single entity. This forces the user to close the PgConnection himself, just like in standard JDBC.

The only nitpick with this kind of refactoring would be...

I wrote this library for Apache Flink (https://flink.apache.org/) experiments I did. There you have a Stream of incoming events that I want to bulk-write to the database, and I don't want to create a PgBulkInsert instance for every batch I am going to write.

But actually... you could cache the most expensive part (the column mapping creation), and the PgBulkInsert class becomes a tiny wrapper. I don't know if it is expensive to instantiate it for every batch (I don't think so).
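As a rough, self-contained model of that idea (the class and field names here are hypothetical, not the library's real types): the mapping is built once and shared, while the per-batch wrapper only holds a reference to it, so instantiating one per batch is cheap.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CachedMappingDemo {

    // Stand-in for the expensive-to-build column mapping
    // (AbstractMapping in the library): built once, then shared.
    static class Mapping<T> {
        final List<Function<T, String>> columns;
        Mapping(List<Function<T, String>> columns) { this.columns = columns; }
    }

    // Tiny per-batch wrapper, cheap to instantiate because it only
    // holds a reference to the cached mapping:
    static class BatchWriter<T> {
        private final Mapping<T> mapping;
        BatchWriter(Mapping<T> mapping) { this.mapping = mapping; }

        List<String> writeBatch(Stream<T> batch) {
            return batch
                    .map(e -> mapping.columns.stream()
                            .map(c -> c.apply(e))
                            .collect(Collectors.joining("|")))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        // Build the mapping once...
        Function<int[], String> first = e -> String.valueOf(e[0]);
        Function<int[], String> second = e -> String.valueOf(e[1]);
        Mapping<int[]> mapping = new Mapping<>(List.of(first, second));

        // ...and create a throwaway wrapper per batch:
        for (int batch = 0; batch < 2; batch++) {
            BatchWriter<int[]> writer = new BatchWriter<>(mapping);
            System.out.println(writer.writeBatch(Stream.of(new int[]{batch, 42})));
        }
    }
}
```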

What do you think?

@bytefish
Owner

I think it could look something like this:

```java
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert;

import de.bytefish.pgbulkinsert.exceptions.SaveEntityFailedException;
import de.bytefish.pgbulkinsert.mapping.AbstractMapping;
import de.bytefish.pgbulkinsert.pgsql.PgBinaryWriter;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyIn;
import org.postgresql.copy.CopyManager;
import org.postgresql.copy.PGCopyOutputStream;

import java.sql.SQLException;
import java.util.stream.Stream;

public class BulkWriter<TEntity> implements IBulkWriter<TEntity> {

    private final CopyIn copyIn;
    private final PgBinaryWriter writer;
    private final AbstractMapping<TEntity> mapping;

    public BulkWriter(CopyIn copyIn, PgBinaryWriter writer, AbstractMapping<TEntity> mapping) {
        this.copyIn = copyIn;
        this.writer = writer;
        this.mapping = mapping;

        open();
    }

    private void open() {
        writer.open(new PGCopyOutputStream(copyIn));
    }

    @Override
    public void save(TEntity entity) throws SQLException {
        saveEntity(writer, entity);
    }

    @Override
    public void saveAll(Stream<TEntity> entities) throws SQLException {
        entities.forEach(entity -> this.saveEntity(writer, entity));
    }

    private void saveEntity(PgBinaryWriter bw, TEntity entity) throws SaveEntityFailedException {
        synchronized (bw) {
            // Start a New Row:
            bw.startRow(mapping.getColumns().size());

            // Iterate over each column mapping:
            mapping.getColumns().forEach(column -> {
                try {
                    column.getWrite().invoke(bw, entity);
                } catch (Exception e) {
                    throw new SaveEntityFailedException(e);
                }
            });
        }
    }

    public static <TEntity> BulkWriter<TEntity> create(PGConnection connection, AbstractMapping<TEntity> mapping) throws SQLException {
        // Get the Copy API from the PgConnection:
        CopyManager copyManager = connection.getCopyAPI();
        // Create the CopyIn:
        CopyIn copyIn = copyManager.copyIn(mapping.getCopyCommand());
        // Create the Binary Writer:
        PgBinaryWriter writer = new PgBinaryWriter();
        // And return the BulkWriter:
        return new BulkWriter<>(copyIn, writer, mapping);
    }

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
    }
}
```

@bytefish
Owner

bytefish commented May 19, 2018

@The-Alchemist I am not happy with the above, because I particularly don't like the open() in the constructor. And the create(...) method throws a checked exception. I wouldn't want to write try-with-resources blocks like this:

```java
// Cast to the underlying PGConnection:
PGConnection pgConnection = PostgreSqlUtils.getPGConnection(connection);
// The Mapping to use:
AbstractMapping<GeometricEntity> mapping = new GeometricEntityMapping();
// Construct the Bulk Writer:
try (BulkWriter<GeometricEntity> bulkInsert = BulkWriter.create(pgConnection, mapping)) {
    // Save the entities:
    bulkInsert.saveAll(entities.stream());
} catch (Exception e) {
    // Pokemon Exception Handling!
}
```

Of course I can wrap the SQLException in a RuntimeException, but I don't think this would be appropriate in Java. Checked Exceptions make Java APIs so ugly...

What do you think?

@bytefish
Owner

bytefish commented May 19, 2018

@The-Alchemist But when designing the API this way, you have to know that nothing gets written until you call close on the BulkWriter. I don't like this kind of API design, because it leads to people filing issues like: "I am calling save(...), but nothing gets written to Postgres." Or, even worse, it crashes on the Postgres side.

And I have been bitten by AutoCloseable APIs a few times myself.

So the existing API design is easier to use.
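The pitfall is the same one the JDK's own buffered streams exhibit: a write "succeeds" locally, yet nothing reaches the underlying sink until a flush or close. A small, runnable illustration using only the standard library:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class BufferPitfall {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        BufferedOutputStream buffered = new BufferedOutputStream(sink);

        buffered.write("row-1".getBytes());
        // The write call returned normally, yet nothing has
        // reached the sink (5 bytes sit in the 8 KiB buffer):
        System.out.println("before close: " + sink.size()); // prints "before close: 0"

        buffered.close();
        System.out.println("after close: " + sink.size()); // prints "after close: 5"
    }
}
```

An API that hides this behind save(...) invites exactly the "nothing gets written" bug reports described above.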

@bytefish
Owner

bytefish commented May 19, 2018

@The-Alchemist What I would suggest for your use case is the following: create an implementation that fits your use case best, so you have full control over the API design. I am thinking of something like the TheAlchemistBulkInsert<TEntity> class below. You can instantiate the writer either with the static create method (from an existing connection) or by calling the constructor directly.

That would make both of us happy. 👍

```java
class TheAlchemistBulkInsert<TEntity> implements AutoCloseable {

    private final CopyIn copyIn;
    private final PgBinaryWriter writer;
    private final AbstractMapping<TEntity> mapping;

    public TheAlchemistBulkInsert(CopyIn copyIn, PgBinaryWriter writer, AbstractMapping<TEntity> mapping) {
        this.copyIn = copyIn;
        this.writer = writer;
        this.mapping = mapping;

        open();
    }

    private void open() {
        writer.open(new PGCopyOutputStream(copyIn));
    }

    public void save(TEntity entity) throws SQLException {
        saveEntity(writer, entity);
    }

    public void saveAll(Stream<TEntity> entities) throws SQLException {
        entities.forEach(entity -> this.saveEntity(writer, entity));
    }

    private void saveEntity(PgBinaryWriter bw, TEntity entity) throws SaveEntityFailedException {
        synchronized (bw) {
            // Start a New Row:
            bw.startRow(mapping.getColumns().size());

            // Iterate over each column mapping:
            mapping.getColumns().forEach(column -> {
                try {
                    column.getWrite().invoke(bw, entity);
                } catch (Exception e) {
                    throw new SaveEntityFailedException(e);
                }
            });
        }
    }

    public static <TEntity> TheAlchemistBulkInsert<TEntity> create(PGConnection connection, AbstractMapping<TEntity> mapping) throws SQLException {
        // Get the Copy API from the PgConnection:
        CopyManager copyManager = connection.getCopyAPI();
        // Create the CopyIn:
        CopyIn copyIn = copyManager.copyIn(mapping.getCopyCommand());
        // Create the Binary Writer:
        PgBinaryWriter writer = new PgBinaryWriter();
        // And return the TheAlchemistBulkInsert:
        return new TheAlchemistBulkInsert<>(copyIn, writer, mapping);
    }

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
    }
}
```

@bytefish
Owner

All unit tests passed. I have released Version 2.0 with the major breaking change. You should now be able to use the mappings as shown in the sample above.

@bytefish bytefish closed this May 19, 2018