Add Redis write properties (#125)
* Add Redis write properties

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Fix pandas udf test

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
khorshuheng committed Mar 28, 2022
1 parent d71c460 commit ed7e74f
Showing 7 changed files with 38 additions and 17 deletions.
@@ -33,17 +33,14 @@ object BasePipeline {
     val conf = new SparkConf()

     jobConfig.store match {
-      case RedisConfig(host, port, password, ssl) if password.isEmpty =>
-        conf
-          .set("spark.redis.host", host)
-          .set("spark.redis.port", port.toString)
-          .set("spark.redis.ssl", ssl.toString)
-      case RedisConfig(host, port, password, ssl) if password.nonEmpty =>
+      case RedisConfig(host, port, password, ssl, properties) =>
         conf
           .set("spark.redis.host", host)
           .set("spark.redis.port", port.toString)
           .set("spark.redis.password", password)
           .set("spark.redis.ssl", ssl.toString)
+          .set("spark.redis.properties.maxJitter", properties.maxJitterSeconds.toString)
+          .set("spark.redis.properties.pipelineSize", properties.pipelineSize.toString)
       case BigTableConfig(projectId, instanceId) =>
         conf
           .set("spark.bigtable.projectId", projectId)
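
For reference, a minimal sketch of how the new write properties round-trip through SparkConf, assuming the RedisWriteProperties defaults defined in the next file; the standalone object and its assertions are illustrative, not part of the commit:

import org.apache.spark.SparkConf
import feast.ingestion.RedisWriteProperties

// Hypothetical check: set the properties the way BasePipeline does,
// then read them back the way RedisSinkRelation does.
object RedisPropsRoundTrip {
  def main(args: Array[String]): Unit = {
    val properties = RedisWriteProperties() // defaults: maxJitterSeconds = 3600, pipelineSize = 250
    val conf = new SparkConf()
      .set("spark.redis.properties.maxJitter", properties.maxJitterSeconds.toString)
      .set("spark.redis.properties.pipelineSize", properties.pipelineSize.toString)
    assert(conf.get("spark.redis.properties.maxJitter").toInt == 3600)
    assert(conf.get("spark.redis.properties.pipelineSize").toInt == 250)
  }
}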
@@ -26,8 +26,15 @@ object Modes extends Enumeration {

 abstract class StoreConfig

-case class RedisConfig(host: String, port: Int, password: String, ssl: Boolean) extends StoreConfig
-case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
+case class RedisConfig(
+    host: String,
+    port: Int,
+    password: String = "",
+    ssl: Boolean = false,
+    properties: RedisWriteProperties = RedisWriteProperties()
+) extends StoreConfig
+case class RedisWriteProperties(maxJitterSeconds: Int = 3600, pipelineSize: Int = 250)
+case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
 case class CassandraConfig(
     connection: CassandraConnection,
     keyspace: String,
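
A usage sketch with hypothetical values: the new defaults keep the common case terse, while write tuning stays opt-in.

// Defaults apply: password = "", ssl = false, maxJitterSeconds = 3600, pipelineSize = 250.
val local = RedisConfig("localhost", 6379)

// Opt in to custom write behaviour, e.g. disable jitter and widen pipelines.
val tuned = RedisConfig(
  host = "redis.internal", // hypothetical host
  port = 6379,
  properties = RedisWriteProperties(maxJitterSeconds = 0, pipelineSize = 1000)
)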
@@ -18,6 +18,7 @@ package feast.ingestion.stores.redis

 import java.{sql, util}
 import com.google.protobuf.Timestamp
+import feast.ingestion.RedisWriteProperties
 import feast.ingestion.utils.TypeConversion
 import org.apache.commons.codec.digest.DigestUtils
 import org.apache.spark.{SparkConf, SparkEnv}
@@ -82,6 +83,10 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisConfig)
     port = sparkConf.get("spark.redis.port").toInt,
     password = sparkConf.get("spark.redis.password", "")
   )
+  val properties = RedisWriteProperties(
+    maxJitterSeconds = sparkConf.get("spark.redis.properties.maxJitter").toInt,
+    pipelineSize = sparkConf.get("spark.redis.properties.pipelineSize").toInt
+  )

   val isClusterMode = checkIfInClusterMode(endpoint)

@@ -96,7 +101,7 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisConfig)
     }

     // grouped iterator to only allocate memory for a portion of rows
-    partition.grouped(config.iteratorGroupingSize).foreach { batch =>
+    partition.grouped(properties.pipelineSize).foreach { batch =>
       // group by key and keep only latest row per each key
       val rowsWithKey: Map[String, Row] =
         compactRowsToLatestTimestamp(batch.map(row => dataKeyId(row) -> row)).toMap
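
The batching behaviour itself is unchanged; only the knob moved. A standalone sketch of what grouped does here, with made-up sizes:

// grouped(n) yields batches of at most n elements, so each Redis pipeline
// now carries at most properties.pipelineSize rows (hypothetical 520-row partition).
val partition = (1 to 520).iterator
val batchSizes = partition.grouped(250).map(_.size).toList
assert(batchSizes == List(250, 250, 20))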
@@ -112,7 +117,7 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisConfig)
       val expiryTimestampByKey = keys
         .zip(storedValues)
         .map { case (key, storedValue) =>
-          (key, newExpiryTimestamp(rowsWithKey(key), storedValue))
+          (key, newExpiryTimestamp(rowsWithKey(key), storedValue, properties.maxJitterSeconds))
         }
         .toMap

@@ -183,9 +188,14 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisConfig)
     }
   }

+  def applyJitter(expiry: Long, maxJitter: Int): Long = {
+    if (maxJitter > 0) (scala.util.Random.nextInt(maxJitter).toLong * 1000) + expiry else expiry
+  }
+
   private def newExpiryTimestamp(
       row: Row,
-      value: util.Map[Array[Byte], Array[Byte]]
+      value: util.Map[Array[Byte], Array[Byte]],
+      maxJitterSeconds: Int
   ): Option[java.sql.Timestamp] = {
     val currentMaxExpiry: Option[Long] = value.asScala.toMap
       .map { case (key, value) =>
@@ -205,9 +215,9 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisConfig)

     (currentMaxExpiry, rowExpiry) match {
       case (_, None) => None
-      case (None, Some(expiry)) => Some(new sql.Timestamp(expiry))
+      case (None, Some(expiry)) => Some(new sql.Timestamp(applyJitter(expiry, maxJitterSeconds)))
       case (Some(currentExpiry), Some(newExpiry)) =>
-        Some(new sql.Timestamp(currentExpiry max newExpiry))
+        Some(new sql.Timestamp(currentExpiry max applyJitter(newExpiry, maxJitterSeconds)))
     }

   }
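
To make the jitter semantics concrete, a sketch with made-up numbers: applyJitter takes an expiry in epoch milliseconds and a maximum jitter in seconds, and pushes the expiry forward by a random whole number of seconds in [0, maxJitter); a maxJitter of 0 disables it, which is what the tests below rely on for determinism.

// Mirrors the commit's applyJitter; values are illustrative.
def applyJitter(expiry: Long, maxJitter: Int): Long =
  if (maxJitter > 0) (scala.util.Random.nextInt(maxJitter).toLong * 1000) + expiry else expiry

val expiryMs = 1648425600000L              // hypothetical expiry in epoch millis
val jittered = applyJitter(expiryMs, 3600) // adds 0..3599 whole seconds
assert(jittered >= expiryMs && jittered < expiryMs + 3600L * 1000)
assert(applyJitter(expiryMs, 0) == expiryMs) // jitter disabled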
@@ -21,7 +21,6 @@ case class SparkRedisConfig(
     projectName: String,
     entityColumns: Array[String],
     timestampColumn: String,
-    iteratorGroupingSize: Int = 1000,
     timestampPrefix: String = "_ts",
     repartitionByEntity: Boolean = true,
     maxAge: Long = 0,
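
With iteratorGroupingSize removed, batch sizing for the Redis writer is driven entirely by spark.redis.properties.pipelineSize, as read back in RedisSinkRelation above.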
@@ -17,7 +17,6 @@
 package feast.ingestion

 import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
-import com.google.protobuf.util.Timestamps
 import feast.ingestion.helpers.DataHelper._
 import feast.ingestion.helpers.RedisStorageHelper._
 import feast.ingestion.helpers.TestRow

@@ -32,7 +31,6 @@ import org.scalacheck._
 import redis.clients.jedis.Jedis

 import java.nio.file.Paths
-import java.sql.Timestamp
 import java.time.Instant
 import java.time.temporal.ChronoUnit
 import scala.collection.JavaConverters._

@@ -52,6 +50,8 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
     .set("spark.redis.port", container.mappedPort(6379).toString)
     .set("spark.redis.password", password)
     .set("spark.metrics.conf.*.sink.statsd.port", statsDStub.port.toString)
+    .set("spark.redis.properties.maxJitter", "0")
+    .set("spark.redis.properties.pipelineSize", "250")

   trait Scope {
     val jedis = new Jedis("localhost", container.mappedPort(6379))

@@ -84,6 +84,8 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
           Field("feature2", ValueType.Enum.FLOAT)
         )
       ),
+      store =
+        RedisConfig("localhost", 6379, properties = RedisWriteProperties(maxJitterSeconds = 0)),
       startTime = DateTime.parse("2020-08-01"),
       endTime = DateTime.parse("2020-09-01"),
       metrics = Some(StatsDConfig(host = "localhost", port = statsDStub.port))
@@ -47,6 +47,8 @@ class PandasUDF extends SparkSpec with ForAllTestContainer {
   override def withSparkConfOverrides(conf: SparkConf): SparkConf = conf
     .set("spark.redis.host", container.host)
     .set("spark.redis.port", container.mappedPort(6379).toString)
+    .set("spark.redis.properties.maxJitter", "0")
+    .set("spark.redis.properties.pipelineSize", "250")

   trait Scope {
     implicit def testRowEncoder: Encoder[TestRow] = ExpressionEncoder()
@@ -56,6 +56,8 @@ class StreamingPipelineIT extends SparkSpec with ForAllTestContainer {
     .set("spark.redis.host", redisContainer.host)
     .set("spark.redis.port", redisContainer.mappedPort(6379).toString)
     .set("spark.sql.streaming.checkpointLocation", generateTempPath("checkpoint"))
+    .set("spark.redis.properties.maxJitter", "0")
+    .set("spark.redis.properties.pipelineSize", "250")

   trait KafkaPublisher {
     val props = new Properties()

@@ -91,7 +93,9 @@ class StreamingPipelineIT extends SparkSpec with ForAllTestContainer {
       features = Seq(
         Field("unique_drivers", ValueType.Enum.INT64)
      )
-    )
+    ),
+    store =
+      RedisConfig("localhost", 6379, properties = RedisWriteProperties(maxJitterSeconds = 0))
   )

   def encodeEntityKey(row: TestMessage, featureTable: FeatureTable): Array[Byte] = {
