diff --git a/build.sbt b/build.sbt index ae0767a..0bfa1a9 100644 --- a/build.sbt +++ b/build.sbt @@ -33,7 +33,8 @@ val V = new { val http4s = "0.20.0-M5" val enumeratum = "1.5.13" val typesafeConfig = "1.3.3" - val guava = "27.0.1-jre" +// val guava = "27.0.1-jre" + val guava = "24.1-jre" val squants = "1.3.0" val antlr4 = "4.7.1" } diff --git a/src/main/scala/io/epiphanous/flinkrunner/operator/EnrichmentAsyncFunction.scala b/src/main/scala/io/epiphanous/flinkrunner/operator/EnrichmentAsyncFunction.scala index 7992ab7..2152978 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/operator/EnrichmentAsyncFunction.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/operator/EnrichmentAsyncFunction.scala @@ -1,6 +1,6 @@ package io.epiphanous.flinkrunner.operator -import java.util.concurrent.Executors +import java.util.concurrent.{Executors, TimeUnit} import cats.effect.{ContextShift, IO, Timer} import com.google.common.cache.{CacheBuilder, CacheLoader} @@ -107,11 +107,13 @@ abstract class EnrichmentAsyncFunction[IN, OUT, CV <: AnyRef]( @transient lazy val cache = { + val expireAfter = config.getDuration(s"$configPrefix.cache.expire.after") val builder = CacheBuilder .newBuilder() .concurrencyLevel(config.getInt(s"$configPrefix.cache.concurrency.level")) .maximumSize(config.getInt(s"$configPrefix.cache.max.size")) - .expireAfterWrite(config.getDuration(s"$configPrefix.cache.expire.after")) + .expireAfterWrite(expireAfter.toMillis, TimeUnit.MILLISECONDS) +// .expireAfterWrite(expireAfter) // for guava 27 if (!config.getBoolean(s"$configPrefix.cache.use.strong.keys")) builder.weakKeys() if (config.getBoolean(s"$configPrefix.cache.record.stats"))