Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ lazy val root = project
jvm,
jvmMicrometer,
jvmPureConfig,
lettuce,
lettucePureConfig,
micrometer,
micrometerJmx,
micrometerJmxPureConfig,
Expand Down Expand Up @@ -323,6 +325,23 @@ lazy val jvmPureConfig = project
libraryDependencies += Dependencies.pureConfig
)

lazy val lettuce = project
.in(file("lettuce"))
.settings(BuildSettings.common)
.settings(
name := "sst-lettuce",
libraryDependencies += Dependencies.lettuce
)

lazy val lettucePureConfig = project
.in(file("lettuce-pureconfig"))
.dependsOn(lettuce)
.settings(BuildSettings.common)
.settings(
name := "sst-lettuce-pureconfig",
libraryDependencies += Dependencies.pureConfig
)

lazy val micrometer = project
.in(file("micrometer"))
.settings(BuildSettings.common)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.avast.sst.lettuce.pureconfig

import java.nio.charset.Charset

import cats.syntax.either._
import com.avast.sst.lettuce.LettuceConfig
import com.avast.sst.lettuce.LettuceConfig.{SocketOptions, SslOptions, TimeoutOptions}
import io.lettuce.core.ClientOptions.DisconnectedBehavior
import io.lettuce.core.protocol.ProtocolVersion
import pureconfig.ConfigReader
import pureconfig.error.CannotConvert
import pureconfig.generic.ProductHint
import pureconfig.generic.semiauto.deriveReader

