Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication. #4065

Closed
wants to merge 16 commits into from

Conversation

gerdansantos
Copy link

@gerdansantos gerdansantos commented Feb 20, 2020

NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.

Signed-off-by: Davy Machado machado.davy@gmail.com
Signed-off-by: Gerdan Santos gerdan@gmail.com

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

Inspired on CaptureChangeMySQL, this processor can use PostgreSQL Streaming Replication Connection to allow access to their transactional logs and such, in order for external clients to have a "change data capture" (CDC) capability.

The processor don't accept incoming connections, must be run on the primary node as a single threaded processor, and generate flow files for operations (INSERT, UPDATE, DELETE) in JSON format.

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • Have you verified that the full build is successful on both JDK 8 and JDK 11?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

…changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.

Signed-off-by: Davy Machado <machado.davy@gmail.com>

Signed-off-by: Gerdan Santos <gerdan@gmail.com>
Copy link
Contributor

@pvillard31 pvillard31 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you ensure that mvn clean install -Pcontrib-check completes successfully? After a very quick look, it looks like Apache headers are missing in most classes.

…changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.

Applying correction of run mvn clean install -Pcontrib-check

Signed-off-by: Davy Machado <machado.davy@gmail.com>

Signed-off-by: Gerdan Santos <gerdan@gmail.com>
@davyam
Copy link

davyam commented Feb 20, 2020

Thank you @pvillard31 for your feedback!

We fix the style problem and the mvn clean install -Pcontrib-check completed successfully.

Copy link
Contributor

@pvillard31 pvillard31 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR and the quick changes. There is one file that should not be required. Also, some part of your codes were on multiple lines which, I agree, is cleaner. The complaint was probably about tab being used instead of spaces. You can revert to multiline if you want. Example:

pgcon.getReplicationAPI()
    .createReplicationSlot()
    .logical()
    .withSlotName(this.slot)
    .withOutputPlugin("pgoutput")
    .make();

@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file should not be required. Could you remove it? Or tell us what it'd be required?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file was removed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) solved

…changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.

removed .checkstyle .

Signed-off-by: Davy Machado <machado.davy@gmail.com>

Signed-off-by: Gerdan Santos <gerdan@gmail.com>
@gerdansantos
Copy link
Author

Hello Mr. @pvillard31. When CI/CD phase occur erros by others codes, i will need do something ?

@pvillard31
Copy link
Contributor

No, as long as one of the Java 8 builds is working it should be good. The Travis builds are not reliable at the moment and we're working on this.

…changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.

code style fixes

Signed-off-by: Davy Machado <machado.davy@gmail.com>

Signed-off-by: Gerdan Santos <gerdan@gmail.com>
…changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.

more code style fixes

Signed-off-by: Davy Machado <machado.davy@gmail.com>

Signed-off-by: Gerdan Santos <gerdan@gmail.com>
Copy link
Contributor

@mattyb149 mattyb149 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to be a great new feature! I left some initial comments, and I haven't been able to get it to work yet on my PostgreSQL 12 server. Is there some n00b tutorial you can point me to so I can make sure I set everything up right? I created a publication and a replication slot, but I don't know how/if they work together, and I didn't get any events into the processor.

</dependency>
<!-- https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple -->
<dependency>
<groupId>com.googlecode.json-simple</groupId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, all other components in the NiFi source code base use Jackson rather than JSON.simple. Not sure it's a requirement to change but I guess a suggestion for consistency (and Jackson has a slight edge in overall performance).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

working on this...

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! JSON Simple replaced by Jackson.


public class ConnectionManager {

private static String server;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking the members and methods on this class should not be static; particularly, what if there were two instances of the processor on the canvas, each talking to different servers and/or databases?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

working on this...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
Many Alterations.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Many Alterations

@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({ "sql", "jdbc", "cdc", "postgresql" })
@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a PostgreSQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should say that it works for PostgreSQL version 10+ to avoid confusion for folks running older versions

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a PostgreSQL database. Works for PostgreSQL version 10+. CDC Events include INSERT, UPDATE, DELETE operations. Events "
        + "are output as individual flow files ordered by the time at which the operation occurred. This processor use a replication connection to stream data and sql connection to snapshot.")

* A processor to retrieve Change Data Capture (CDC) events and send them as flow files.
*/
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't anticipate the user wanting to run the same replication on each node in the cluster, this should probably have a @PrimaryNodeOnly annotation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly
.
.
@TriggerSerially
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@PrimaryNodeOnly

