
JDBC Sinks Stop Working as Intended after a Database Error #9464

Open
alexanderursu99 opened this issue Feb 3, 2021 · 6 comments
Labels: help wanted, type/bug

Comments

@alexanderursu99

Describe the bug
When running a ClickHouse JDBC sink and encountering an error from the database (e.g. a timeout), the sink seems to continue consuming, but does not actually insert or ack any further messages.

To Reproduce
Steps to reproduce the behavior:

  1. Run a ClickHouse JDBC sink with batch size 100000 and timeout 60000 ms, using the Kubernetes runtime on Pulsar 2.6.1
  2. Restart the ClickHouse instance to produce an error for the jdbc-driver client in the sink
  3. Inspect logs and behaviour of sink from metrics
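For reference, a sink of this shape might be created roughly as follows. This is a hedged sketch: the tenant, namespace, and topic are taken from the logs below, while the archive path, JDBC URL, and the `batchSize`/`timeoutMs` config keys are assumptions based on the JDBC sink connector's configuration, not details from this report.

```shell
# Illustrative sink creation; adjust archive path and jdbcUrl for your setup.
pulsar-admin sinks create \
  --tenant market_data \
  --namespace deribit \
  --name clickhouse-sink \
  --archive connectors/pulsar-io-jdbc-clickhouse-2.6.1.nar \
  --inputs market_data/deribit/skew_iv_v2 \
  --sink-config '{"jdbcUrl": "jdbc:clickhouse://192.168.1.135:8123/default", "tableName": "skew_iv", "batchSize": 100000, "timeoutMs": 60000}'
```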

Expected behavior
The sink should recover, and be able to continue inserting and acking messages.

Logs

23:10:12.763 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [clickhouse-sink] [7e6d8] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
23:12:12.766 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [clickhouse-sink] [7e6d8] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
23:13:10.794 [pool-5-thread-1] INFO ru.yandex.clickhouse.ClickHouseStatementImpl - Error during connection to ru.yandex.clickhouse.settings.ClickHouseProperties@3eb4c763, reporting failure to data source, message: Connect to 192.168.1.135:8123 [/192.168.1.135] failed: Connection refused (Connection refused)
23:13:10.796 [pool-5-thread-1] INFO ru.yandex.clickhouse.ClickHouseStatementImpl - Error sql: INSERT INTO skew_iv(currency,timestamp,spot_price,expiration_timestamp,ttm,ttm_fractional,rfr,alpha,beta,nu,rho,atm_iv,smile,skew,is_interpolated) VALUES('ETH',1612393920000,1641.27,1612425600000,31680000,0.0010038787,1.4379762,1.4130342,1.0,17.782494,0.21298867,1.4470383,-0.026410576,-0.0877126,0)
23:13:10.807 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception
ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 210, host: 192.168.1.135, port: 8123; Connect to 192.168.1.135:8123 [/192.168.1.135] failed: Connection refused (Connection refused)
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.getException(ClickHouseExceptionSpecifier.java:89) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:55) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:24) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:633) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:117) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:100) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:95) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:90) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHouseStatementImpl.execute(ClickHouseStatementImpl.java:226) ~[clickhouse-jdbc-0.2.4.jar:?]
at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.execute(ClickHousePreparedStatementImpl.java:105) ~[clickhouse-jdbc-0.2.4.jar:?]
at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:203) ~[pulsar-io-jdbc-core-2.6.1.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_252]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_252]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: org.apache.http.conn.HttpHostConnectException: Connect to 192.168.1.135:8123 [/192.168.1.135] failed: Connection refused (Connection refused)
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:159) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:373) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[httpclient-4.5.5.jar:4.5.5]
at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:614) ~[clickhouse-jdbc-0.2.4.jar:?]
... 14 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_252]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_252]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_252]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_252]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_252]
at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_252]
at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:373) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[httpclient-4.5.5.jar:4.5.5]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[httpclient-4.5.5.jar:4.5.5]
at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:614) ~[clickhouse-jdbc-0.2.4.jar:?]
... 14 more
23:13:12.768 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [clickhouse-sink] [7e6d8] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
23:14:12.772 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [clickhouse-sink] [7e6d8] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
23:15:12.776 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [clickhouse-sink] [7e6d8] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

In the logs you can see the sink emit its regular stats updates, then the error from the connection being refused by ClickHouse (for now this happens whenever the instance restarts), and then the regular updates resume, looking just as they did before, even though nothing is being inserted or acked.

Screenshots

[Screenshot: sink backlog over time]

The screenshot shows a period where the backlog was accumulating: one instance of this error affecting the sink. The backlog then drains after I manually restarted the sink from the CLI, which got it running properly again. Later, another instance of the error occurs and the backlog begins to accumulate again.

Additional context
Mentioned in the steps to reproduce:

  1. Using version 2.6.1
  2. Running function workers with the Kubernetes runtime
  3. Sink batch size set to 100000
  4. Sink timeout set to 60000 ms (1 min)

Ideas

My working theory is that either something is logically wrong with the JDBC sinks, such that they stop working properly after encountering an error from the database, or the problem is specific to the ClickHouse JDBC driver and how it handles errors.

I have not tested this with any other database, but a quick test with PostgreSQL or MySQL should reveal whether this is a general issue with the JDBC sinks.
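One failure mode consistent with these symptoms (consuming continues, no further inserts, no further acks, no further errors) is a flush guard flag that is set before a flush and only cleared on the success path. The sketch below is hypothetical, it is not the actual `JdbcAbstractSink` code, and the class and field names are illustrative; it only demonstrates how a single swallowed exception can turn every later flush into a silent no-op.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a flush guard that is never reset on failure.
class StuckFlushSketch {
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private final List<String> batch = new ArrayList<>();
    int flushedCount = 0;
    boolean failNextFlush = false;

    void write(String record) {
        batch.add(record);
        flush();
    }

    void flush() {
        // Guard so only one flush runs at a time.
        if (!isFlushing.compareAndSet(false, true)) {
            return; // another flush "in progress" -- silently skip
        }
        try {
            if (failNextFlush) {
                failNextFlush = false;
                throw new RuntimeException("Connection refused");
            }
            flushedCount += batch.size();
            batch.clear();
            isFlushing.set(false); // BUG: reset only on the success path
        } catch (RuntimeException e) {
            // The exception is logged and swallowed, but isFlushing stays
            // true, so every later flush() returns immediately: the sink
            // keeps consuming but never inserts or acks again.
        }
    }
}
```

After one simulated database error, further writes accumulate in the batch but `flushedCount` never advances, mirroring the stuck-but-alive behavior in the logs above.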

@alexanderursu99 added the type/bug label Feb 3, 2021
@codelipenghui codelipenghui added this to the 2.8.0 milestone Feb 4, 2021
@alexanderursu99 alexanderursu99 changed the title ClickHouse JDBC Sink Stops Working as Intended after a Database Error JDBC Sinks Stop Working as Intended after a Database Error Feb 9, 2021
@alexanderursu99 (Author)

Updated the title: I tested this with a PostgreSQL sink connector and got the same result, so I now believe this is a general issue with the JDBC sinks.

@alexanderursu99 (Author)

Logs from the PostgreSQL sink connector, configured with the same settings as the ClickHouse sink connector and using the same Kubernetes runtime.

20:30:59.006 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 2.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:31:59.008 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:32:59.010 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:33:59.011 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:34:59.014 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:35:59.015 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:36:59.016 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:37:59.017 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:38:59.019 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:39:56.968 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception
org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:340) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:448) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:369) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:159) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:148) ~[postgresql-42.2.12.jar:42.2.12]
at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:203) ~[pulsar-io-jdbc-core-2.6.1.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_252]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_252]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.io.EOFException
at org.postgresql.core.PGStream.receiveChar(PGStream.java:372) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2044) ~[postgresql-42.2.12.jar:42.2.12]
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:313) ~[postgresql-42.2.12.jar:42.2.12]
... 12 more
20:39:59.020 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:40:59.022 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:41:59.023 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:42:59.024 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:43:59.025 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:44:59.027 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:45:59.029 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
20:46:59.030 [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [market_data/deribit/skew_iv_v2] [postgres-sink] [45408] Prefetched messages: 0 --- Consume throughput received: 0.50 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

@alexanderursu99 (Author)

I now believe this issue is related to using the EFFECTIVELY_ONCE processing guarantee for the sink. The issue doesn't seem to happen when using ATLEAST_ONCE.
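This suggests a possible workaround via the admin CLI. The sketch below assumes the sink name and namespace seen in the logs earlier, and assumes your pulsar-admin version accepts `--processing-guarantees` on `sinks update`; adjust names for your deployment.

```shell
# Workaround sketch: run the sink with ATLEAST_ONCE instead of EFFECTIVELY_ONCE.
pulsar-admin sinks update \
  --tenant market_data \
  --namespace deribit \
  --name clickhouse-sink \
  --processing-guarantees ATLEAST_ONCE
```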

@codelipenghui (Contributor)

Thanks @Alxander64, and sorry for the late response. Have you tried the new Pulsar version 2.7.0, or 2.6.3? If the problem is still there, we need to fix it ASAP.

@alexanderursu99 (Author)

I have recently updated to 2.6.3, but since then I've only been running sinks against a more stable database. I first noticed this issue when sinking to ClickHouse, which I didn't have a very robust production setup for.

@alexanderursu99 (Author)

For a simple test, I had my Pulsar cluster in k8s and brought up a single Postgres replica with a Helm chart. With a sink running, configured as described above, I deleted the pod running Postgres and waited for it to respawn. If new rows don't eventually appear in the table being sinked to, the problem persists.

@codelipenghui codelipenghui modified the milestones: 2.8.0, 2.9.0 May 22, 2021
@eolivelli eolivelli modified the milestones: 2.9.0, 2.10.0 Oct 6, 2021
@codelipenghui codelipenghui modified the milestones: 2.10.0, 2.11.0 Feb 14, 2022
@codelipenghui codelipenghui modified the milestones: 2.11.0, 2.12.0 Jul 26, 2022
@RobertIndie RobertIndie removed this from the 3.0.0 milestone Apr 11, 2023
@RobertIndie RobertIndie added this to the 3.1.0 milestone Apr 11, 2023
RobertIndie added a commit that referenced this issue Sep 14, 2023
…the fatal exception (#21143)

PIP: #21079 

### Motivation

Currently, the connector and function cannot terminate the function instance if fatal exceptions are thrown outside the function instance thread. The existing exception handlers of connectors and Pulsar Functions cannot handle such exceptions.

For example, suppose we have a sink connector that uses its own threads to batch-sink data to an external system. If any fatal exception occurs in those threads, the function instance thread will not be aware of it and will not be able to terminate the connector, so the connector hangs indefinitely. There is a related issue here: #9464

The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from
an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has
been observed for the Kafka source connector: #9464. We have fixed it by adding
the notifyError method to the `PushSource` class in PIP-281: #20807. However, this does not solve the problem for all source connectors, because not all connectors are implemented based on the `PushSource` class.

The problem is the same for Pulsar Functions. Currently, a function can't throw fatal exceptions to the function framework, so we need to provide a way for function developers to do so.

We need a way for the connector and function developers to throw fatal exceptions outside the function instance
thread. The function framework should catch these exceptions and terminate the function accordingly.

### Modifications

Introduce a new method `fatal` to the context. Connector implementation code and function code can call the `fatal` method on this context to raise a fatal exception and terminate the instance.

After the connector or function raises the fatal exception, the function instance thread is interrupted. The function framework then catches the exception, logs it, and terminates the function instance.
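The escalation pattern described above can be sketched with a minimal stand-in for the context. Only the `fatal(Throwable)` method name comes from the PIP text; the `SinkContext` interface and `BatchingSink` class here are illustrative scaffolding, not Pulsar code.

```java
// Minimal stand-in for the function context; only the new fatal(Throwable)
// hook described in the PIP is modeled here.
interface SinkContext {
    void fatal(Throwable t);
}

class BatchingSink {
    private final SinkContext context;

    BatchingSink(SinkContext context) {
        this.context = context;
    }

    // Called from the sink's own flush thread, outside the instance thread.
    void flushOnce(Runnable actualFlush) {
        try {
            actualFlush.run();
        } catch (RuntimeException e) {
            // Escalate instead of swallowing: the framework is expected to
            // interrupt and terminate the function instance on fatal().
            context.fatal(e);
        }
    }
}
```

The key design point is that the batch thread no longer logs-and-continues in a broken state; it hands the exception to the framework, which owns the instance lifecycle.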