From 869c31a51a30586558fbe14270f95eb35e358189 Mon Sep 17 00:00:00 2001 From: Andrew Tang Date: Tue, 27 Jun 2023 10:01:43 -0700 Subject: [PATCH 1/2] add string support for count distinct column --- .../feathr/offline/swa/SlidingWindowFeatureUtils.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala index 91a6d36cc..42b24bcc2 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala @@ -186,7 +186,9 @@ private[offline] object SlidingWindowFeatureUtils { // In Feathr's use case, we want to treat the count aggregation as simple count of non-null items. val rewrittenDef = s"CASE WHEN ${featureDef} IS NOT NULL THEN 1 ELSE 0 END" new CountAggregate(rewrittenDef) - case AggregationType.COUNT_DISTINCT => new CountDistinctAggregate(featureDef) + case AggregationType.COUNT_DISTINCT => + var rewrittenDef = s"CASE WHEN ${featureDef} IS NOT NULL THEN CAST(CONV(MD5(${featureDef}), 16, 10) AS INT) ELSE 0 END" + new CountDistinctAggregate(rewrittenDef) case AggregationType.AVG => new AvgAggregate(featureDef) case AggregationType.MAX => new MaxAggregate(featureDef) case AggregationType.MIN => new MinAggregate(featureDef) From ff9bd8d363b317674e78000a1b7f44f7282ca6de Mon Sep 17 00:00:00 2001 From: Andrew Tang Date: Tue, 27 Jun 2023 23:23:46 -0700 Subject: [PATCH 2/2] fix: use native hashing algorithm and fix bug --- .../linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala index 42b24bcc2..0ca62dbf6 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala @@ -187,7 +187,7 @@ private[offline] object SlidingWindowFeatureUtils { val rewrittenDef = s"CASE WHEN ${featureDef} IS NOT NULL THEN 1 ELSE 0 END" new CountAggregate(rewrittenDef) case AggregationType.COUNT_DISTINCT => - var rewrittenDef = s"CASE WHEN ${featureDef} IS NOT NULL THEN CAST(CONV(MD5(${featureDef}), 16, 10) AS INT) ELSE 0 END" + val rewrittenDef = s"CASE WHEN ${featureDef} IS NOT NULL THEN hash(${featureDef}) ELSE 0 END" new CountDistinctAggregate(rewrittenDef) case AggregationType.AVG => new AvgAggregate(featureDef) case AggregationType.MAX => new MaxAggregate(featureDef)