enable set server property && fmt scala code #60

Closed
wants to merge 2 commits into from

5 changes: 5 additions & 0 deletions app/build.gradle
@@ -123,6 +123,11 @@ spotless {
java {
googleJavaFormat()
}
scala {
target '**/*.scala'
scalafmt('3.8.0').configFile('../.scalafmt.conf')
licenseHeaderFile '../java.header', 'package'
}

groovyGradle {
target '*.gradle'
23 changes: 20 additions & 3 deletions app/src/main/scala/hstream/network/SocketServer.scala
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.network

import org.apache.kafka.common.network.ListenerName
@@ -7,9 +23,10 @@ import kafka.server.KafkaConfig
class SocketServer(val config: KafkaConfig) {

def boundPort(listenerName: ListenerName): Int = {
val listener = config.effectiveAdvertisedListeners.find(_.listenerName == listenerName).getOrElse(
sys.error(s"Could not find listener with name ${listenerName.value}"))
listener.port
val listener = config.effectiveAdvertisedListeners
.find(_.listenerName == listenerName)
.getOrElse(sys.error(s"Could not find listener with name ${listenerName.value}"))
listener.port
}

}
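
For context, the lookup `boundPort` performs is unchanged by the reformat: find the advertised listener by name or abort. A standalone sketch of that find-or-fail pattern, with a hypothetical Listener type and sample data that are not part of this PR:

// Standalone sketch of boundPort's find-or-fail lookup.
// `Listener` and the sample endpoints below are hypothetical.
final case class Listener(name: String, port: Int)

val listeners = Seq(Listener("PLAINTEXT", 9092), Listener("GOSSIP", 9094))

def boundPort(name: String): Int =
  listeners
    .find(_.name == name)
    .getOrElse(sys.error(s"Could not find listener with name $name"))
    .port

assert(boundPort("PLAINTEXT") == 9092) // an unknown name aborts via sys.error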
24 changes: 21 additions & 3 deletions app/src/main/scala/hstream/server/KafkaBroker.scala
@@ -1,8 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server

import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.common.utils.Time

import java.nio.file.{Files, Path, Paths, StandardOpenOption}
import kafka.utils.Logging
import kafka.network.SocketServer
@@ -58,10 +75,11 @@ class KafkaBroker(
val storeDir = config.testingConfig
.getOrElse("store_dir", throw new IllegalArgumentException("store_dir is required"))
.asInstanceOf[String]
val extraCommandArgs =
(if (config.autoCreateTopicsEnable) "" else "--disable-auto-create-topic")
val extraProps = config.hstreamKafkaBrokerProperties
.map { case (k, v) => s"--prop $k=$v" }
.mkString(" ")
val dockerCmd =
s"docker run -d --network host --name $containerName -v $storeDir:/data/store $image $command $extraCommandArgs"
s"docker run -d --network host --name $containerName -v $storeDir:/data/store $image $command $extraProps"
info(s"=> Start hserver by: $dockerCmd")
val code = dockerCmd.!
if (code != 0) {
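The behavioral change in this hunk: instead of a single hard-coded --disable-auto-create-topic flag, the broker now forwards every configured property as a --prop k=v flag. A self-contained sketch of that flattening (sample map only; the keys mirror the Kafka config names used elsewhere in this PR):

// Sketch: how extraProps turns a property map into repeated --prop flags.
val props: Map[Any, Any] = Map(
  "num.partitions" -> 1,
  "auto.create.topics.enable" -> true
)
val extraProps = props.map { case (k, v) => s"--prop $k=$v" }.mkString(" ")
// e.g. "--prop num.partitions=1 --prop auto.create.topics.enable=true"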
49 changes: 35 additions & 14 deletions app/src/main/scala/hstream/server/KafkaConfig.scala
@@ -1,20 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import java.util
import java.util.{Collections, Properties}
import java.util.Properties
import org.apache.kafka.common.config.{
AbstractConfig,
ConfigDef,
@@ -29,7 +31,7 @@ import org.apache.kafka.common.config.{
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.network.ListenerName

import scala.collection.{immutable, Map, Seq}
import scala.collection.{immutable, mutable, Map, Seq}
import scala.annotation.nowarn
import kafka.cluster.EndPoint
import kafka.utils.{CoreUtils, Logging}
@@ -55,6 +57,8 @@ object Defaults {
// Currently not supported in hstream
val NumPartitions = 1
val DefaultReplicationFactor = 1
// KAFKA_TO_HSTREAM: kafka default value is 3
val DefaultOffsetsTopicReplicationFactor: Short = 1

// TODO: KAFKA_ORIGINAL
// val Listeners = "PLAINTEXT://:9092"
@@ -76,6 +80,7 @@ object KafkaConfig {
val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
val NumPartitionsProp = "num.partitions"
val DefaultReplicationFactorProp = "default.replication.factor"
val OffsetsTopicReplicationFactorProp = "offsets.topic.replication.factor"

// TODO: KAFKA_ORIGINAL
// val ListenersProp = "listeners"
@@ -118,6 +123,14 @@
MEDIUM,
"$DefaultReplicationFactorDoc"
)
.define(
OffsetsTopicReplicationFactorProp,
SHORT,
Defaults.DefaultOffsetsTopicReplicationFactor,
atLeast(1),
HIGH,
"$OffsetsTopicReplicationFactorDoc"
)
.define(SaslKerberosServiceNameProp, STRING, null, MEDIUM, "$SaslKerberosServiceNameDoc")

// TODO: KAFKA_ORIGINAL
@@ -194,9 +207,17 @@ class KafkaConfig private (
val advertisedAddress = getString(KafkaConfig.AdvertisedAddressProp)
val brokerId = getInt(KafkaConfig.BrokerIdProp)
val numPartitions = getInt(KafkaConfig.NumPartitionsProp)
val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp)

def hstreamKafkaBrokerProperties: Map[Any, Any] = {
val props = new mutable.HashMap[Any, Any]()
props.put(KafkaConfig.NumPartitionsProp, getInt(KafkaConfig.NumPartitionsProp))
props.put(KafkaConfig.DefaultReplicationFactorProp, getInt(KafkaConfig.DefaultReplicationFactorProp))
props.put(KafkaConfig.AutoCreateTopicsEnableProp, getBoolean(KafkaConfig.AutoCreateTopicsEnableProp))
props.put(KafkaConfig.OffsetsTopicReplicationFactorProp, getShort(KafkaConfig.OffsetsTopicReplicationFactorProp))
props
}

// Use advertised listeners if defined, fallback to listeners otherwise
def effectiveAdvertisedListeners: Seq[EndPoint] = {
val advertisedListenersProp = getString(KafkaConfig.AdvertisedListenersProp)
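For reference, a minimal sketch of how the new offsets.topic.replication.factor entry behaves, using Kafka's public ConfigDef/AbstractConfig API directly. The one-property ConfigDef and doc string here are stand-ins, not this file's full definition:

import java.util.Properties
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.config.ConfigDef.{Importance, Range, Type}

// One-property stand-in for the define(...) added in this diff.
val configDef = new ConfigDef()
  .define(
    "offsets.topic.replication.factor",
    Type.SHORT,
    1.toShort, // stand-in for Defaults.DefaultOffsetsTopicReplicationFactor
    Range.atLeast(1),
    Importance.HIGH,
    "placeholder doc"
  )

val props = new Properties()
props.put("offsets.topic.replication.factor", "3")

val cfg = new AbstractConfig(configDef, props)
cfg.getShort("offsets.topic.replication.factor") // 3

Values that fail atLeast(1) make the AbstractConfig constructor throw a ConfigException, which is why the validator sits in the define(...) call rather than at each read site.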
36 changes: 21 additions & 15 deletions app/src/main/scala/kafka/cluster/EndPoint.scala
@@ -1,23 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.cluster

import org.apache.kafka.common.{KafkaException, Endpoint => JEndpoint}
import org.apache.kafka.common.{Endpoint => JEndpoint, KafkaException}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
@@ -36,17 +35,22 @@ object EndPoint {
* Create EndPoint object from `connectionString` and optional `securityProtocolMap`. If the latter is not provided,
* we fallback to the default behaviour where listener names are the same as security protocols.
*
* @param connectionString the format is listener_name://host:port or listener_name://[ipv6 host]:port
* for example: PLAINTEXT://myhost:9092, CLIENT://myhost:9092 or REPLICATION://[::1]:9092
* Host can be empty (PLAINTEXT://:9092) in which case we'll bind to default interface
* Negative ports are also accepted, since they are used in some unit tests
* @param connectionString
* the format is listener_name://host:port or listener_name://[ipv6 host]:port for example: PLAINTEXT://myhost:9092,
* CLIENT://myhost:9092 or REPLICATION://[::1]:9092 Host can be empty (PLAINTEXT://:9092) in which case we'll bind
* to default interface Negative ports are also accepted, since they are used in some unit tests
*/
def createEndPoint(connectionString: String, securityProtocolMap: Option[Map[ListenerName, SecurityProtocol]]): EndPoint = {
def createEndPoint(
connectionString: String,
securityProtocolMap: Option[Map[ListenerName, SecurityProtocol]]
): EndPoint = {
val protocolMap = securityProtocolMap.getOrElse(DefaultSecurityProtocolMap)

def securityProtocol(listenerName: ListenerName): SecurityProtocol =
protocolMap.getOrElse(listenerName,
throw new IllegalArgumentException(s"No security protocol defined for listener ${listenerName.value}"))
protocolMap.getOrElse(
listenerName,
throw new IllegalArgumentException(s"No security protocol defined for listener ${listenerName.value}")
)

connectionString match {
case uriParseExp(listenerNameString, "", port) =>
@@ -67,10 +71,12 @@
}

def fromJava(endpoint: JEndpoint): EndPoint =
new EndPoint(endpoint.host(),
new EndPoint(
endpoint.host(),
endpoint.port(),
new ListenerName(endpoint.listenerName().get()),
endpoint.securityProtocol())
endpoint.securityProtocol()
)
}

/**
@@ -80,7 +86,7 @@ case class EndPoint(host: String, port: Int, listenerName: ListenerName, securit
def connectionString: String = {
val hostport =
if (host == null)
":"+port
":" + port
else
Utils.formatAddress(host, port)
listenerName.value + "://" + hostport
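For context, the parse/print pair in this file round-trips the examples given in the scaladoc above. A short usage sketch (listener string taken from that scaladoc; None selects the default listener-name-to-protocol map):

import kafka.cluster.EndPoint

val ep = EndPoint.createEndPoint("PLAINTEXT://myhost:9092", None)
ep.port               // 9092
ep.listenerName.value // "PLAINTEXT"
ep.connectionString   // "PLAINTEXT://myhost:9092"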
23 changes: 10 additions & 13 deletions app/src/main/scala/kafka/utils/Implicits.scala
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.utils

import java.util
@@ -24,16 +23,15 @@ import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

/**
* In order to have these implicits in scope, add the following import:
*
* `import kafka.utils.Implicits._`
*/
* In order to have these implicits in scope, add the following import:
*
* `import kafka.utils.Implicits._`
*/
object Implicits {

/**
* The java.util.Properties.putAll override introduced in Java 9 is seen as an overload by the
* Scala compiler causing ambiguity errors in some cases. The `++=` methods introduced via
* implicits provide a concise alternative.
* The java.util.Properties.putAll override introduced in Java 9 is seen as an overload by the Scala compiler causing
* ambiguity errors in some cases. The `++=` methods introduced via implicits provide a concise alternative.
*
* See https://github.com/scala/bug/issues/10418 for more details.
*/
@@ -48,12 +46,11 @@ object Implicits {
}

/**
* Exposes `forKeyValue` which maps to `foreachEntry` in Scala 2.13 and `foreach` in Scala 2.12
* (with the help of scala.collection.compat). `foreachEntry` avoids the tuple allocation and
* is more efficient.
* Exposes `forKeyValue` which maps to `foreachEntry` in Scala 2.13 and `foreach` in Scala 2.12 (with the help of
* scala.collection.compat). `foreachEntry` avoids the tuple allocation and is more efficient.
*
* This was not named `foreachEntry` to avoid `unused import` warnings in Scala 2.13 (the implicit
* would not be triggered in Scala 2.13 since `Map.foreachEntry` would have precedence).
* This was not named `foreachEntry` to avoid `unused import` warnings in Scala 2.13 (the implicit would not be
* triggered in Scala 2.13 since `Map.foreachEntry` would have precedence).
*/
@nowarn("cat=unused-imports")
implicit class MapExtensionMethods[K, V](private val self: scala.collection.Map[K, V]) extends AnyVal {
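A short usage sketch of the ++= implicit this object provides; the overload bodies are elided in this diff, so this assumes the upstream Kafka signature that accepts a Scala Map:

import java.util.Properties
import kafka.utils.Implicits._

val props = new Properties()
// Copies every pair into props without tripping the Java 9 putAll ambiguity.
props ++= Map("num.partitions" -> "1", "default.replication.factor" -> "1")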
29 changes: 15 additions & 14 deletions app/src/main/scala/kafka/utils/Json.scala
@@ -1,11 +1,11 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -36,8 +36,7 @@ object Json {
def parseFull(input: String): Option[JsonValue] = tryParseFull(input).toOption

/**
* Parse a JSON string into either a generic type T, or a JsonProcessingException in the case of
* exception.
* Parse a JSON string into either a generic type T, or a JsonProcessingException in the case of exception.
*/
def parseStringAs[T](input: String)(implicit tag: ClassTag[T]): Either[JsonProcessingException, T] = {
try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T])
@@ -65,9 +64,11 @@

/**
* Parse a JSON string into a JsonValue if possible. It returns an `Either` where `Left` will be an exception and
* `Right` is the `JsonValue`.
* @param input a JSON string to parse
* @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value.
* `Right` is the `JsonValue`.
* @param input
* a JSON string to parse
* @return
* An `Either` which in case of `Left` means an exception and `Right` is the actual return value.
*/
def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
if (input == null || input.isEmpty)
Expand All @@ -77,16 +78,16 @@ object Json {
catch { case e: JsonProcessingException => Left(e) }

/**
* Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in
* the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid
* a jackson-scala dependency).
* Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in the
* default configuration. That is, Java collections are supported, but Scala collections are not (to avoid a
* jackson-scala dependency).
*/
def encodeAsString(obj: Any): String = mapper.writeValueAsString(obj)

/**
* Encode an object into a JSON value in bytes. This method accepts any type supported by Jackson's ObjectMapper in
* the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid
* a jackson-scala dependency).
* the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid a
* jackson-scala dependency).
*/
def encodeAsBytes(obj: Any): Array[Byte] = mapper.writeValueAsBytes(obj)
}
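
A brief usage sketch of the helpers above (inputs illustrative; java.util.Map.of needs Java 9+):

import kafka.utils.Json

Json.parseFull("""{"replicas": [1, 2]}""")    // Some(JsonValue)
Json.parseFull("not json")                    // None
Json.encodeAsString(java.util.Map.of("a", 1)) // "{\"a\":1}"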