[Issue #29] [pulsar-spark] Adding SparkPulsarReliableReceiver #31

Open
wants to merge 3 commits into
base: master

Conversation

aditiwari01

Fixes #29

Motivation

The current pulsar-spark adapter uses spark-streaming_2.10 while the Scala dependency is 2.11. Apart from this, the current receiver does not take care of reliability, rate limiting, or backpressure. This PR adds a new receiver with all these considerations.

Modifications

Includes:

  1. Updating spark-streaming_2.10 to spark-streaming_2.11.
  2. Adding rate limiting / backpressure logic to the receiver.
  3. Reading from Pulsar in batches instead of record by record.
  4. Making the receiver reliable by using a batched store call (see the sketch below).
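
The reliability and batching pieces fit together roughly as in the sketch below. This is an illustrative sketch only, not the PR's actual code: the class, parameter, and field names are hypothetical. The consumer thread reads a batch, blocks on a batched store() so that Spark owns the data, and only then acknowledges (when autoAcknowledge is enabled).

import org.apache.pulsar.client.api.{PulsarClient, Schema}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import scala.collection.JavaConverters._

// Hypothetical sketch, not the PR's actual class: read batches from Pulsar,
// store each batch atomically in Spark, then acknowledge it.
class ReliablePulsarReceiverSketch(serviceUrl: String, topic: String, subscription: String,
                                   autoAcknowledge: Boolean = true)
  extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) {

  @volatile private var client: PulsarClient = _

  override def onStart(): Unit = {
    client = PulsarClient.builder().serviceUrl(serviceUrl).build()
    val consumer = client.newConsumer(Schema.BYTES)
      .topic(topic).subscriptionName(subscription).subscribe()
    new Thread() {
      override def run(): Unit = {
        while (!isStopped()) {
          val batch = consumer.batchReceive()                // batch read instead of record by record
          val records = batch.asScala.toSeq
          if (records.nonEmpty) {
            store(records.map(_.getValue).iterator)          // blocks until Spark has stored the block
            if (autoAcknowledge) consumer.acknowledge(batch) // ack only after a successful store
          }
        }
        consumer.close()
      }
    }.start()
  }

  override def onStop(): Unit = if (client != null) client.close()
}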

Contributor

@eolivelli eolivelli left a comment


Great work.

Unfortunately I don't know Scala so I cannot give much feedback.
I left a comment.

Hopefully other folks will be able to give more feedback.

We can ask on dev@pulsar.apache.org for more reviews.

restart("Restart a consumer")
}
latestStorePushTime = System.currentTimeMillis()
new Thread() {
Contributor


Can we give a name to this thread?

Also, can we keep a reference to the Thread as a field?
Usually this helps when shutting down the system, as you can wait for the thread to exit and avoid leaking it.
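
For illustration, a minimal sketch of this suggestion (the class and thread names are hypothetical, not from this PR): keep the consumer thread in a field, give it a name, and join it during shutdown.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical sketch: a named consumer thread held in a field so onStop can wait for it.
class NamedThreadReceiverSketch extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) {

  @volatile private var consumerThread: Thread = _   // field, so shutdown can wait on it

  override def onStart(): Unit = {
    consumerThread = new Thread("pulsar-spark-consumer") {   // named, easier to spot in thread dumps
      override def run(): Unit = {
        while (!isStopped()) {
          // receive a batch from Pulsar, store it, ack it ...
          Thread.sleep(10)
        }
      }
    }
    consumerThread.setDaemon(true)
    consumerThread.start()
  }

  override def onStop(): Unit = {
    // Wait for the loop to exit instead of killing the thread.
    if (consumerThread != null) consumerThread.join()
  }
}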

Author


Makes sense. Ideally the thread should be stopped before stopping the receiver (controlled by isStopped). However, I'll add the check just in case.

@aditiwari01
Author

Did anyone get a chance to review this?

}

override def onStop(): Unit = try {
if (consumerThread != null && consumerThread.isAlive) consumerThread.stop() // Ideally consumerThread should be closed before calling onStop.
Contributor


Please don't use Thread.stop(); it is deprecated and can lead to unpredictable behaviour (like unreleased locks).

It is enough to wait for the thread to finish here.

Author


It should be enough, because onStop is called after setting isStopped = true, so the thread should have exited by then.
But, generally speaking, what is the ideal way to close a thread that is stuck on something like IO? I didn't find anything other than stop() that can kill the thread.

Can we do something like:
thread.join(30secs)
if (thread.isAlive()) thread.stop()?
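
For reference, a minimal sketch of a stop()-free shutdown helper; the object name, method, and 30-second default are illustrative and not from this PR. Interrupting wakes interruptible blocking calls and the bounded join avoids hanging; for IO that ignores interrupts, closing the underlying resource (for example the Pulsar consumer) is what actually unblocks the thread.

object ThreadShutdownSketch {
  // Hypothetical helper: interrupt, then wait a bounded time instead of calling Thread.stop().
  def shutdownWorker(worker: Thread, timeoutMs: Long = 30000L): Unit = {
    worker.interrupt()      // wakes sleep/wait and interruptible IO
    worker.join(timeoutMs)  // bounded wait instead of Thread.stop()
    if (worker.isAlive) {
      // Last resort: report it; stop() risks leaving locks held and state corrupted.
      System.err.println(s"Worker ${worker.getName} did not exit within $timeoutMs ms")
    }
  }
}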

@sijie
Member

sijie commented Jan 13, 2022

@nlu90 can you take a look at this?

Member

@nlu90 nlu90 left a comment


Did you run some checkstyle tools to help format the code?

Comment on lines +155 to +156
if(autoAcknowledge)
consumer.acknowledge(groupedMessages.map(_.messageId))
Member


If autoAcknowledge is false, then the message can be kept in Pulsar indefinitely.

Since the messages have been stored into Spark on line 154 (meaning ownership of the data has been transferred to Spark), why not ack the message?

Author


The default is true. This is a configurable property for cases where the user wants ownership of acknowledgement (or negative acknowledgement, for that matter).

@aditiwari01
Author

@nlu90 Did you get a chance to go through this?

@@ -184,7 +184,7 @@
<caffeine.version>2.6.2</caffeine.version>
<java-semver.version>0.9.0</java-semver.version>
<hppc.version>0.7.3</hppc.version>
<spark-streaming_2.10.version>2.1.0</spark-streaming_2.10.version>
<spark-streaming_2.11.version>2.4.4</spark-streaming_2.11.version>
Contributor


@aditiwari01 Is there a specific reason to use version 2.4.4? It is quite old; 2.4.8 is out, and even that is rather old.

If we can upgrade to 3.2.1 (requires Scala 2.12), we'll benefit from an even newer release that includes updated dependencies with a variety of CVEs patched, getting us from

Dependencies Scanned: 212 (145 unique)
Vulnerable Dependencies: 22
Vulnerabilities Found: 92

to

Dependencies Scanned: 265 (190 unique)
Vulnerable Dependencies: 12
Vulnerabilities Found: 45

with the rest being suppressible.

you'll need to use

<scala-library.version>2.12.15</scala-library.version>
<spark-streaming_2.12.version>3.2.1</spark-streaming_2.12.version>

and exclude log4j:

       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-streaming_2.10</artifactId>
-        <version>${spark-streaming_2.10.version}</version>
+        <artifactId>spark-streaming_2.12</artifactId>
+        <version>${spark-streaming_2.12.version}</version>
         <exclusions>
+          <exclusion>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+          </exclusion>
           <exclusion>

Author


Actually, we had a Spark 2.4.4 (Scala 2.11) setup at our end, hence I chose these versions initially.
We're also upgrading internally to 3.2.1 / Scala 2.12, so I'll update pulsar-spark for that as well.
(I will only be able to do that after our internal upgrade.)

Meanwhile, do we want to merge this version and cut a separate branch, so that we have support for Scala 2.11 and Spark 2 in case anyone wants to use it?

Contributor


In this case, I think 2.4.4 is OK for now, and we can move to 3.2.1 with Pulsar 2.10.

Successfully merging this pull request may close these issues.

Pulsar - Spark adapter for scala 2.11