Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Kafka event logger for server events #4733

Closed
wants to merge 1 commit into from

Conversation

bowenliang123
Copy link
Contributor

@bowenliang123 bowenliang123 commented Apr 19, 2023

Why are the changes needed?

  • introduce new event logger type KAFKA
  • send server events to the Kafka topic with initializing and closing Kafka producer properly with server's lifecyle
  • use Kafka 3.4.0 as the client version, and tested with Kakfa servers of 2.8.x and 3.4.x

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

  • Add screenshots for manual tests if appropriate

  • Run test locally before make a pull request

@bowenliang123
Copy link
Contributor Author

cc @lxbalex @jeanlyn

@codecov-commenter
Copy link

codecov-commenter commented Apr 19, 2023

Codecov Report

Merging #4733 (b5220d2) into master (6ae0c8b) will increase coverage by 0.06%.
The diff coverage is 93.22%.

@@             Coverage Diff              @@
##             master    #4733      +/-   ##
============================================
+ Coverage     57.98%   58.05%   +0.06%     
  Complexity       13       13              
============================================
  Files           581      583       +2     
  Lines         32491    32548      +57     
  Branches       4315     4317       +2     
============================================
+ Hits          18841    18896      +55     
+ Misses        11836    11834       -2     
- Partials       1814     1818       +4     
Impacted Files Coverage Δ
...rg/apache/kyuubi/events/EventHandlerRegister.scala 50.00% <33.33%> (-2.39%) ⬇️
...vents/handler/ServerKafkaLoggingEventHandler.scala 88.88% <88.88%> (ø)
...che/kyuubi/events/ServerEventHandlerRegister.scala 94.73% <90.00%> (-5.27%) ⬇️
...in/scala/org/apache/kyuubi/config/KyuubiConf.scala 97.20% <100.00%> (+0.02%) ⬆️
...ala/org/apache/kyuubi/events/EventLoggerType.scala 100.00% <100.00%> (ø)
...uubi/events/handler/KafkaLoggingEventHandler.scala 100.00% <100.00%> (ø)

... and 5 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@pan3793
Copy link
Member

pan3793 commented Apr 20, 2023

We can use Testcontianers to address the Kafka server compatibility issues

