PostgreSQL sink connector doesn't persist message to table #22543

Closed
3 tasks done
alexandrebrilhante opened this issue Apr 19, 2024 · 2 comments
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

alexandrebrilhante commented Apr 19, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

OS: macOS Sonoma 14.4.1
Java: OpenJDK 17.0.11
Pulsar: 3.2.1

Minimal reproduce step

The example detailed here seems outdated. I've followed every step but still can't see new records in PostgreSQL. For comparison, there seems to be no issue when switching to Cassandra with the same schema and producer setup. I've tried both local and Dockerized Postgres databases.

pulsar-postgres-jdbc-sink.yaml

configs:
  userName: "postgres"
  password: "postgres"
  jdbcUrl: "jdbc:postgresql://localhost:5432/postgres"
  tableName: "pulsar_postgres_jdbc_sink"

schema

{
  "type": "AVRO",
  "schema": "{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
  "properties": {}
}

main.rs - listens on 127.0.0.1:9999 for dummy data, e.g. {"id" 1, "name" "abcdefg"}, and forwards each chunk it reads to Pulsar.

use pulsar::{producer::ProducerOptions, Pulsar, TokioExecutor};
use tokio::{io::AsyncReadExt, net::TcpListener, sync::mpsc};

#[tokio::main]
async fn main() {
    let addr: &str = "pulsar://localhost:6650";

    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor)
        .build()
        .await
        .expect("Failed to connect to Pulsar...");

    let topic_name: &str = "persistent://public/default/pulsar-postgres-jdbc-sink-topic";

    let mut producer: pulsar::Producer<TokioExecutor> = pulsar
        .producer()
        .with_topic(topic_name)
        .with_name("producer")
        .with_options(ProducerOptions {
            batch_size: Some(4),
            ..Default::default()
        })
        .build()
        .await
        .expect("Failed to create producer...");

    let (tx, mut rx) = mpsc::channel(100);

    let _producer_task: tokio::task::JoinHandle<()> = tokio::spawn(async move {
        while let Some(message) = rx.recv().await {
            match producer.send(message).await {
                Ok(_) => println!("Message sent to Pulsar..."),
                Err(e) => eprintln!("Failed to send message to Pulsar; err = {:?}...", e),
            }
        }
    });

    let listener: TcpListener = TcpListener::bind("127.0.0.1:9999")
        .await
        .expect("Failed to bind to address...");

    loop {
        let (mut socket, _addr) = listener
            .accept()
            .await
            .expect("Failed to accept connection...");

        let tx: mpsc::Sender<String> = tx.clone();

        tokio::spawn(async move {
            let mut buf: [u8; 1024] = [0; 1024];

            loop {
                let n: usize = match socket.read(&mut buf).await {
                    Ok(0) => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("Failed to read from socket; err = {:?}...", e);
                        return;
                    }
                };

                let message = String::from_utf8_lossy(&buf[0..n]).to_string();

                if tx.send(message).await.is_err() {
                    eprintln!("Failed to send message to channel...");
                    return;
                }
            }
        });
    }
}
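
A note on the read loop above: TCP is a byte stream, so a single `read` can return part of a payload or several payloads at once, and each chunk is forwarded as one Pulsar message. Below is a minimal sketch of newline-delimited framing, assuming the sender terminates every record with `\n` (an assumption, the report does not say how records are delimited). `drain_lines` is a hypothetical helper, not part of the original program or of the `pulsar` crate.

```rust
/// Hypothetical helper: removes every complete, newline-terminated record
/// from `pending` and returns them in order. Bytes after the last newline
/// (a partial record) are left in `pending` for the next read.
fn drain_lines(pending: &mut Vec<u8>) -> Vec<String> {
    let mut records = Vec::new();

    while let Some(pos) = pending.iter().position(|&b| b == b'\n') {
        // Take the record together with its terminating newline.
        let line: Vec<u8> = pending.drain(..=pos).collect();

        // Drop the newline, and a preceding carriage return if present.
        let text = String::from_utf8_lossy(&line[..line.len() - 1]);
        let text = text.trim_end_matches('\r');

        // Skip empty lines instead of forwarding empty messages.
        if !text.is_empty() {
            records.push(text.to_string());
        }
    }

    records
}
```

In the read loop, the bytes returned by each `socket.read` would be appended to a per-connection buffer, and every string returned by `drain_lines` would be sent on `tx` as its own message.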

Complete setup:

bin/pulsar standalone

bin/pulsar-admin schemas upload pulsar-postgres-jdbc-sink-topic -f $PWD/pulsar/connectors/schema

bin/pulsar-admin sinks create \
    --archive $PWD/pulsar/connectors/pulsar-io-jdbc-postgres-3.2.2.nar \
    --inputs pulsar-postgres-jdbc-sink-topic \
    --name pulsar-postgres-jdbc-sink \
    --sink-config-file $PWD/pulsar/connectors/pulsar-postgres-jdbc-sink.yaml \
    --parallelism 1

cargo build --release && cargo run --release

What did you expect to see?

PostgreSQL table pulsar_postgres_jdbc_sink being populated in real-time.

What did you see instead?

PostgreSQL table pulsar_postgres_jdbc_sink is empty, although Pulsar is producing the messages properly.

Anything else?

No issues when inspecting the sink or the topic. Pulsar is able to produce the messages.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@alexandrebrilhante alexandrebrilhante added the type/bug The PR fixed a bug or issue reported a bug label Apr 19, 2024
alexandrebrilhante (Author) commented

Seems to be related to the schema Postgres is using, which is weird because I'm using the same schema as the getting started example for Pulsar IO.

org.postgresql.util.PSQLException: ERROR: null value in column "id" of relation "pulsar_postgres_jdbc_sink" violates not-null constraint
  Detail: Failing row contains (null, null).
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:177) ~[postgresql-42.5.1.jar:42.5.1]
	at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:289) ~[pulsar-io-jdbc-core-3.2.2.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
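
The `Detail` line in the trace shows that the row reaching PostgreSQL had NULL in both columns, so the insert tripped the NOT NULL constraint on `id`. A rough sketch of catching this on the producer side before sending, assuming a record shape that mirrors the uploaded schema. The `Row` type and the `null_violations` helper are hypothetical, and the list of NOT NULL columns is an assumption taken from the error message rather than from the table definition.

```rust
/// Hypothetical record mirroring the two nullable fields of the uploaded schema.
#[derive(Debug, Default)]
struct Row {
    id: Option<i32>,
    name: Option<String>,
}

/// Hypothetical helper: returns the names of the columns listed in `not_null`
/// that are unset in `row`. Column names other than "id" and "name" are ignored.
fn null_violations(row: &Row, not_null: &[&str]) -> Vec<String> {
    let mut missing = Vec::new();

    for &col in not_null {
        let is_null = match col {
            "id" => row.id.is_none(),
            "name" => row.name.is_none(),
            _ => false,
        };

        if is_null {
            missing.push(col.to_string());
        }
    }

    missing
}
```

Calling `null_violations(&Row::default(), &["id"])` reports `id`, which corresponds to the `(null, null)` row in the error above.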

alexandrebrilhante (Author) commented

Looks like I was trying to persist {"id" 1, "name" "abcdefg"} as opposed to just {"name" "abcdefg"}, as expected by the sink connector.