private Set<Relationship> relationships;

// Attribute keys
public static final String MIME_TYPE_ATTRIBUTE = "mime.type";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use CoreAttributes.MIME_TYPE.key() here to avoid another constant

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

+import org.apache.nifi.flowfile.attributes.CoreAttributes;
    // Attribute keys
-    public static final String MIME_TYPE_ATTRIBUTE = "mime.type";
-flowFile = session.putAttribute(flowFile, MIME_TYPE_ATTRIBUTE, MIME_TYPE_VALUE);
+flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_VALUE);


@Test
public void testProcessor() {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should fill this in, even if you're using mock objects to simulate the "other side"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to do this ... I have no experience with groovy ... I will see on other processors as examples.


import java.util.LinkedList;

public class Event {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some javadoc here to explain what an "event" is in this context? From the code I couldn't quite tell if it's a single transaction or all available replication messages, etc.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

working on this...

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments explaining Event, Stream and Snapshot classes.

public void createReplicationSlot() {
try {
PGConnection pgcon = ConnectionManager.getReplicationConnection().unwrap(PGConnection.class);
// More details about pgoutput options: https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might end up being a dead link someday, perhaps just mention the file name and the postgres repo on GitHub? Not that big a deal, just mentioning it in passing

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mattyb149 we will work on your sugestions and corrections... Thank you so much for your time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might end up being a dead link someday, perhaps just mention the file name and the postgres repo on GitHub? Not that big a deal, just mentioning it in passing

done

            +//PostgreSQL at GitHub: https://github.com/postgres
            +//Source file: postgres/src/backend/replication/pgoutput/pgoutput.c

@gerdansantos
Copy link
Author

This is going to be a great new feature! I left some initial comments, and I haven't been able to get it to work yet on my PostgreSQL 12 server. Is there some n00b tutorial you can point me to so I can make sure I set everything up right? I created a publication and a replication slot, but I don't know how/if they work together, and I didn't get any events into the processor.

Configuration on PostgreSQL Server

postgresql.conf

Your database should be configured to enable logical replication.

  • Property max_wal_senders should be at least equal to the number of replication consumers.
  • Property wal_keep_segments should contain count wal segments that can't be removed from database.
  • Property wal_level for logical replication should be equal to logical.
  • Property max_replication_slots should be greater than zero for logical replication, because logical replication can't work without replication slot.

Example:

listen_addresses = '*'         	# what IP address(es) to listen on;
                                # comma-separated list of addresses;
                                # defaults to 'localhost'; use '*' for all
                                # (change requires restart)
max_wal_senders = 4             # max number of walsender processes
wal_keep_segments = 4           # in logfile segments, 16MB each; 0 disables
wal_level = logical             # minimal, replica, or logical
max_replication_slots = 4       # max number of replication slots

After configurations, restart/reload PostgreSQL service.

pg_hba.conf

Enable connect user with replication privileges to replication stream and make a statments on PostgreSQL Server.

Example, Put this lines at pg_hba.conf:

host    all           all              all                     md5 #For SnapShot Connection
host    replication   all              all                     md5 #For Replication Connection

After configurations, restart PostgreSQL service.

Example of Use

PostgreSQL

Create a PostgreSQL table, for this example we use a postgres database; JUST FOR LEARNING PURPOSE;

Table struct:

                Table "public.cidade"
  Column   |  Type   | Collation | Nullable | Default
-----------+---------+-----------+----------+---------
 codigo    | integer |           | not null |
 data_fund | date    |           | not null |
 nome      | text    |           |          |
Publications:
    "cidade_pub"
CREATE TABLE cidade(codigo integer not null, data_fund date not null, nome text);
ALTER TABLE cidade REPLICA IDENTITY FULL;

OBS: A published table must have a “replica identity” configured in order to be able to replicate UPDATE and DELETE operations, so that appropriate rows to update or delete can be identified on the subscriber side. By default, this is the primary key, if there is one. Another unique index (with certain additional requirements) can also be set to be the replica identity. If the table does not have any suitable key, then it can be set to replica identity “full”, which means the entire row becomes the key. This, however, is very inefficient and should only be used as a fallback if no other solution is possible. More details: https://www.postgresql.org/docs/10/logical-replication-publication.html

First, you need a Publication for the tables that you want to capture data changes:

CREATE PUBLICATION cidade_pub FOR TABLE cidade;

You can decide the NiFi CDC PostgreSQL processor create a slot for you...
Or you can create manually.

SELECT * FROM pg_create_logical_replication_slot('slt_cidade_pub', 'pgoutput');

Link NiFi CDC PostgreSQL processor to this PostgreSQL Instance:
Example

PostgreSQL Host: 127.0.0.1
PostgreSQL Driver Class Name: org.postgresql.Driver
PostgreSQL Driver Location(s): /nifi-1.11.0/jdbc/postgresql-42.2.9.jar
Database: postgres
Username: postgres
Password:123123
Publication: cidade_pub
Slot Name: slt_cidade_pub
Make Snapshot: false
Include Begin/Commit Events: false
Initial Log Sequence Number - LSN: [empty property]
Drop If Exists Replication Slot: false

Link to LogAttribute for example, so start CaptureChangePostgreSQL

So execute at PostgreSQL Server

INSERT INTO cidade (codigo, data_fund, nome) VALUES (4, now(), 'New York');
UPDATE cidade SET codigo = 20 WHERE codigo = 4;
UPDATE cidade SET nome = 'Statue of Liberty City' WHERE nome = 'New York';
DELETE FROM cidade WHERE codigo >= 4;

Output at NiFi Queue:

{"insert":{"cidade":{"codigo":4,"nome":"New York","data_fund":"2020-02-24"}}}
{"update":{"cidade":{"codigo":20,"nome":"New York","data_fund":"2020-02-24"}}}
{"update":{"cidade":{"codigo":20,"nome":"Statue of Liberty City","data_fund":"2020-02-24"}}}
{"delete":{"cidade":{"codigo":20,"nome":"Statue of Liberty City","data_fund":"2020-02-24"}}}

@gerdansantos
Copy link
Author

Thanks for the PR and the quick changes. There is one file that should not be required. Also, some part of your codes were on multiple lines which, I agree, is cleaner. The complaint was probably about tab being used instead of spaces. You can revert to multiline if you want. Example:

pgcon.getReplicationAPI()
    .createReplicationSlot()
    .logical()
    .withSlotName(this.slot)
    .withOutputPlugin("pgoutput")
    .make();

done

gerdansantos and others added 2 commits February 25, 2020 04:55
…changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.

Fixes

Information about PostgreSQL version supported version 10+ to avoid confusion for folks running older versions

Prevent run the same replication on each node in the cluster, added a @PrimaryNodeOnly annotation.

replace MIME_TYPE_ATTRIBUTE constant to CoreAttributes.MIME_TYPE.key()

added more information about pgoutput plugin source and uri.

Change ConnectionManager to Non Static

Signed-off-by: Davy Machado <machado.davy@gmail.com>

Signed-off-by: Gerdan Santos <gerdan@gmail.com>
- JSON Simple replaced by Jackson;
- Added constants for default values;
- Added more comments to explain some parts of the code.
@davyam
Copy link

davyam commented Mar 2, 2020

Thanks for the PR and the quick changes. There is one file that should not be required. Also, some part of your codes were on multiple lines which, I agree, is cleaner. The complaint was probably about tab being used instead of spaces. You can revert to multiline if you want. Example:

pgcon.getReplicationAPI()
    .createReplicationSlot()
    .logical()
    .withSlotName(this.slot)
    .withOutputPlugin("pgoutput")
    .make();

Done!

@davyam
Copy link

davyam commented Mar 2, 2020

With the replacement from JSON Simple to Jackson, the output (content) changed too.

Example:

{
  "tupleData" : {
    "codigo" : 4,
    "nome" : "CDC",
    "data_fund" : "1929-10-19"
  },
  "relationName" : "public.cidade",
  "type" : "insert"
}

…changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.

Applying correction of run mvn clean install -Pcontrib-check

Signed-off-by: Davy Machado <machado.davy@gmail.com>

Signed-off-by: Gerdan Santos <gerdan@gmail.com>
@gerdansantos
Copy link
Author

hi @mattyb149 can you review this PR again ?

@gerdansantos
Copy link
Author

@pvillard31 can you review this PR to ?

@gerdansantos
Copy link
Author

This is going to be a great new feature! I left some initial comments, and I haven't been able to get it to work yet on my PostgreSQL 12 server. Is there some n00b tutorial you can point me to so I can make sure I set everything up right? I created a publication and a replication slot, but I don't know how/if they work together, and I didn't get any events into the processor.

Made

@gerdansantos
Copy link
Author

@mattyb149 can you review this PR again?

@davyam
Copy link

davyam commented Mar 31, 2020

Hey guys,
All requested changes have been made, but the reviews remain pending for some time.
So, how can we help to ensure that this PR will be in the next NiFi version?
Best regards

@davyam
Copy link

davyam commented Apr 19, 2021

@mathiasbosman @driesva

Hey guys!

Why are you creating a new PR copying the code instead of work in collaboration with me and @gerdansantos to improve this?

As you say in the new PR, you are not Postgres experts, but we are. In the same way, I and @gerdansantos are not Java experts. So, we can work together to make a better processor. This is the community's spirit.

Let us know if you need anything.

@driesva
Copy link

driesva commented Apr 21, 2021

Hello @davyam

Well, we wanted to work together... That's why @mathiasbosman initially opened a PR against @gerdansantos fork, but it did not get merged. Given the original PR is not accessible to us, we could not push our changes to that PR and because there was no reaction, we created a new PR against main including the initial code and our improvements. Meanwhile that PR is also out-of-date...

@davyam
Copy link

davyam commented Apr 27, 2021

Hi @driesva

I got, sorry for the delay... @gerdansantos and I are very busy with our works.

In the next month, we gonna make an effort to update this PR... in the meanwhile, I will discuss with @gerdansantos how we can work together.

@github-actions
Copy link

We're marking this PR as stale due to lack of updates in the past few months. If after another couple of weeks the stale label has not been removed this PR will be closed. This stale marker and eventual auto close does not indicate a judgement of the PR just lack of reviewer bandwidth and helps us keep the PR queue more manageable. If you would like this PR re-opened you can do so and a committer can remove the stale tag. Or you can open a new PR. Try to help review other PRs to increase PR review bandwidth which in turn helps yours.

@github-actions github-actions bot added the Stale label Aug 26, 2021
@rrjanbiah
Copy link

Bump! We gonna miss this big contribution

@pvillard31
Copy link
Contributor

Removing the stale tag for this one, but it needs to be updated to get merged as changes have been requested by reviewers.

@pvillard31 pvillard31 removed the Stale label Aug 26, 2021
@davyam
Copy link

davyam commented Aug 26, 2021

Hey guys, we will make the necessary changes next weekend.

@davyam
Copy link

davyam commented Aug 29, 2021

We did some changes and we still working for improve the tests, as requested by @mattyb149

davyam and others added 6 commits August 30, 2021 10:06
Co-authored-by: Pierre Villard <pierre.villard.fr@gmail.com>
…postgresql-nar/pom.xml

Co-authored-by: Pierre Villard <pierre.villard.fr@gmail.com>
…postgresql-nar/pom.xml

Co-authored-by: Pierre Villard <pierre.villard.fr@gmail.com>
…postgresql-processors/pom.xml

Co-authored-by: Pierre Villard <pierre.villard.fr@gmail.com>
…postgresql-processors/pom.xml

Co-authored-by: Pierre Villard <pierre.villard.fr@gmail.com>
Co-authored-by: Pierre Villard <pierre.villard.fr@gmail.com>
@davyam
Copy link

davyam commented Aug 30, 2021

Thanks @pvillard31 for the update.

@gerdansantos
Copy link
Author

@pvillard31 and @mattyb149 , working on tests!!!

I believe until Sunday I will finish.

@github-actions
Copy link

We're marking this PR as stale due to lack of updates in the past few months. If after another couple of weeks the stale label has not been removed this PR will be closed. This stale marker and eventual auto close does not indicate a judgement of the PR just lack of reviewer bandwidth and helps us keep the PR queue more manageable. If you would like this PR re-opened you can do so and a committer can remove the stale tag. Or you can open a new PR. Try to help review other PRs to increase PR review bandwidth which in turn helps yours.

@github-actions github-actions bot added the Stale label Dec 30, 2021
@rrjanbiah
Copy link

Bump! Holidays and the contributors might be away

@davyam
Copy link

davyam commented Jan 9, 2022

I'm working on some improvements and mock tests.

@davyam
Copy link

davyam commented Jan 24, 2022

Guys, as the code was refactored and the processor behavior was changed, we decided to start a new PR #5710. Then, we avoid misunderstandings with old messages.

There are several improvements in this new PR #5710 and we hope that all the contributors keep doing tests and giving us feedback from there.

ASAP this PR can be closed.

@joewitt joewitt closed this Jan 24, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet