Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update docker container environment config to create it dynamically. #1914

Merged
merged 2 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@ package izumi.distage.docker.bundled

import distage.{ModuleDef, TagK}
import izumi.distage.docker.ContainerDef
import izumi.distage.docker.model.Docker.DockerPort
import izumi.distage.docker.model.Docker.{ContainerEnvironment, DockerPort}

object KafkaDocker extends ContainerDef {
val primaryPort: DockerPort = DockerPort.DynamicTCP("dynamic_kafka_port")

private[this] def portVars: String = Seq(
s"""KAFKA_ADVERTISED_PORT=$$${primaryPort.toEnvVariable}""",
s"""KAFKA_PORT=$$${primaryPort.toEnvVariable}""",
).map(defn => s"export $defn").mkString("; ")

override def config: Config = {
Config(
image = "wurstmeister/kafka:2.12-2.4.1",
ports = Seq(primaryPort),
env = Map("KAFKA_ADVERTISED_HOST_NAME" -> "127.0.0.1"),
entrypoint = Seq("sh", "-c", s"$portVars ; start-kafka.sh"),
env = ContainerEnvironment.from {
ports =>
val port = ports.getOrElse(primaryPort, "0000")
Map(
"KAFKA_ADVERTISED_HOST_NAME" -> "127.0.0.1",
"KAFKA_ADVERTISED_PORT" -> port,
"KAFKA_PORT" -> port,
)
},
)
}
}
Expand All @@ -26,18 +28,52 @@ object KafkaTwofaceDocker extends ContainerDef {
val insidePort: DockerPort = DockerPort.DynamicTCP("dynamic_kafka_port_inside")
val outsidePort: DockerPort = DockerPort.DynamicTCP("dynamic_kafka_port_outside")

private[this] def portVars: String = Seq(
s"""KAFKA_LISTENERS=INSIDE://:$$${insidePort.toEnvVariable},OUTSIDE://:$$${outsidePort.toEnvVariable}""",
s"""KAFKA_ADVERTISED_LISTENERS=INSIDE://:$$${insidePort.toEnvVariable},OUTSIDE://127.0.0.1:$$${outsidePort.toEnvVariable}""",
"""KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE""",
"""KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT""",
).map(defn => s"export $defn").mkString("; ")

override def config: Config = {
Config(
image = "wurstmeister/kafka:2.12-2.4.1",
ports = Seq(insidePort, outsidePort),
entrypoint = Seq("sh", "-c", s"$portVars ; start-kafka.sh"),
env = ContainerEnvironment.from {
ports =>
val insidePortBinding = ports.getOrElse(insidePort, "0000")
val outsidePortBinding = ports.getOrElse(outsidePort, "0000")
Map(
"KAFKA_INTER_BROKER_LISTENER_NAME" -> "INSIDE",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" -> "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT",
"KAFKA_LISTENERS" -> s"INSIDE://:$insidePortBinding,OUTSIDE://:$outsidePortBinding",
"KAFKA_ADVERTISED_LISTENERS" -> s"INSIDE://:$insidePortBinding,OUTSIDE://127.0.0.1:$outsidePortBinding",
)
},
userTags = Map("kafka_container" -> "twofaced"),
)
}
}

object KafkaKRaftDocker extends ContainerDef {
val primaryPort: DockerPort = DockerPort.DynamicTCP("dynamic_kafka_port")

override def config: Config = {
Config(
image = "bitnami/kafka:3.4.0",
ports = Seq(primaryPort),
env = ContainerEnvironment.from {
ports =>
val port = ports.getOrElse(primaryPort, "0000")
Map(
"KAFKA_CFG_PORT" -> port,
"KAFKA_CFG_ADVERTISED_PORT" -> port,
"KAFKA_CFG_LISTENERS" -> s"PLAINTEXT://:$port,CONTROLLER://:9093",
"KAFKA_CFG_ADVERTISED_LISTENERS" -> s"PLAINTEXT://127.0.0.1:$port",
"ALLOW_PLAINTEXT_LISTENER" -> "yes",
"KAFKA_ENABLE_KRAFT" -> "yes",
"KAFKA_BROKER_ID" -> "1",
"KAFKA_CFG_ADVERTISED_HOST_NAME" -> "127.0.0.1",
"KAFKA_CFG_PROCESS_ROLES" -> "broker,controller",
"KAFKA_CFG_CONTROLLER_LISTENER_NAMES" -> "CONTROLLER",
"KAFKA_CFG_CONTROLLER_QUORUM_VOTERS" -> "1@127.0.0.1:9093",
"KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP" -> "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
"KAFKA_CFG_DELETE_TOPIC_ENABLE" -> "true",
)
},
)
}
}
Expand All @@ -56,6 +92,10 @@ class KafkaDockerModule[F[_]: TagK] extends ModuleDef {
.connectToNetwork(KafkaZookeeperNetwork)
.dependOnContainerPorts(ZookeeperDocker)(2181 -> "KAFKA_ZOOKEEPER_CONNECT")
}

make[KafkaKRaftDocker.Container].fromResource {
KafkaKRaftDocker.make[F]
}
}

object KafkaDockerModule {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import izumi.distage.docker.model.Docker.DockerPort
object ZookeeperDocker extends ContainerDef {
override def config: Config = {
Config(
image = "zookeeper:3.4.14",
image = "zookeeper:3.5",
ports = Seq(DockerPort.TCP(2181)),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ open class ContainerResource[F[_], Tag](
val portsEnv = ports.map {
port => port.port.toEnvVariable -> port.binding.getBinding.getHostPortSpec
}
val adjustedEnv = portsEnv ++ config.env
val containerEnv = config.env.env(ports.map(p => p.port -> p.binding.getBinding.getHostPortSpec).toMap)
val adjustedEnv = portsEnv ++ containerEnv

for {
_ <- F.when(config.autoPull) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import java.net.{Inet4Address, Inet6Address, InetAddress}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import scala.util.{Success, Try}
import scala.language.implicitConversions

object Docker {
final case class AvailablePort(host: ServiceHost, port: Int) extends HostPortPair {
Expand Down Expand Up @@ -164,7 +165,7 @@ object Docker {
registry: Option[String] = None,
ports: Seq[DockerPort],
name: Option[String] = None,
env: Map[String, String] = Map.empty,
env: ContainerEnvironment = ContainerEnvironment.empty,
userTags: Map[String, String] = Map.empty,
cmd: Seq[String] = Seq.empty,
entrypoint: Seq[String] = Seq.empty,
Expand All @@ -185,6 +186,23 @@ object Docker {
def udpPorts: Set[DockerPort] = ports.collect { case t: DockerPort.UDPBase => t: DockerPort }.toSet
}

trait ContainerEnvironment { self =>
def env(containerPortMapping: Map[DockerPort, String]): Map[String, String]
final def ++(other: ContainerEnvironment): ContainerEnvironment = new ContainerEnvironment {
override def env(containerPortMapping: Map[DockerPort, String]): Map[String, String] = {
self.env(containerPortMapping) ++ other.env(containerPortMapping)
}
}
}
object ContainerEnvironment {
def from(create: Map[DockerPort, String] => Map[String, String]): ContainerEnvironment = new ContainerEnvironment {
override def env(containerPortMapping: Map[DockerPort, String]): Map[String, String] = create(containerPortMapping)
}
def empty: ContainerEnvironment = from(_ => Map.empty)

implicit def fromIterable[I[v] <: Iterable[v]](map: I[(String, String)]): ContainerEnvironment = from(_ => map.toMap)
}

/**
* Client configuration that will be read from HOCON config.
* See `docker-reference.conf` for an example configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ abstract class DistageTestDockerBIO extends Spec2[IO] {
"support docker resources" in {
// TODO: additionally check flyway outcome with doobie
(service: PgSvcExample, verifier: Lifecycle[IO[Throwable, _], ReuseCheckContainer.Container], log: LogIO2[IO]) =>
println(s"ports/1: pg=${service.pg} pgfw=${service.pgfw} ddb=${service.ddb} kafka=${service.kafka}/${service.kafkaKraft}/${service.kafkaTwoFace} cs=${service.cs}")
for {
_ <- log.info(s"ports/1: pg=${service.pg} pgfw=${service.pgfw} ddb=${service.ddb} kafka=${service.kafka} cs=${service.cs}")
_ <- log.info(s"ports/1: pg=${service.pg} pgfw=${service.pgfw} ddb=${service.ddb} kafka=${service.kafka}/${service.kafkaKraft}/${service.kafkaTwoFace} cs=${service.cs}")
// a new alpine container is spawned every time here
_ <- verifier.use(_ => IO.unit)
} yield ()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package izumi.distage.testkit.docker.fixtures

import distage.config.ConfigModuleDef
import izumi.distage.docker.model.Docker.AvailablePort
import izumi.distage.docker.bundled.*
import izumi.distage.docker.model.Docker.AvailablePort
import izumi.distage.docker.modules.DockerSupportModule
import izumi.distage.model.definition.Id
import izumi.distage.model.definition.StandardAxis.Mode
Expand All @@ -17,6 +17,8 @@ class PgSvcExample(
val pg: AvailablePort @Id("pg"),
val ddb: AvailablePort @Id("ddb"),
val kafka: AvailablePort @Id("kafka"),
val kafkaKraft: AvailablePort @Id("kafka-kraft"),
val kafkaTwoFace: AvailablePort @Id("kafka-twoface"),
val cs: AvailablePort @Id("cs"),
val mq: AvailablePort @Id("mq"),
val pgfw: AvailablePort @Id("pgfw"),
Expand Down Expand Up @@ -58,6 +60,16 @@ object DockerPlugin extends PluginDef {
kafka.availablePorts.first(KafkaDocker.primaryPort)
}

make[AvailablePort].named("kafka-kraft").tagged(Mode.Test).from {
(kafka: KafkaKRaftDocker.Container) =>
kafka.availablePorts.first(KafkaKRaftDocker.primaryPort)
}

make[AvailablePort].named("kafka-twoface").tagged(Mode.Test).from {
(kafka: KafkaTwofaceDocker.Container @Id("twoface")) =>
kafka.availablePorts.first(KafkaTwofaceDocker.outsidePort)
}

make[AvailablePort].named("pgfw").tagged(Mode.Test).from {
(cs: PostgresFlyWayDocker.Container) =>
cs.availablePorts.first(PostgresFlyWayDocker.primaryPort)
Expand Down