Hash entity keys (#108)
* Hash entity keys

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

* fix test

Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>

* update e2e test image

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* fix helm set command

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* fix helm repository

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
khorshuheng committed Feb 14, 2022
1 parent 0f4e27c commit 73b09f6
Showing 6 changed files with 54 additions and 67 deletions.
2 changes: 2 additions & 0 deletions infra/scripts/k8s-common-functions.sh
@@ -72,6 +72,8 @@ function helm_install
if ! time helm install --wait "${FEAST_RELEASE_NAME:-feast-release}" feast-charts/feast "$@" \
--timeout 10m \
--set "prometheus-statsd-exporter.enabled=false" \
--set "feast-online-serving.image.repository=gcr.io/kf-feast/feast-serving/feast-serving" \
--set "feast-online-serving.image.tag=0.26.11-alpha" \
--set "feast-jobservice.enabled=false" \
--set "prometheus.enabled=false" \
--set "grafana.enabled=false" \
@@ -20,8 +20,7 @@ import java.util
import com.google.protobuf.Timestamp
import com.google.protobuf.util.Timestamps
import feast.ingestion.utils.TypeConversion
import feast.proto.storage.RedisProto.RedisKeyV2
import feast.proto.types.ValueProto
import org.apache.commons.codec.digest.DigestUtils
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.metrics.source.RedisSinkMetricSource
import org.apache.spark.sql.functions.col
@@ -83,13 +82,13 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
// grouped iterator to only allocate memory for a portion of rows
partition.grouped(config.iteratorGroupingSize).foreach { batch =>
// group by key and keep only latest row per each key
val rowsWithKey: Map[RedisKeyV2, Row] =
val rowsWithKey: Map[String, Row] =
compactRowsToLatestTimestamp(batch.map(row => dataKeyId(row) -> row)).toMap

val keys = rowsWithKey.keysIterator.toList
val readPipeline = pipelineProvider.pipeline()
val readResponses =
keys.map(key => persistence.get(readPipeline, key.toByteArray))
keys.map(key => persistence.get(readPipeline, key.getBytes()))
readPipeline.close()
val storedValues = readResponses.map(_.get())
val timestamps = storedValues.map(persistence.storedTimestamp)
@@ -117,7 +116,7 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
}
persistence.save(
writePipeline,
key.toByteArray,
key.getBytes(),
row,
expiryTimestampByKey(key),
MAX_EXPIRED_TIMESTAMP
@@ -130,30 +129,26 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
dataToStore.unpersist()
}

private def compactRowsToLatestTimestamp(rows: Seq[(RedisKeyV2, Row)]) = rows
private def compactRowsToLatestTimestamp(rows: Seq[(String, Row)]) = rows
.groupBy(_._1)
.values
.map(_.maxBy(_._2.getAs[java.sql.Timestamp](config.timestampColumn).getTime))

/**
* Key is built from entities columns values with prefix of entities columns names.
*/
private def dataKeyId(row: Row): RedisKeyV2 = {
private def dataKeyId(row: Row): String = {
val types = row.schema.fields.map(f => (f.name, f.dataType)).toMap

val sortedEntities = config.entityColumns.sorted.toSeq
val entityValues = sortedEntities
.map(entity => (row.getAs[Any](entity), types(entity)))
.map { case (value, v_type) =>
TypeConversion.sqlTypeToProtoValue(value, v_type).asInstanceOf[ValueProto.Value]
TypeConversion.sqlTypeToString(value, v_type)
}

RedisKeyV2
.newBuilder()
.setProject(config.projectName)
.addAllEntityNames(sortedEntities.asJava)
.addAllEntityValues(entityValues.asJava)
.build
DigestUtils.md5Hex(
s"${config.projectName}#${sortedEntities.mkString("#")}:${entityValues.mkString("#")}"
)
}

private lazy val metricSource: Option[RedisSinkMetricSource] = {
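For illustration (not part of the commit): dataKeyId above now derives the Redis key by hashing a composed string instead of serializing a RedisKeyV2 protobuf. A minimal standalone sketch of the same derivation, using the commons-codec DigestUtils the commit imports, with hypothetical project, entity names and values:

import org.apache.commons.codec.digest.DigestUtils

object HashedKeySketch extends App {
  // Hypothetical inputs mirroring dataKeyId: project name, entity column names
  // (sorted, as config.entityColumns.sorted is) and their string-converted values.
  val project        = "default"
  val sortedEntities = Seq("customer_id", "driver_id")
  val entityValues   = Seq("c-42", "d-7")

  // Same composition as dataKeyId: "<project>#<names joined by '#'>:<values joined by '#'>"
  val rawKey   = s"$project#${sortedEntities.mkString("#")}:${entityValues.mkString("#")}"
  val redisKey = DigestUtils.md5Hex(rawKey) // 32-character lowercase hex string

  // The sink then reads and writes with redisKey.getBytes() where it previously used RedisKeyV2.toByteArray.
  println(s"$rawKey -> $redisKey")
}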
@@ -96,6 +96,14 @@ object TypeConversion {
}).build
}

def sqlTypeToString(value: Any, `type`: DataType): String = {
`type` match {
case IntegerType => value.asInstanceOf[Int].toString
case LongType => value.asInstanceOf[Long].toString
case StringType => value.asInstanceOf[String]
}
}

class AsScala[A](op: => A) {
def asScala: A = op
}
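A quick usage note (not from the commit): the new TypeConversion.sqlTypeToString above only handles the three column types shown, so the working assumption is that entity columns are integer, long or string. A REPL-style sketch with hypothetical values:

import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import feast.ingestion.utils.TypeConversion

// One call per supported case; the result feeds straight into the hashed key string.
TypeConversion.sqlTypeToString(7, IntegerType)    // "7"
TypeConversion.sqlTypeToString(7L, LongType)      // "7"
TypeConversion.sqlTypeToString("abc", StringType) // "abc"
// Any other DataType (e.g. DoubleType) falls through the pattern match and throws scala.MatchError.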
@@ -18,7 +18,6 @@ package feast.ingestion

import java.nio.file.Paths
import java.sql.Timestamp

import collection.JavaConverters._
import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
import com.google.protobuf.util.Timestamps
@@ -32,11 +31,16 @@ import feast.ingestion.helpers.RedisStorageHelper._
import feast.ingestion.helpers.DataHelper._
import feast.ingestion.helpers.TestRow
import feast.ingestion.metrics.StatsDStub
import feast.ingestion.utils.TypeConversion
import feast.proto.storage.RedisProto.RedisKeyV2
import feast.proto.types.ValueProto
import org.apache.commons.codec.digest.DigestUtils
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

import java.time.Instant
import java.time.temporal.ChronoUnit

class BatchPipelineIT extends SparkSpec with ForAllTestContainer {

val password = "password"
@@ -62,14 +66,13 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {

implicit def testRowEncoder: Encoder[TestRow] = ExpressionEncoder()

def getCurrentTimeToSecondPrecision: Long =
Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli

def encodeEntityKey(row: TestRow, featureTable: FeatureTable): Array[Byte] = {
RedisKeyV2
.newBuilder()
.setProject(featureTable.project)
.addAllEntityNames(featureTable.entities.map(_.name).sorted.asJava)
.addEntityValues(ValueProto.Value.newBuilder().setStringVal(row.customer))
.build
.toByteArray
val entities = featureTable.entities.map(_.name).mkString("#")
val key = DigestUtils.md5Hex(s"${featureTable.project}#${entities}:${row.customer}")
key.getBytes()
}

def groupByEntity(row: TestRow) =
@@ -143,7 +146,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
endTime = endDate
)

val ingestionTimeUnix = System.currentTimeMillis()
val ingestionTimeUnix = getCurrentTimeToSecondPrecision
BatchPipeline.createPipeline(sparkSession, configWithMaxAge)

val featureKeyEncoder: String => String = encodeFeatureKey(config.featureTable)
@@ -161,9 +164,8 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
"_ex:test-fs" -> expectedExpiryTimestamp
)
)
val toleranceMs = 10
val keyTTL = jedis.ttl(encodedEntityKey)
keyTTL should (be <= (expectedExpiryTimestamp.getTime - ingestionTimeUnix + toleranceMs) / 1000 and be > 0L)
val keyTTL = jedis.ttl(encodedEntityKey)
keyTTL should (be <= (expectedExpiryTimestamp.getTime - ingestionTimeUnix) / 1000 and be > 0L)

})
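A side note on the timing change (an inference, not stated in the commit): jedis.ttl reports whole seconds, so comparing against an ingestion time truncated to second precision appears to remove the need for the old toleranceMs fudge. A REPL-style sketch of what getCurrentTimeToSecondPrecision returns:

import java.time.Instant
import java.time.temporal.ChronoUnit

// Epoch millis rounded down to the start of the current second,
// e.g. 1644800000000 rather than 1644800000123.
val truncatedMs = Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli
assert(truncatedMs % 1000 == 0)
assert(truncatedMs <= System.currentTimeMillis())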

@@ -178,7 +180,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
endTime = endDate
)

val secondIngestionTimeUnix = System.currentTimeMillis()
val secondIngestionTimeUnix = getCurrentTimeToSecondPrecision
BatchPipeline.createPipeline(sparkSession, configWithSecondFeatureTable)

val featureKeyEncoderSecondTable: String => String =
@@ -203,9 +205,8 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
"_ex:test-fs-2" -> expectedExpiryTimestamp2
)
)
val keyTTL = jedis.ttl(encodedEntityKey).toLong
val toleranceMs = 10
keyTTL should (be <= (expectedExpiryTimestamp2.getTime - secondIngestionTimeUnix + toleranceMs) / 1000 and be > (expectedExpiryTimestamp1.getTime - secondIngestionTimeUnix) / 1000)
val keyTTL = jedis.ttl(encodedEntityKey)
keyTTL should (be <= (expectedExpiryTimestamp2.getTime - secondIngestionTimeUnix) and be > (expectedExpiryTimestamp1.getTime - secondIngestionTimeUnix) / 1000)

})
}
@@ -225,7 +226,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
endTime = endDate
)

val ingestionTimeUnix = System.currentTimeMillis()
val ingestionTimeUnix = getCurrentTimeToSecondPrecision
BatchPipeline.createPipeline(sparkSession, configWithMaxAge)

val reducedMaxAge = 86400 * 2
@@ -264,9 +265,9 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
"_ex:test-fs-2" -> expectedExpiryTimestamp2
)
)
val keyTTL = jedis.ttl(encodedEntityKey).toLong
val keyTTL = jedis.ttl(encodedEntityKey)
val toleranceMs = 10
keyTTL should (be <= (expectedExpiryTimestamp1.getTime - ingestionTimeUnix + toleranceMs) / 1000 and
keyTTL should (be <= (expectedExpiryTimestamp1.getTime - ingestionTimeUnix) / 1000 and
be > (expectedExpiryTimestamp2.getTime - ingestionTimeUnix) / 1000)

})
@@ -286,7 +287,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
endTime = endDate
)

val ingestionTimeUnix = System.currentTimeMillis()
val ingestionTimeUnix = getCurrentTimeToSecondPrecision
BatchPipeline.createPipeline(sparkSession, configWithMaxAge)

val reducedMaxAge = 86400 * 2
@@ -316,7 +317,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
"_ex:test-fs" -> expiryTimestampAfterUpdate
)
)
val keyTTL = jedis.ttl(encodedEntityKey).toLong
val keyTTL = jedis.ttl(encodedEntityKey)
keyTTL should (be <= (expiryTimestampAfterUpdate.getTime - ingestionTimeUnix) / 1000 and be > 0L)

})
10 changes: 2 additions & 8 deletions spark/ingestion/src/test/scala/feast/ingestion/PandasUDF.scala
@@ -19,13 +19,13 @@ package feast.ingestion
import java.nio.file.Paths
import java.sql.Timestamp
import java.util.Date

import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
import feast.ingestion.helpers.DataHelper.generateTempPath
import feast.ingestion.utils.testing.MemoryStreamingSource
import feast.proto.storage.RedisProto.RedisKeyV2
import feast.proto.types.ValueProto
import feast.proto.types.ValueProto.ValueType
import org.apache.commons.codec.digest.DigestUtils
import org.apache.spark.SparkConf
import org.apache.spark.api.python.DynamicPythonFunction
import org.apache.spark.sql.{Encoder, Row, SQLContext}
@@ -59,13 +59,7 @@ class PandasUDF extends SparkSpec with ForAllTestContainer {
val rand = new Random()

def encodeEntityKey(key: String): Array[Byte] =
RedisKeyV2
.newBuilder()
.setProject("default")
.addAllEntityNames(Seq("key").asJava)
.addEntityValues(ValueProto.Value.newBuilder().setStringVal(key))
.build
.toByteArray
DigestUtils.md5Hex(s"default#key:${key}").getBytes()

// Function checks that num between 0 and 10 and num2 between 0 and 20
// See the code test/resources/python/udf.py
@@ -18,7 +18,6 @@ package feast.ingestion

import java.nio.file.Paths
import java.util.Properties

import com.dimafeng.testcontainers.{
ForAllTestContainer,
GenericContainer,
@@ -41,6 +40,7 @@ import feast.ingestion.helpers.DataHelper._
import feast.ingestion.helpers.TestRow
import feast.proto.storage.RedisProto.RedisKeyV2
import feast.proto.types.ValueProto
import org.apache.commons.codec.digest.DigestUtils
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -95,14 +95,11 @@ class StreamingPipelineIT extends SparkSpec with ForAllTestContainer {
)

def encodeEntityKey(row: TestMessage, featureTable: FeatureTable): Array[Byte] = {
RedisKeyV2
.newBuilder()
.setProject(featureTable.project)
.addAllEntityNames(featureTable.entities.map(_.name).sorted.asJava)
.addEntityValues(ValueProto.Value.newBuilder().setInt64Val(row.getS2Id))
.addEntityValues(ValueProto.Value.newBuilder().setStringVal(row.getVehicleType.toString))
.build
.toByteArray
val entities = featureTable.entities.map(_.name).mkString("#")
val key = DigestUtils.md5Hex(
s"${featureTable.project}#${entities}:${row.getS2Id.toString}#${row.getVehicleType.toString}"
)
key.getBytes()
}

def groupByEntity(row: TestMessage) =
@@ -329,13 +326,8 @@ class StreamingPipelineIT extends SparkSpec with ForAllTestContainer {
query.processAllAvailable()

val allTypesKeyEncoder: String => String = encodeFeatureKey(configWithKafka.featureTable)
val redisKey = RedisKeyV2
.newBuilder()
.setProject("default")
.addEntityNames("string")
.addEntityValues(ValueProto.Value.newBuilder().setStringVal("test"))
.build()
val storedValues = jedis.hgetAll(redisKey.toByteArray).asScala.toMap
val redisKey = DigestUtils.md5Hex(s"default#string:test").getBytes()
val storedValues = jedis.hgetAll(redisKey).asScala.toMap
storedValues should beStoredRow(
Map(
allTypesKeyEncoder("double") -> 1,
@@ -432,14 +424,9 @@ class StreamingPipelineIT extends SparkSpec with ForAllTestContainer {

query.processAllAvailable()

val redisKey = RedisKeyV2
.newBuilder()
.setProject("default")
.addEntityNames("customer")
.addEntityValues(ValueProto.Value.newBuilder().setStringVal("aaa"))
.build()
val redisKey = DigestUtils.md5Hex(s"default#customer:aaa").getBytes()

val storedValues = jedis.hgetAll(redisKey.toByteArray).asScala.toMap
val storedValues = jedis.hgetAll(redisKey).asScala.toMap
val customFeatureKeyEncoder: String => String = encodeFeatureKey(avroConfig.featureTable)
storedValues should beStoredRow(
Map(
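For completeness (not part of the commit): with the hashed scheme, reading a row back only requires recomputing the same digest, as the jedis.hgetAll(redisKey) calls above do. A REPL-style sketch assuming a local Redis and the Jedis client these tests already use:

import org.apache.commons.codec.digest.DigestUtils
import redis.clients.jedis.Jedis
import scala.collection.JavaConverters._

// Hypothetical single-entity lookup: project "default", entity "customer", value "aaa".
val jedis    = new Jedis("localhost", 6379)
val redisKey = DigestUtils.md5Hex("default#customer:aaa").getBytes()
val stored   = jedis.hgetAll(redisKey).asScala.toMap // stored feature fields for that entity key
jedis.close()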
