Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "2.4.2"
version = "2.5.1"

maxColumn = 140
assumeStandardLibraryStripMargin = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,20 @@ sealed abstract class ConsistencyLevel {

import ConsistencyLevel._

def toStringRepr: String = this match {
case Any => "ANY"
case One => "ONE"
case Two => "TWO"
case Three => "THREE"
case Quorum => "QUORUM"
case All => "ALL"
case LocalOne => "LOCAL_ONE"
case LocalQuorum => "LOCAL_QUORUM"
case EachQuorum => "EACH_QUORUM "
case Serial => "SERIAL"
case LocalSerial => "LOCAL_SERIAL"
}
def toStringRepr: String =
this match {
case Any => "ANY"
case One => "ONE"
case Two => "TWO"
case Three => "THREE"
case Quorum => "QUORUM"
case All => "ALL"
case LocalOne => "LOCAL_ONE"
case LocalQuorum => "LOCAL_QUORUM"
case EachQuorum => "EACH_QUORUM "
case Serial => "SERIAL"
case LocalSerial => "LOCAL_SERIAL"
}

}

Expand Down
18 changes: 10 additions & 8 deletions example/src/main/scala/com/avast/sst/example/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ object Main extends ZioServerApp {
meterRegistry <- MicrometerJmxModule.make[Task](configuration.jmx)
_ <- Resource.liftF(MicrometerJvmModule.make[Task](meterRegistry))
serverMetricsModule <- Resource.liftF(MicrometerHttp4sServerMetricsModule.make[Task](meterRegistry, clock))
boundedConnectExecutionContext <- executorModule
.makeThreadPoolExecutor(
configuration.boundedConnectExecutor,
new ConfigurableThreadFactory(Config(Some("hikari-connect-%02d")))
)
.map(ExecutionContext.fromExecutorService)
boundedConnectExecutionContext <-
executorModule
.makeThreadPoolExecutor(
configuration.boundedConnectExecutor,
new ConfigurableThreadFactory(Config(Some("hikari-connect-%02d")))
)
.map(ExecutionContext.fromExecutorService)
hikariMetricsFactory = new MicrometerMetricsTrackerFactory(meterRegistry)
doobieTransactor <- DoobieHikariModule
.make[Task](configuration.database, boundedConnectExecutionContext, executorModule.blocker, Some(hikariMetricsFactory))
doobieTransactor <-
DoobieHikariModule
.make[Task](configuration.database, boundedConnectExecutionContext, executorModule.blocker, Some(hikariMetricsFactory))
randomService = RandomService(doobieTransactor)
httpClient <- Http4sBlazeClientModule.make[Task](configuration.client, executorModule.executionContext)
circuitBreakerMetrics <- Resource.liftF(MicrometerCircuitBreakerMetricsModule.make[Task]("test-http-client", meterRegistry))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ trait RandomService {

object RandomService {

def apply(transactor: Transactor[Task]): RandomService = new RandomService {
override def randomNumber: Task[Double] = {
sql"select random()"
.query[Double]
.unique
.transact(transactor)
def apply(transactor: Transactor[Task]): RandomService =
new RandomService {
override def randomNumber: Task[Double] = {
sql"select random()"
.query[Double]
.unique
.transact(transactor)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@ class MonitoringServerInterceptor(meterRegistry: MeterRegistry) extends ServerIn
}

private def makeGauge(name: String): AtomicLong = {
gaugeCache.computeIfAbsent(name, n => {
val counter = new AtomicLong()
meterRegistry.gauge(n, counter)
counter
})
gaugeCache.computeIfAbsent(
name,
n => {
val counter = new AtomicLong()
meterRegistry.gauge(n, counter)
counter
}
)
}

private def makeTimer(name: String): Timer = timerCache.computeIfAbsent(name, meterRegistry.timer(_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,23 @@ object Http4sBlazeServerModule {
InetSocketAddress.createUnresolved(config.listenAddress, config.listenPort)
)
)
server <- BlazeServerBuilder[F](executionContext)
.bindSocketAddress(inetSocketAddress)
.withHttpApp(httpApp)
.withoutBanner
.withNio2(config.nio2Enabled)
.withWebSockets(config.webSocketsEnabled)
.enableHttp2(config.http2Enabled)
.withResponseHeaderTimeout(Duration.fromNanos(config.responseHeaderTimeout.toNanos))
.withIdleTimeout(Duration.fromNanos(config.idleTimeout.toNanos))
.withBufferSize(config.bufferSize)
.withMaxRequestLineLength(config.maxRequestLineLength)
.withMaxHeadersLength(config.maxHeadersLength)
.withChunkBufferMaxSize(config.chunkBufferMaxSize)
.withConnectorPoolSize(config.connectorPoolSize)
.withChannelOption[java.lang.Boolean](StandardSocketOptions.TCP_NODELAY, config.socketOptions.tcpNoDelay)
.resource
server <-
BlazeServerBuilder[F](executionContext)
.bindSocketAddress(inetSocketAddress)
.withHttpApp(httpApp)
.withoutBanner
.withNio2(config.nio2Enabled)
.withWebSockets(config.webSocketsEnabled)
.enableHttp2(config.http2Enabled)
.withResponseHeaderTimeout(Duration.fromNanos(config.responseHeaderTimeout.toNanos))
.withIdleTimeout(Duration.fromNanos(config.idleTimeout.toNanos))
.withBufferSize(config.bufferSize)
.withMaxRequestLineLength(config.maxRequestLineLength)
.withMaxHeadersLength(config.maxHeadersLength)
.withChunkBufferMaxSize(config.chunkBufferMaxSize)
.withConnectorPoolSize(config.connectorPoolSize)
.withChannelOption[java.lang.Boolean](StandardSocketOptions.TCP_NODELAY, config.socketOptions.tcpNoDelay)
.resource
} yield server
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ class RouteMetrics[F[_]: Sync](meterRegistry: MeterRegistry, clock: Clock[F]) {
val httpStatusCodes = new HttpStatusMetrics(prefix, meterRegistry)
for {
start <- clock.monotonic(TimeUnit.NANOSECONDS)
response <- F
.delay(activeRequests.increment())
.bracket { _ => route.flatTap(response => F.delay(httpStatusCodes.recordHttpStatus(response.status))) } { _ =>
for {
time <- computeTime(start)
_ <- F.delay(activeRequests.increment(-1))
_ <- F.delay(timer.record(time, TimeUnit.NANOSECONDS))
} yield ()
}
response <-
F.delay(activeRequests.increment())
.bracket { _ => route.flatTap(response => F.delay(httpStatusCodes.recordHttpStatus(response.status))) } { _ =>
for {
time <- computeTime(start)
_ <- F.delay(activeRequests.increment(-1))
_ <- F.delay(timer.record(time, TimeUnit.NANOSECONDS))
} yield ()
}
} yield response
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,21 @@ class CorrelationIdMiddleware[F[_]: Sync](

private val F = Sync[F]

def wrap(routes: HttpRoutes[F]): HttpRoutes[F] = Kleisli[OptionT[F, *], Request[F], Response[F]] { request =>
request.headers.get(correlationIdHeaderName) match {
case Some(header) =>
val requestWithAttribute = request.withAttribute(attributeKey, CorrelationId(header.value))
routes(requestWithAttribute).map(r => r.withHeaders(r.headers.put(header)))
case None =>
for {
newCorrelationId <- OptionT.liftF(F.delay(generator()))
_ <- log(newCorrelationId)
requestWithAttribute = request.withAttribute(attributeKey, CorrelationId(newCorrelationId))
response <- routes(requestWithAttribute)
} yield response.withHeaders(response.headers.put(Header(correlationIdHeaderName.value, newCorrelationId)))
def wrap(routes: HttpRoutes[F]): HttpRoutes[F] =
Kleisli[OptionT[F, *], Request[F], Response[F]] { request =>
request.headers.get(correlationIdHeaderName) match {
case Some(header) =>
val requestWithAttribute = request.withAttribute(attributeKey, CorrelationId(header.value))
routes(requestWithAttribute).map(r => r.withHeaders(r.headers.put(header)))
case None =>
for {
newCorrelationId <- OptionT.liftF(F.delay(generator()))
_ <- log(newCorrelationId)
requestWithAttribute = request.withAttribute(attributeKey, CorrelationId(newCorrelationId))
response <- routes(requestWithAttribute)
} yield response.withHeaders(response.headers.put(Header(correlationIdHeaderName.value, newCorrelationId)))
}
}
}

def retrieveCorrelationId(request: Request[F]): Option[CorrelationId] = request.attributes.lookup(attributeKey)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ final case class ForkJoinPoolConfig(
math.min(math.max(math.ceil(numOfCpus * parallelismFactor).toInt, parallelismMin), parallelismMax)
}

private[sst] def computeAsyncMode: Boolean = taskPeekingMode match {
case FIFO => true
case LIFO => false
}
private[sst] def computeAsyncMode: Boolean =
taskPeekingMode match {
case FIFO => true
case LIFO => false
}

}

Expand Down
26 changes: 15 additions & 11 deletions jvm/src/main/scala/com/avast/sst/jvm/system/console/Console.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,25 @@ trait Console[F[_]] {

object Console {

def apply[F[_]: Sync](in: Reader, out: OutputStream, err: OutputStream): Console[F] = new Console[F] {
def apply[F[_]: Sync](in: Reader, out: OutputStream, err: OutputStream): Console[F] =
new Console[F] {

private val F = Sync[F]
private val F = Sync[F]

override def printLine(value: String): F[Unit] = F.delay {
SConsole.withOut(out)(SConsole.println(value))
}
override def printLine(value: String): F[Unit] =
F.delay {
SConsole.withOut(out)(SConsole.println(value))
}

override def printLineToError(value: String): F[Unit] = F.delay {
SConsole.withErr(err)(SConsole.err.println(value))
}
override def printLineToError(value: String): F[Unit] =
F.delay {
SConsole.withErr(err)(SConsole.err.println(value))
}

override def readLine: F[String] = F.delay {
SConsole.withIn(in)(StdIn.readLine())
override def readLine: F[String] =
F.delay {
SConsole.withIn(in)(StdIn.readLine())
}
}
}

}
23 changes: 12 additions & 11 deletions jvm/src/main/scala/com/avast/sst/jvm/system/random/Random.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,26 @@ trait Random[F[_]] {

object Random {

def apply[F[_]: Sync](rnd: scala.util.Random): Random[F] = new Random[F] {
def apply[F[_]: Sync](rnd: scala.util.Random): Random[F] =
new Random[F] {

private val F = Sync[F]
private val F = Sync[F]

override def nextDouble: F[Double] = F.delay(rnd.nextDouble())
override def nextDouble: F[Double] = F.delay(rnd.nextDouble())

override def nextBoolean: F[Boolean] = F.delay(rnd.nextBoolean())
override def nextBoolean: F[Boolean] = F.delay(rnd.nextBoolean())

override def nextFloat: F[Float] = F.delay(rnd.nextFloat())
override def nextFloat: F[Float] = F.delay(rnd.nextFloat())

override def nextInt: F[Int] = F.delay(rnd.nextInt())
override def nextInt: F[Int] = F.delay(rnd.nextInt())

override def nextInt(n: Int): F[Int] = F.delay(rnd.nextInt(n))
override def nextInt(n: Int): F[Int] = F.delay(rnd.nextInt(n))

override def nextLong: F[Long] = F.delay(rnd.nextLong())
override def nextLong: F[Long] = F.delay(rnd.nextLong())

override def nextPrintableChar: F[Char] = F.delay(rnd.nextPrintableChar())
override def nextPrintableChar: F[Char] = F.delay(rnd.nextPrintableChar())

override def nextString(length: Int): F[String] = F.delay(rnd.nextString(length))
}
override def nextString(length: Int): F[String] = F.delay(rnd.nextString(length))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,28 @@ private[jmx] class TypeScopeNameObjectNameFactory(separator: String = ".") exten
parsedName.getOrElse(defaultFactory.createName(`type`, domain, name))
}

private def parseName(domain: String, name: String) = Either.catchNonFatal {
val parts = name.split(quotedSeparator, partNames.length)
private def parseName(domain: String, name: String) =
Either.catchNonFatal {
val parts = name.split(quotedSeparator, partNames.length)

/* The following block of code is a little hack. The problem is that ObjectName requires HashTable as parameter but HashTable
/* The following block of code is a little hack. The problem is that ObjectName requires HashTable as parameter but HashTable
is unsorted and thus unusable for us. We hack it by raping the HashTable and in-fact using LinkedHashMap which is
much more suitable for our needs. */
val map = new java.util.LinkedHashMap[String, String](parts.length)
val properties = new java.util.Hashtable[String, String](parts.length) {
override def entrySet(): util.Set[util.Map.Entry[String, String]] = map.entrySet()
val map = new java.util.LinkedHashMap[String, String](parts.length)
val properties = new java.util.Hashtable[String, String](parts.length) {
override def entrySet(): util.Set[util.Map.Entry[String, String]] = map.entrySet()
}

parts.zip(partNames).foreach {
case (part, partName) =>
val quoted = quote(part)
properties.put(partName, quoted)
map.put(partName, quoted)
}

new ObjectName(domain, properties)
}

parts.zip(partNames).foreach {
case (part, partName) =>
val quoted = quote(part)
properties.put(partName, quoted)
map.put(partName, quoted)
}

new ObjectName(domain, properties)
}

private def quote(objectName: String) = objectName.replaceAll("[\\Q.?*\"\\E]", "_")

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ object CircuitBreakerMetrics {

}

def noop[F[_]](implicit F: Applicative[F]): CircuitBreakerMetrics[F] = new CircuitBreakerMetrics[F] {
override def increaseRejected: F[Unit] = F.unit
override def setState(state: State): F[Unit] = F.unit
}
def noop[F[_]](implicit F: Applicative[F]): CircuitBreakerMetrics[F] =
new CircuitBreakerMetrics[F] {
override def increaseRejected: F[Unit] = F.unit
override def setState(state: State): F[Unit] = F.unit
}

}
9 changes: 5 additions & 4 deletions ssl-config/src/main/scala/com/avast/sst/ssl/Slf4jLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ private class Slf4jLogger(l: Logger) extends NoDepsLogger {

private[ssl] object Slf4jLogger {

def factory: LoggerFactory = new LoggerFactory {
override def apply(clazz: Class[_]): NoDepsLogger = new Slf4jLogger(Slf4jLoggerFactory.getLogger(clazz))
override def apply(name: String): NoDepsLogger = new Slf4jLogger(Slf4jLoggerFactory.getLogger(name))
}
def factory: LoggerFactory =
new LoggerFactory {
override def apply(clazz: Class[_]): NoDepsLogger = new Slf4jLogger(Slf4jLoggerFactory.getLogger(clazz))
override def apply(name: String): NoDepsLogger = new Slf4jLogger(Slf4jLoggerFactory.getLogger(name))
}

}
Loading