-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-9324: Drop support for Scala 2.11 (KIP-531) #7859
Conversation
Wow. Thanks for the amazing refactoring. Out of curiosity:
|
} | ||
}) | ||
newGauge("cleaner-recopy-percent", () => { | ||
val stats = cleaners.map(_.lastStats) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will still create an intermediate collection, no? Why not use cleaners.iterator.map
, turn stats into an iterator... and avoid creating the extra collection completely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good catch. I missed that in the initial change. Will update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went to make this change and then I remembered why I didn't do it in the first place. The iterator can only be used once and we need two iterations. We would need to allocate two separator iterators. But cleaners
should be a small collection (size 1 by default) so the additional code complexity didn't seem worth it.
Thanks for taking a look. I used an IntelliJ inspection to find candidates. It didn't find most of the Gauge usages, so I relied on plain old search to find those. That's how I also came across the Java examples you asked about. You are correct that the Java changes could have been done independently of dropping support for Scala 2.11. |
@rhauch @ryannedolan Can you please confirm that we should stick with the old |
val alterResourceName = if (resourceName.nonEmpty) | ||
resourceName | ||
else | ||
ConfigEntityName.Default |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was unused.
@@ -102,7 +102,7 @@ object AclCommand extends Logging { | |||
else | |||
new Properties() | |||
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) | |||
val adminClient = JAdminClient.create(props) | |||
val adminClient = Admin.create(props) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice.
} else { | ||
invocation.callRealMethod().asInstanceOf[Try[File]] | ||
} | ||
doAnswer { invocation => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we use ()
here? Do we have any non-written rules on lambdas for when ()
or {}
should be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean doAnswer(invocation => {...
? If so, I would say no
. The block syntax is more concise and generally recommended for cases like this.
@@ -78,8 +78,6 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { | |||
* @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce` | |||
*/ | |||
def reduce(reducer: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = | |||
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place | |||
// works perfectly with Scala 2.12 though |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we remove the asReducer
now? Ditto below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, we cannot. The comment is wrong as far as I can tell.
*/ | ||
// TODO: Deprecate this class if support for Scala 2.11 + Java 1.8 is dropped. | ||
@deprecated(message = "Use org.apache.kafka.streams.kstream.Suppressed", since = "2.5") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we remove this class then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, we cannot. The comment is wrong as far as I can tell.
retest this please. |
JDK 11 and Scala 2.12 passed, JDK 11 and Scala 2.13 had one flaky failure:
|
retest this please |
newGauge("SessionState", new Gauge[String] { | ||
override def value: String = Option(connectionState.toString).getOrElse("DISCONNECTED") | ||
}) | ||
newGauge("SessionState", () => connectionState.toString) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Option
wrapping doesn't seem to achieve anything here. If connectionState
is null
, we'd get a NPE. toString
should always return a non null value.
Static methods in Java interfaces can be invoked since Scala 2.12.
5f1e01a
to
61ab584
Compare
Tests passed for both jobs that are still enabled: JDK 8 and Scala 2.12 and JDK 11 and Scala 2.13. @guozhangwang (for Streams) and @omkreddy (for Core) please review when you have a chance. |
@ijuma wrote:
Is MirrorMaker 2 designed to be installed into older Kafka installations? If so, then we should probably stick with the old AdminClient API for MirrorMaker 2. If not and MirrorMaker 2 should always be used with whatever Kafka installation that includes it, then it can use the newer API. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ijuma Thanks for the PR. Core related changes looks good to me.
LGTM, thanks @ijuma ! |
Thanks for the reviews, merged to trunk. |
This reverts commit 6dc6f6a.
- Revert "KAFKA-9324: Drop support for Scala 2.11 (KIP-531) (apache#7859)" - Fix some issue with scala 2.11 build (eg. scala compile option "-target:jvm-1.8") - Change all java lambda expression to anonymous class This reverts commit 6dc6f6a.
core
,streams-scala
andconnect-runtime
modules.runnable
andnewThread
fromCoreUtils
as lambdasyntax for SAM types make them unnecessary.
FunctionsCompatConversions
,KGroupedStream
,KGroupedTable' and
KStream` about Scala 2.11,the conversions are needed for Scala 2.12 too.
org.apache.kafka.streams.scala.kstream.Suppressed
and use
org.apache.kafka.streams.kstream.Suppressed
instead.Admin.create
instead ofAdminClient.create
. Static methodsin Java interfaces can be invoked since Scala 2.12. I noticed that
MirrorMaker 2 uses
AdminClient.create
, but I did not change themas Connectors have restrictions on newer client APIs.
Gauge
implementations by avoidingunnecessary intermediate collections.
Option.apply
inZookeeperClient
SessionState
metric.Committer Checklist (excluded from commit message)