val SERVER_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
buildConf("kyuubi.backend.server.event.loggers")
.doc("A comma-separated list of server history loggers, where session/operation etc" +
" events go.<ul>" +
s" <li>JSON: the events will be written to the location of" +
s" ${SERVER_EVENT_JSON_LOG_PATH.key}</li>" +
s" <li>" +
s"KAFKA: the events sent to topic of `${SERVER_EVENT_KAFKA_TOPIC.key}`, " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unify the format

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's unclear what's content in the kafka record

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, change to mention the format with KAFKA: the events serialized in JSON format

@@ -181,6 +183,7 @@ simpleclient_tracer_otel/0.16.0//simpleclient_tracer_otel-0.16.0.jar
simpleclient_tracer_otel_agent/0.16.0//simpleclient_tracer_otel_agent-0.16.0.jar
slf4j-api/1.7.36//slf4j-api-1.7.36.jar
snakeyaml/1.33//snakeyaml-1.33.jar
snappy-java/1.1.7.3//snappy-java-1.1.7.3.jar
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it support arm?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when introduce new artifacts in release tarball, don't forget to update LICENSE/NOTICE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snappy-java supports Windows, Mac OS X, Linux (x86, x86_64, arm, etc...). If your platform is not supported, you need to build native libraries by yourself.

Accorrding to snappy-java's doc here(https://github.com/xerial/snappy-java/blob/master/BUILD.md), arm architecture should be supported.


override def apply(event: KyuubiEvent): Unit = {
try {
val record = new ProducerRecord[String, String](topic, event.eventType, event.toJson)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember that Kafka allows setting event timestamp on constructing ProducerRecord, it may be useful in some cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion. Setting timestamp with System.currentTimeMillis. Is that good enough? Or use Java8's Instant.now.toEpochMilli (with UTC clock)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While timestamp is not set , the producer will assign the timestamp using System.currentTimeMillis().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually, it should be event time, but I just notice that KyuubiEvent does not have event time ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, no available unified event time in KyuubiEvent right now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's leave a TODO here, we should add an event_time in KyuubiEvent and propagate it to Kafka, but it may break the backward compatibility, so better to do it later separately

@bowenliang123 bowenliang123 marked this pull request as draft April 20, 2023 15:02
@github-actions github-actions bot added the kind:infra license, community building, project builds, asf infra related, etc. label Apr 21, 2023
@github-actions github-actions bot added kind:infra license, community building, project builds, asf infra related, etc. and removed kind:infra license, community building, project builds, asf infra related, etc. labels Apr 21, 2023
@bowenliang123 bowenliang123 marked this pull request as ready for review April 22, 2023 05:11
pom.xml Outdated Show resolved Hide resolved
@bowenliang123 bowenliang123 changed the title Introduce Kafka event logger for server events [WIP] Introduce Kafka event logger for server events Apr 24, 2023
@bowenliang123
Copy link
Contributor Author

bowenliang123 commented Apr 25, 2023

cc @iodone @wForget

I am looking for proper timing to close event handlers for Kyuubi Server. But repeated errors comes in the irreverent places.

As I tried to close event handlers via EventBus.close() in KyuubiSever's stop() or stopServer,
the get operation log test case failed, which is completely unrelated to Kafka logger introduced in this PR, and it also never uses the Kafka logger in context.

If you guys have any idea for this, feel free to ping me any time. Thanks.

ERROR org.apache.kyuubi.events.handler.ServerKafkaLoggingEventHandler: Failed to send event in KafkaEventHandler
java.lang.IllegalStateException: Cannot perform operation after producer has been closed
	at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:892) ~[kafka-clients-2.8.1.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:901) ~[kafka-clients-2.8.1.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885) ~[kafka-clients-2.8.1.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:773) ~[kafka-clients-2.8.1.jar:?]
	at org.apache.kyuubi.events.handler.KafkaLoggingEventHandler.apply(KafkaLoggingEventHandler.scala:50) ~[kyuubi-events_2.12-1.8.0-SNAPSHOT.jar:?]
	at org.apache.kyuubi.events.handler.KafkaLoggingEventHandler.apply(KafkaLoggingEventHandler.scala:31) ~[kyuubi-events_2.12-1.8.0-SNAPSHOT.jar:?]
	at org.apache.kyuubi.events.EventBus$EventBusLive.$anonfun$post$4(EventBus.scala:87) ~[kyuubi-events_2.12-1.8.0-SNAPSHOT.jar:?]
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.17.jar:?]
- get operation log *** FAILED ***
  scala.collection.JavaConverters.asScalaBufferConverter[String](logRowSet.getLogRowSet()).asScala.exists(((x$1: String) => x$1.contains("show tables"))) was false (OperationsResourceSuite.scala:107)

see : https://github.com/apache/kyuubi/actions/runs/4795357563/jobs/8529805995?pr=4733#step:8:1778

@bowenliang123 bowenliang123 changed the title [WIP] Introduce Kafka event logger for server events Introduce Kafka event logger for server events May 4, 2023

override def close(): Unit = {
kafkaProducer.flush()
kafkaProducer.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to use close method with timeout.

org.apache.kafka.clients.producer.Producer#close(long, java.util.concurrent.TimeUnit)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have any suggested conf for the timeout here? As for loggers, for servers, for server event loggers?

Or let's introduce proper config next time ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to introduce a new configuration for it. For server's Kafka logger

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Will do it in two steps.

  1. introduce a new config kyuubi.backend.server.event.kafka.close.timeout, with default value 60 secs.
  2. pass the config in ServerEventHandlerRegister#createKafkaEventHandler to construct KafkaLoggingEventHandler instance.

Copy link
Member

@pan3793 pan3793 May 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

60s is too long, it will blocks other shutdown procedures, likely 3~5s

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's make it in 5s by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, it's removed for flushing the producer, for the reasons, the closing producer wakes up the internal sender to send pending requests (exact way in flushing), and the flushing method does not accept timeout.

@@ -22,5 +22,5 @@ object EventLoggerType extends Enumeration {
type EventLoggerType = Value

// TODO: Only SPARK is done now
val SPARK, JSON, JDBC, CUSTOM = Value
val SPARK, JSON, JDBC, KAFKA, CUSTOM = Value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add enumeration values instead of modifying the order?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KAFKA value is added before CUSTOM here, in order to

  1. follow the same order as in docs
  2. CUSTOM is an extra option, which could be listed behind the options with embedded support

I'm open to your suggestion. WDYT @pan3793 ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add the new one at the tail, to suppress the potential serializable issue. And the TODO comment is outdated

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM. Let me move it to the tail, as you mentioned serializable problem.

@bowenliang123 bowenliang123 self-assigned this May 8, 2023
@bowenliang123 bowenliang123 added this to the v1.8.0 milestone May 8, 2023

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are using Testcontainers for integration testing, why should we depend on the Kafka server?

And after second thought, I prefer to use MockProducer rather than launch a real Kafka broker, but the latter is also fine for me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MockProducer is lightweight to simulate producer-consumer situation, but we need a broker bootstrap server from producer-borker-consumer here. And I don't know how to get a bootstrap server from a MockProducer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the purpose of the consumer is to verify the events have been successfully delivered to Kafka, we can simply check the buffer of MockProducer instead

pom.xml Outdated
Comment on lines 1275 to 1280
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary exclusion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we should skip this exclusion? kafka-clients is depending on slf4j-api.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should respect the Maven transitive dependencies in most cases, just excluding the deps we truly don't need, and overriding versions in dependencyManagement(just overriding, without excluding)

But it's not suitable for components which is disastrous for dependencies management, like Hive, Hudi.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM. Removed this exclusion.

@@ -1236,7 +1236,7 @@ This product optionally depends on 'zstd-jni', a zstd-jni Java compression
and decompression library, which can be obtained at:

* LICENSE:
* license/LICENSE.zstd-jni.txt (Apache License 2.0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this license issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🍻 No problem. And we could make this change to a separate PR if you want.

@@ -21,6 +21,5 @@ object EventLoggerType extends Enumeration {

type EventLoggerType = Value

// TODO: Only SPARK is done now
val SPARK, JSON, JDBC, CUSTOM = Value
val SPARK, JSON, JDBC, CUSTOM, KAFKA = Value
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all others is supported now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems JDBC is not supported.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this TODO can be removed. Not all the event logger types in both servers or the engine. It's a legacy todo hints.

Comment on lines 2031 to 2035
s" <li>KAFKA: the events serialized in JSON format" +
s" and sent to topic of `${SERVER_EVENT_KAFKA_TOPIC.key}`. " +
s" Note: For the configs of Kafka producer," +
s" please specify them with the prefix: `kyuubi.backend.server.event.kafka.`." +
s" For example, kyuubi.backend.server.event.kafka.bootstrap.servers=127.0.0.1:9092" +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
s" <li>KAFKA: the events serialized in JSON format" +
s" and sent to topic of `${SERVER_EVENT_KAFKA_TOPIC.key}`. " +
s" Note: For the configs of Kafka producer," +
s" please specify them with the prefix: `kyuubi.backend.server.event.kafka.`." +
s" For example, kyuubi.backend.server.event.kafka.bootstrap.servers=127.0.0.1:9092" +
s" <li>KAFKA: the events will be serialized in JSON format" +
s" and sent to topic of `${SERVER_EVENT_KAFKA_TOPIC.key}`." +
s" Note: For the configs of Kafka producer," +
s" please specify them with the prefix: `kyuubi.backend.server.event.kafka.`." +
s" For example, `kyuubi.backend.server.event.kafka.bootstrap.servers=127.0.0.1:9092`" +

Copy link
Member

@pan3793 pan3793 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, LGTM, only minor things of docs

Copy link
Member

@pan3793 pan3793 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, LGTM, only minor things of docs

@bowenliang123
Copy link
Contributor Author

Thanks, LGTM, only minor things of docs

Thanks for the detailed reviews. Wait for the CI builds and it will be merged. @pan3793 @cxzl25

Copy link
Contributor

@cxzl25 cxzl25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@bowenliang123 bowenliang123 deleted the kafka-logger branch May 8, 2023 14:46
@bowenliang123
Copy link
Contributor Author

Thanks, merged to master

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind:build kind:documentation Documentation is a feature! kind:infra license, community building, project builds, asf infra related, etc. module:common module:events module:server
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants