Refresh redis topology on write (#83)
* Refresh redis topology on write

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

* Add comment

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>
khorshuheng committed Jul 28, 2021
1 parent f4fcc0b commit 756dc57
Showing 1 changed file with 14 additions and 12 deletions.
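For context: the change moves construction of the Redis connection metadata off the driver, where it was resolved once when the relation was created, into the per-partition write closure, where it is resolved once per write task. Below is a minimal sketch of the pattern using only the spark-redis types that appear in the diff; writePartition is a hypothetical helper for illustration, not part of the commit.

import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisEndpoint}
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row

// Build the connection metadata inside the closure so that every write task
// re-resolves the cluster topology (node list and slot ranges) instead of
// reusing a snapshot taken when the Spark job was planned.
def writePartition(sparkConf: SparkConf)(partition: Iterator[Row]): Unit = {
  // RedisConfig asks the initial endpoint for the current cluster nodes at
  // construction time, so a fresh instance observes failovers and resharding
  // that happened after the relation was created.
  implicit val redisConfig: RedisConfig =
    new RedisConfig(new RedisEndpoint(sparkConf))
  implicit val readWriteConfig: ReadWriteConfig =
    ReadWriteConfig.fromSparkConf(sparkConf)

  partition.foreach { row =>
    // pipelined writes against the freshly resolved nodes would go here
    val _ = row
  }
}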
@@ -17,15 +17,14 @@
 package feast.ingestion.stores.redis
 
 import java.util
-
 import com.google.protobuf.Timestamp
 import com.google.protobuf.util.Timestamps
 import com.redislabs.provider.redis.util.PipelineUtils.{foreachWithPipeline, mapWithPipeline}
 import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisEndpoint, RedisNode}
 import feast.ingestion.utils.TypeConversion
 import feast.proto.storage.RedisProto.RedisKeyV2
 import feast.proto.types.ValueProto
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.metrics.source.RedisSinkMetricSource
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
@@ -51,22 +50,14 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
 
   import RedisSinkRelation._
 
-  private implicit val redisConfig: RedisConfig = {
-    new RedisConfig(
-      new RedisEndpoint(sqlContext.sparkContext.getConf)
-    )
-  }
-
-  private implicit val readWriteConfig: ReadWriteConfig = {
-    ReadWriteConfig.fromSparkConf(sqlContext.sparkContext.getConf)
-  }
-
   override def schema: StructType = ???
 
   val MAX_EXPIRED_TIMESTAMP = new java.sql.Timestamp(Timestamps.MAX_VALUE.getSeconds * 1000)
 
   val persistence: Persistence = new HashTypePersistence(config)
 
+  val sparkConf: SparkConf = sqlContext.sparkContext.getConf
+
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
     // repartition for deduplication
     val dataToStore =
@@ -77,6 +68,17 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
       else data
 
     dataToStore.foreachPartition { partition: Iterator[Row] =>
+      // refresh redis cluster topology for each batch
+      implicit val redisConfig: RedisConfig = {
+        new RedisConfig(
+          new RedisEndpoint(sparkConf)
+        )
+      }
+
+      implicit val readWriteConfig: ReadWriteConfig = {
+        ReadWriteConfig.fromSparkConf(sparkConf)
+      }
+
       // grouped iterator to only allocate memory for a portion of rows
       partition.grouped(config.iteratorGroupingSize).foreach { batch =>
         // group by key and keep only latest row per each key
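A note on the design choice: sqlContext.sparkContext.getConf is hoisted into the sparkConf field because a SparkConf is a small serializable object that the partition closure can capture and ship to executors, whereas calling through the SparkContext inside the closure is not an option (the context lives only on the driver). Each task then pays one extra topology lookup per partition write in exchange for never writing against stale node addresses. The sketch below shows, purely for illustration, how such drift could be detected; it assumes spark-redis 2.x, where RedisConfig exposes the resolved nodes via hosts, and topologyDrifted is a hypothetical helper, not part of the commit.

import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint}
import org.apache.spark.SparkConf

// Compare the node set captured at planning time with a freshly resolved one.
// After a failover or reshard the two can differ, which is exactly the
// staleness that the per-partition refresh in this commit avoids.
def topologyDrifted(sparkConf: SparkConf, planned: RedisConfig): Boolean = {
  val fresh = new RedisConfig(new RedisEndpoint(sparkConf))
  def addrs(c: RedisConfig): Set[(String, Int)] =
    c.hosts.map(n => (n.endpoint.host, n.endpoint.port)).toSet
  addrs(planned) != addrs(fresh)
}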