trait ConfigReaders {

implicit protected def hint[T]: ProductHint[T] = ProductHint.default

implicit val lettuceDisconnectedBehaviorConfigReader: ConfigReader[DisconnectedBehavior] = ConfigReader.stringConfigReader.emap {
case "DEFAULT" => DisconnectedBehavior.DEFAULT.asRight
case "ACCEPT_COMMANDS" => DisconnectedBehavior.ACCEPT_COMMANDS.asRight
case "REJECT_COMMANDS" => DisconnectedBehavior.REJECT_COMMANDS.asRight
case unknown =>
CannotConvert(
unknown,
"DisconnectedBehavior",
s"Unknown enum value: ${DisconnectedBehavior.values().map(_.name()).mkString("|")}"
).asLeft
}

implicit val lettuceProtocolVersionConfigReader: ConfigReader[ProtocolVersion] = ConfigReader.stringConfigReader.emap {
case "RESP2" => ProtocolVersion.RESP2.asRight
case "RESP3" => ProtocolVersion.RESP3.asRight
case unknown =>
CannotConvert(
unknown,
"ProtocolVersion",
s"Unknown enum value: ${ProtocolVersion.values().map(_.name()).mkString("|")}"
).asLeft
}

implicit val lettuceCharsetConfigReader: ConfigReader[Charset] = ConfigReader.stringConfigReader.emap { charset =>
Either.catchNonFatal(Charset.forName(charset)).leftMap(ex => CannotConvert(charset, "java.nio.Charset", ex.getMessage))
}

implicit val lettuceSocketOptionsReader: ConfigReader[SocketOptions] = deriveReader

implicit val lettuceSslOptionsReader: ConfigReader[SslOptions] = deriveReader

implicit val lettuceTimeoutOptionsReader: ConfigReader[TimeoutOptions] = deriveReader

implicit val lettuceConfigReader: ConfigReader[LettuceConfig] = deriveReader

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.avast.sst.lettuce.pureconfig

import pureconfig.ConfigFieldMapping
import pureconfig.generic.ProductHint

/** Contains [[pureconfig.ConfigReader]] instances with default "kebab-case" naming convention. */
object implicits extends ConfigReaders {

/** Contains [[pureconfig.ConfigReader]] instances with "kebab-case" naming convention.
*
* This is alias for the default `implicits._` import.
*/
object KebabCase extends ConfigReaders

/** Contains [[pureconfig.ConfigReader]] instances with "camelCase" naming convention. */
object CamelCase extends ConfigReaders {
implicit override protected def hint[T]: ProductHint[T] = ProductHint(ConfigFieldMapping(pureconfig.CamelCase, pureconfig.CamelCase))
}

}
46 changes: 46 additions & 0 deletions lettuce/src/main/scala/com/avast/sst/lettuce/LettuceConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.avast.sst.lettuce

import java.nio.charset.Charset

import com.avast.sst.lettuce.LettuceConfig.{SocketOptions, SslOptions, TimeoutOptions}
import io.lettuce.core.ClientOptions.DisconnectedBehavior
import io.lettuce.core.protocol.ProtocolVersion
import io.lettuce.core.{ClientOptions, SocketOptions => LettuceSocketOptions, TimeoutOptions => LettuceTimeoutOptions}

import scala.concurrent.duration.Duration

final case class LettuceConfig(
uri: String,
pingBeforeActivateConnection: Boolean = ClientOptions.DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION,
autoReconnect: Boolean = ClientOptions.DEFAULT_AUTO_RECONNECT,
cancelCommandsOnReconnectFailure: Boolean = ClientOptions.DEFAULT_CANCEL_CMD_RECONNECT_FAIL,
suspendReconnectOnProtocolFailure: Boolean = ClientOptions.DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL,
requestQueueSize: Int = ClientOptions.DEFAULT_REQUEST_QUEUE_SIZE,
disconnectedBehavior: DisconnectedBehavior = DisconnectedBehavior.DEFAULT,
protocolVersion: Option[ProtocolVersion] = None,
scriptCharset: Charset = ClientOptions.DEFAULT_SCRIPT_CHARSET,
publishOnScheduler: Boolean = ClientOptions.DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL,
socketOptions: SocketOptions = SocketOptions(),
sslOptions: SslOptions = SslOptions(),
timeoutOptions: TimeoutOptions = TimeoutOptions()
)

object LettuceConfig {

final case class SocketOptions(
connectTimeout: Duration = Duration.fromNanos(LettuceSocketOptions.DEFAULT_CONNECT_TIMEOUT_DURATION.toNanos),
keepAlive: Boolean = LettuceSocketOptions.DEFAULT_SO_KEEPALIVE,
tcpNoDelay: Boolean = LettuceSocketOptions.DEFAULT_SO_NO_DELAY
)

final case class SslOptions(
keyStoreType: Option[String] = None,
keyStorePath: Option[String] = None,
keyStorePassword: Option[String] = None,
trustStorePath: Option[String] = None,
trustStorePassword: Option[String] = None
)

final case class TimeoutOptions(timeoutCommands: Boolean = LettuceTimeoutOptions.DEFAULT_TIMEOUT_COMMANDS)

}
95 changes: 95 additions & 0 deletions lettuce/src/main/scala/com/avast/sst/lettuce/LettuceModule.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.avast.sst.lettuce

import java.io.File
import java.time.Duration

import cats.effect.{Async, Resource, Sync}
import cats.syntax.either._
import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.codec.RedisCodec
import io.lettuce.core.resource.ClientResources
import io.lettuce.core.{ClientOptions, RedisClient, RedisURI, SocketOptions, SslOptions, TimeoutOptions}

object LettuceModule {

/** Makes [[io.lettuce.core.RedisClient]] initialized with the given config and optionally [[io.lettuce.core.resource.ClientResources]]. */
def makeClient[F[_]: Sync](config: LettuceConfig, clientResources: Option[ClientResources] = None): Resource[F, RedisClient] = {
val create = clientResources match {
case Some(resources) => RedisClient.create(resources)
case None => RedisClient.create()
}
val sync = Sync[F]
Resource.make {
sync.delay {
val client = create
client.setOptions(makeClientOptions(config))
client
}
}(c => sync.delay(c.shutdown()))
}

/** Makes [[io.lettuce.core.api.StatefulRedisConnection]] initialized with the given config and optionally [[io.lettuce.core.resource.ClientResources]]. */
def makeConnection[F[_]: Async, K, V](
config: LettuceConfig,
clientResources: Option[ClientResources] = None
)(implicit codec: RedisCodec[K, V]): Resource[F, StatefulRedisConnection[K, V]] = {
makeClient[F](config, clientResources).flatMap { client =>
val async = Async[F]
Resource.make[F, StatefulRedisConnection[K, V]] {
async.asyncF[StatefulRedisConnection[K, V]] { cb =>
async.delay {
client
.connectAsync(codec, RedisURI.create(config.uri))
.handle[Unit] { (connection, ex) =>
if (ex == null) {
cb(connection.asRight)
} else {
cb(ex.asLeft)
}
}
()
}
}
}(c => async.delay(c.close()))
}
}

private def makeClientOptions(config: LettuceConfig): ClientOptions =
ClientOptions
.builder()
.pingBeforeActivateConnection(config.pingBeforeActivateConnection)
.autoReconnect(config.autoReconnect)
.cancelCommandsOnReconnectFailure(config.cancelCommandsOnReconnectFailure)
.suspendReconnectOnProtocolFailure(config.suspendReconnectOnProtocolFailure)
.requestQueueSize(config.requestQueueSize)
.disconnectedBehavior(config.disconnectedBehavior)
.protocolVersion(config.protocolVersion.orNull)
.scriptCharset(config.scriptCharset)
.publishOnScheduler(config.publishOnScheduler)
.socketOptions(
SocketOptions
.builder()
.connectTimeout(Duration.ofNanos(config.socketOptions.connectTimeout.toNanos))
.keepAlive(config.socketOptions.keepAlive)
.tcpNoDelay(config.socketOptions.tcpNoDelay)
.build()
)
.timeoutOptions(TimeoutOptions.builder().timeoutCommands(config.timeoutOptions.timeoutCommands).build())
.sslOptions {
val opts = SslOptions
.builder()
.jdkSslProvider()

config.sslOptions.keyStoreType.foreach(opts.keyStoreType)
config.sslOptions.keyStorePath.zip(config.sslOptions.keyStorePassword).foreach { case (path, pass) =>
opts.keystore(new File(path), pass.toCharArray)
}
config.sslOptions.trustStorePath.zip(config.sslOptions.trustStorePassword).foreach { case (path, pass) =>
opts.truststore(new File(path), pass)
}

opts.build()
}
.build()

}
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ object Dependencies {
val http4sServer = "org.http4s" %% "http4s-server" % Versions.http4s
val jsr305 = "com.google.code.findbugs" % "jsr305" % "3.0.2"
val kindProjector = "org.typelevel" % "kind-projector" % "0.11.0" cross CrossVersion.full
val lettuce = "io.lettuce" % "lettuce-core" % "6.0.1.RELEASE"
val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.3"
val micrometerCore = "io.micrometer" % "micrometer-core" % Versions.micrometerCore
val micrometerJmx = "io.micrometer" % "micrometer-registry-jmx" % Versions.micrometerJmx
Expand Down
27 changes: 27 additions & 0 deletions site/docs/subprojects/lettuce.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
layout: docs
title: "Lettuce (Redis)"
---

# FS2 Kafka

`libraryDependencies += "com.avast" %% "sst-lettuce" % "@VERSION@"`

This subproject initializes [Lettuce](https://lettuce.io) Redis driver:

```scala mdoc:silent
import cats.effect.Resource
import com.avast.sst.lettuce.{LettuceConfig, LettuceModule}
import io.lettuce.core.codec.{RedisCodec, StringCodec}
import zio._
import zio.interop.catz._

implicit val runtime = zio.Runtime.default // this is just needed in example

implicit val lettuceCodec: RedisCodec[String, String] = StringCodec.UTF8

for {
connection <- LettuceModule.makeConnection[Task, String, String](LettuceConfig("redis://localhost"))
value <- Resource.liftF(Task.effect(connection.sync().get("key")))
} yield value
```