KAFKA-13399 towards scala3 #11432
Conversation
I added some comments to the changes to help with the review process.
@@ -22,7 +22,7 @@ import java.util.{Collections, Properties}
 import joptsimple._
 import kafka.common.AdminCommandFailedException
 import kafka.log.LogConfig
-import kafka.utils._
+import kafka.utils.{immutable => _, _}
Changes like this one solve a collision between the `immutable` package in Scala collections and `kafka.utils.immutable`.
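As a toy illustration (the objects below are stand-ins, not the real `kafka.utils`), the `name => _` form in a wildcard import excludes one member so it no longer shadows the standard-library name:

```scala
// Hypothetical stand-in for kafka.utils; names here are illustrative.
object utils {
  object immutable { val marker = "kafka-style immutable" }
  def pretty(n: Int): String = s"<$n>"
}

object ImportExclusionDemo {
  // `immutable => _` drops utils.immutable from the wildcard import,
  // so `immutable` below refers only to scala.collection.immutable.
  import utils.{immutable => _, _}
  import scala.collection.immutable

  val counts: immutable.Map[String, Int] = immutable.Map("a" -> 1)
  val shown: String = pretty(counts("a"))
}
```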
@@ -42,11 +42,11 @@ final class KafkaMetadataLog private (
 // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
 // polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
 snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
-topicPartition: TopicPartition,
+topicPartitionArg: TopicPartition,
Avoids a collision between the argument name and the method with the same name.
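A minimal sketch of the pattern, with a stand-in `TopicPartition` (not the real Kafka class):

```scala
// Hypothetical stand-in type for the sketch.
final case class TopicPartition(topic: String, partition: Int)

// Renaming the constructor parameter (topicPartitionArg) avoids the clash
// with the accessor method of the same name, which Scala 3 reports as an
// error where Scala 2 tolerated the shadowing.
class MetadataLogSketch(topicPartitionArg: TopicPartition) {
  def topicPartition: TopicPartition = topicPartitionArg
}
```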
@@ -27,7 +27,7 @@ import com.typesafe.scalalogging.LazyLogging
 import joptsimple._
 import kafka.utils.Implicits._
 import kafka.utils.{Exit, _}
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig => ClientConsumerConfig, ConsumerRecord, KafkaConsumer}
This avoids a name clash between the `ConsumerConfig` class under o.a.k.c.c. and the one defined in the same file.
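A toy sketch of the renaming trick, with hypothetical stand-in packages and classes:

```scala
// Stand-in for the external package whose class name clashes.
object clientpkg {
  class ConsumerConfig { val origin = "client" }
}

object AliasDemo {
  // Renaming on import frees the simple name for the locally defined class.
  import clientpkg.{ConsumerConfig => ClientConsumerConfig}

  // A local class with the same name no longer clashes.
  class ConsumerConfig { val origin = "local" }

  val client = new ClientConsumerConfig
  val local  = new ConsumerConfig
}
```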
@@ -27,7 +27,7 @@ import kafka.message._
 import kafka.utils.Implicits._
 import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, ToolsUtils}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig => ClientProducerConfig, ProducerRecord}
This avoids a name clash between the `ProducerConfig` class under o.a.k.c.p. and the one defined in the same file.
 opts.options.valueOf(opts.interBrokerThrottleOpt).longValue(),
 opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt).longValue(),
 opts.options.valueOf(opts.timeoutOpt).longValue())
 } else if (opts.options.has(opts.cancelOpt)) {
 cancelAssignment(adminClient,
 Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
 opts.options.has(opts.preserveThrottlesOpt),
-opts.options.valueOf(opts.timeoutOpt))
+opts.options.valueOf(opts.timeoutOpt).longValue())
Those numbers were all `Int`s converted automatically to `Long`.
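A hedged sketch of the kind of call site involved; the joptsimple plumbing is replaced with a hypothetical boxed value:

```scala
// Hypothetical method expecting a Long, fed from a Java boxed Integer,
// similar to what happens with joptsimple option values.
def withTimeout(timeoutMs: Long): String = s"timeout=$timeoutMs"

val boxed: java.lang.Integer = java.lang.Integer.valueOf(5000)

// Scala 2 chained the unboxing and widening conversions implicitly; under
// Scala 3 the conversion is forced explicitly, as in the PR's .longValue()
// calls.
val result: String = withTimeout(boxed.longValue())
```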
@@ -353,7 +353,7 @@ class LogManager(logDirs: Seq[File],
 s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
 }

-val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
+val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty[File]).filter(logDir =>
Scala 3 wasn't able to infer that the `Array.empty` parametrized type should be `File`.
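A self-contained sketch of the fixed line, wrapped in a hypothetical helper method:

```scala
import java.io.File

// Sketch of the fix: without the explicit [File], the Scala 3 typer could
// not infer the element type of Array.empty in this position.
def logsToLoad(dir: File): Array[File] =
  Option(dir.listFiles).getOrElse(Array.empty[File])
```

For a non-existent directory, `listFiles` returns null, so the method falls back to the empty array.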
@@ -85,13 +85,13 @@ object DecodeJson {
 else decodeJson.decodeEither(node).map(Some(_))
 }

-implicit def decodeSeq[E, S[+T] <: Seq[E]](implicit decodeJson: DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = (node: JsonNode) => {
+implicit def decodeSeq[E, S[E] <: Seq[E]](implicit decodeJson: DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = (node: JsonNode) => {
This is the only way I could make it work. It seems this definition should always have been like this, but maybe I'm missing a use case.
The previous definition was never matched in Scala 3.
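A stripped-down sketch of the corrected shape; the `DecodeJson`/`JsonNode` plumbing is omitted, since only the type bound matters here:

```scala
import scala.collection.Factory

// The type constructor's parameter is the element type itself,
// S[E] <: Seq[E], instead of the previous unrelated S[+T] <: Seq[E].
def buildSeq[E, S[E] <: Seq[E]](elems: Iterator[E])(implicit factory: Factory[E, S[E]]): S[E] =
  factory.fromSpecific(elems)
```

For example, `buildSeq[Int, Vector](Iterator(1, 2, 3))` resolves the implicit `Factory[Int, Vector[Int]]` and builds a `Vector`.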
 if (node.isArray)
 decodeIterator(node.elements.asScala)(decodeJson.decodeEither)
 else Left(s"Expected JSON array, received $node")
 }

-implicit def decodeMap[V, M[K, +V] <: Map[K, V]](implicit decodeJson: DecodeJson[V], factory: Factory[(String, V), M[String, V]]): DecodeJson[M[String, V]] = (node: JsonNode) => {
+implicit def decodeMap[V, M[K, V] <: Map[K, V]](implicit decodeJson: DecodeJson[V], factory: Factory[(String, V), M[String, V]]): DecodeJson[M[String, V]] = (node: JsonNode) => {
Fixed this just in case, but there is no test code using this.
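The analogous sketch for the `Map` variant, again with the JSON plumbing omitted:

```scala
import scala.collection.Factory

// Same fix: M[K, V] <: Map[K, V] instead of M[K, +V] <: Map[K, V], so the
// bound's value type is the constructor's own parameter.
def buildMap[V, M[K, V] <: Map[K, V]](pairs: Iterator[(String, V)])(implicit factory: Factory[(String, V), M[String, V]]): M[String, V] =
  factory.fromSpecific(pairs)
```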
@@ -239,10 +239,11 @@ class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness {

 val exitLatch = new CountDownLatch(1)
 Exit.setExitProcedure((_, _) => exitLatch.countDown())
+val feature1: SupportedVersionRange = brokerFeatures.supportedFeatures.get("feature_1")
This intermediate variable is needed because the Scala 3 typer was inferring it to be `BaseVersionRange`, which is package protected for `org.apache.kafka.common.feature`.
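A sketch of the pattern with hypothetical stand-in types (in Kafka, the real `BaseVersionRange` is the package-protected one):

```scala
// Stand-ins: a restricted base type and a public subtype.
sealed abstract class BaseVersionRange(val min: Short, val max: Short)
final class SupportedVersionRange(min: Short, max: Short) extends BaseVersionRange(min, max)

object Features {
  def get(name: String): SupportedVersionRange = new SupportedVersionRange(1, 3)
}

// The explicit ascription pins the accessible subtype; in the Kafka test
// the typer otherwise settled on the package-protected BaseVersionRange.
val feature1: SupportedVersionRange = Features.get("feature_1")
```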
@@ -59,7 +59,7 @@ class IsrExpirationTest {
 @BeforeEach
 def setUp(): Unit = {
 val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
-EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
+EasyMock.expect(logManager.liveLogDirs).andReturn(Seq.empty[File]).anyTimes()
The method is expected to return a `Seq` instead of an `Array`. The previous code wasn't working in Scala 3.
cc @ijuma: if you have some time to review this I would be thankful! These are the changes I mentioned on the mailing list regarding the Scala 3 migration. These are only the ones that are for sure needed. Other changes that were present in #11350 but not here might not be needed once we migrate to Scala 3, as they are either already solved and will be in future versions of Scala, or being worked on.
Test failure was https://issues.apache.org/jira/browse/KAFKA-4184
Error was a flaky test: https://issues.apache.org/jira/browse/KAFKA-8785
As an aside: once this lands it would be great to get Kafka in the Scala 3 community build. (Among other reasons, because there is a lot of Java code in Kafka, and the Java-consumes-Scala scenario tends to be under-tested in both the 2 and 3 community builds.) I think VirtusLab hopes to support other build tools besides sbt there eventually, but if that hasn't happened, https://github.com/ennru/kafka/tree/build-with-sbt could probably be brought up to date; that's what we use to include Kafka in the Scala 2 community build.
Hi @SethTisue, thanks! I will definitely take a look at it!
I'll resolve the conflicts tomorrow and ping people for reviews.
Scala 3's compiler is not so friendly with shadowing and reports errors where Scala 2 didn't.
Scala 3 doesn't automatically convert `Short` to `Int` or `Int` to `Long` at will as Scala 2 did. Changes in here help the typer by manually forcing the conversions.
Scala 2 was more lax about matching empty parentheses between a method's definition and its call sites. This became stricter in Scala 3.
Scala 3 changed a bit in the area of type inference: in some cases it picks the most general type, which caused trouble here because that type is not public but package protected. In other cases, the typer wasn't able to infer the proper type, forcing it to be set manually.
This is a reported bug that unfortunately can't be fixed easily without breakage on Scala's side. For further information see scala/scala3#13549.
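A small illustration of the empty-parentheses point, using a hypothetical class:

```scala
class Log {
  def flush(): Unit = ()   // declared with empty parentheses
  def size: Long = 0L      // declared without parentheses
}

val log = new Log
// Scala 2 also accepted `log.flush` for a method declared with ();
// Scala 3 requires the call to match the declaration.
log.flush()
val n = log.size
```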
At least one of the failures was: https://issues.apache.org/jira/browse/KAFKA-12319
@mumrah Could a maintainer help us with a review?
Closing this. This PR is obsolete now, as the core module is being ported to Java and split into its own modules.
Hey @jlprat! How are you doing? Do you think it is possible to build and release just
As I understand it, it is a wrapper over the Java API, and the core module migration might not be needed to move this one to Scala 3.
Hi @antosha417, thanks for the interest.
This PR takes all the changes in #11350 that are needed for Scala 3 compatibility, except those with an open bug report or an already solved one.
The PR is separated into 5 commits, one for each type of change performed:
Scala 3's compiler is stricter about shadowing and reports errors where Scala 2 didn't.
Scala 3 doesn't automatically convert `Short` to `Int` or `Int` to `Long` at will as Scala 2 did. Changes in here help the typer by manually forcing the conversions.
Scala 2 was more lax about matching empty parentheses between a method's definition and its call sites. This became stricter in Scala 3.
Scala 3 changed a bit in the area of type inference: in some cases it picks the most general type, which caused trouble here because that type is not public but package protected. In other cases, the typer wasn't able to infer the proper type, forcing it to be set manually.
A reported bug that unfortunately can't be fixed easily without breakage on Scala's side; see scala/scala3#13549.
The resulting code is still valid Scala 2 code and arguably more correct.
Instead of doing all the changes in one enormous PR, we can already perform the changes we know for a fact are going to be needed for Scala 3.
If desired I can provide 1 PR per commit.