diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/WindowTimeUnit.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/WindowTimeUnit.scala
index 72363cf34..c45953a48 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/WindowTimeUnit.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/WindowTimeUnit.scala
@@ -22,6 +22,8 @@ private[offline] object WindowTimeUnit extends Enumeration {
       case H => Duration.ofHours(timeWindowStr.dropRight(1).trim.toLong)
       case M => Duration.ofMinutes(timeWindowStr.dropRight(1).trim.toLong)
       case S => Duration.ofSeconds(timeWindowStr.dropRight(1).trim.toLong)
+      case Y => Duration.ofDays(365*timeWindowStr.dropRight(1).trim.toLong)
+      case W => Duration.ofDays(7*timeWindowStr.dropRight(1).trim.toLong)
       case _ => Duration.ofSeconds(0)
     }
   } catch {
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/anchorExtractor/TimeWindowConfigurableAnchorExtractor.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/anchorExtractor/TimeWindowConfigurableAnchorExtractor.scala
index 7f2d5bab9..3557a2144 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/anchorExtractor/TimeWindowConfigurableAnchorExtractor.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/anchorExtractor/TimeWindowConfigurableAnchorExtractor.scala
@@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty
 import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrConfigException}
 import com.linkedin.feathr.offline.config.{ComplexAggregationFeature, TimeWindowFeatureDefinition}
 import com.linkedin.feathr.offline.generation.aggregations._
-import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils.convertFeathrDefToSwjDef
+import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils.{convertFeathrDefToSwjDef, isBucketedFunction}
 import com.linkedin.feathr.sparkcommon.SimpleAnchorExtractorSpark
 import com.linkedin.feathr.swj.aggregate.AggregationType
 import com.typesafe.config.ConfigFactory
@@ -61,7 +61,7 @@ private[offline] class TimeWindowConfigurableAnchorExtractor(@JsonProperty("feat
    */
  override def aggregateAsColumns(groupedDataFrame: DataFrame): Seq[(String, Column)] = {
    val columnPairs = aggFeatures.collect {
-      case (featureName, featureDef) if !featureDef.timeWindowFeatureDefinition.aggregationType.toString.startsWith("BUCKETED_") =>
+      case (featureName, featureDef) if !isBucketedFunction(featureDef.timeWindowFeatureDefinition.aggregationType) =>
        // for basic sliding window aggregation
        // no complex aggregation will be defined
        if (featureDef.swaFeature.lateralView.isDefined) {
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/FeatureGenDefaultsSubstituter.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/FeatureGenDefaultsSubstituter.scala
index 3d7764d73..eba1692cb 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/FeatureGenDefaultsSubstituter.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/FeatureGenDefaultsSubstituter.scala
@@ -5,6 +5,7 @@ import com.linkedin.feathr.offline.{FeatureDataFrame, FeatureDataWithJoinKeys}
 import com.linkedin.feathr.offline.client.DataFrameColName
 import com.linkedin.feathr.offline.job.FeatureTransformation
 import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter
+import com.linkedin.feathr.offline.util.FeathrUtils
 import org.apache.spark.sql.SparkSession
 
 /**
@@ -43,7 +44,12 @@ private[offline] class FeatureGenDefaultsSubstituter() {
         withDefaultDF,
         featuresWithKeys.keys.map(FeatureTransformation.FEATURE_NAME_PREFIX + DataFrameColName.getEncodedFeatureRefStrForColName(_)).toSeq)
       // If there're multiple rows with same join key, keep one record for these duplicate records(same behavior as Feature join API)
-      val withoutDupDF = withNullsDroppedDF.dropDuplicates(joinKeys)
+      val dropDuplicate = FeathrUtils.getFeathrJobParam(ss, FeathrUtils.DROP_DUPLICATE_ROWS_FOR_KEYS_IN_FEATURE_GENERATION).toBoolean
+      val withoutDupDF = if (dropDuplicate) {
+        withNullsDroppedDF.dropDuplicates(joinKeys)
+      } else {
+        withNullsDroppedDF
+      }
      // Return features processed in this iteration
      featuresWithKeys.map(f => (f._1, (FeatureDataFrame(withoutDupDF, inferredTypeConfig), joinKeys)))
    }
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/FeatureGenKeyTagAnalyzer.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/FeatureGenKeyTagAnalyzer.scala
index f58bf9f57..9dbfe7bd7 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/FeatureGenKeyTagAnalyzer.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/FeatureGenKeyTagAnalyzer.scala
@@ -5,7 +5,10 @@ import com.linkedin.feathr.common.{DateTimeParam, DateTimeUtils, JoiningFeatureP
 import com.linkedin.feathr.offline.anchored.anchorExtractor.TimeWindowConfigurableAnchorExtractor
 import com.linkedin.feathr.offline.job.FeatureGenSpec
 import com.linkedin.feathr.offline.logical.FeatureGroups
+import com.linkedin.feathr.offline.util.FeathrUtils
+import org.apache.spark.sql.SparkSession
+import java.time.Duration
 
 import scala.annotation.tailrec
 import scala.collection.convert.wrapAll._
 
@@ -100,15 +103,19 @@ private[offline] object FeatureGenKeyTagAnalyzer extends FeatureGenKeyTagAnalyze
       featureGenSpec: FeatureGenSpec,
       featureGroups: FeatureGroups): Seq[JoiningFeatureParams] = {
     val refTime = featureGenSpec.dateTimeParam
+    val ss = SparkSession.builder().getOrCreate()
+    val expand_days = FeathrUtils.getFeathrJobParam(ss, FeathrUtils.EXPAND_DAYS_IN_FEATURE_GENERATION_CUTOFF_TIME).toInt
     taggedFeature.map(f => {
       val featureName = f.getFeatureName
       val featureAnchorWithSource = featureGroups.allAnchoredFeatures(featureName)
       val dateParam = featureAnchorWithSource.featureAnchor.extractor match {
         case extractor: TimeWindowConfigurableAnchorExtractor =>
           val aggFeature = extractor.features(featureName)
-          val dateTimeParam = DateTimeParam.shiftStartTime(refTime, aggFeature.window)
-          DateTimeUtils.toDateParam(dateTimeParam)
-        case _ =>
+          val dateTimeShifted = DateTimeParam.shiftStartTime(refTime, aggFeature.window)
+          val dateTimeParamExpandStart = DateTimeParam.shiftStartTime(dateTimeShifted, Duration.ofDays(expand_days*2))
+          val dateTimeParamExpandEnd = DateTimeParam.shiftEndTime(dateTimeParamExpandStart, Duration.ofDays(expand_days).negated())
+          DateTimeUtils.toDateParam(dateTimeParamExpandEnd)
+        case _ =>
           featureGenSpec.dateParam
       }
       new JoiningFeatureParams(f.getKeyTag, f.getFeatureName, Option(dateParam))
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala
index 0ee870253..03cac99e0 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala
@@ -15,6 +15,7 @@ import com.linkedin.feathr.offline.join.DataFrameKeyCombiner
 import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
 import com.linkedin.feathr.offline.source.accessor.{DataSourceAccessor, NonTimeBasedDataSourceAccessor, TimeBasedDataSourceAccessor}
 import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils
+import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils.isBucketedFunction
 import com.linkedin.feathr.offline.transformation.FeatureColumnFormat.FeatureColumnFormat
 import com.linkedin.feathr.offline.transformation._
 import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils.tensorTypeToDataFrameSchema
@@ -207,7 +208,7 @@ private[offline] object FeatureTransformation {
     val featureTypeConfigs = featureAnchorWithSource.featureAnchor.featureTypeConfigs
     val transformedFeatureData: TransformedResult = featureAnchorWithSource.featureAnchor.extractor match {
       case transformer: TimeWindowConfigurableAnchorExtractor =>
-        val nonBucketedFeatures = transformer.features.map(_._2.aggregationType).filter(agg => agg == AggregationType.BUCKETED_COUNT_DISTINCT)
+        val nonBucketedFeatures = transformer.features.map(_._2.aggregationType).filter(agg => isBucketedFunction(agg))
         if (!(nonBucketedFeatures.size != transformer.features || transformer.features.isEmpty)) {
           throw new FeathrFeatureTransformationException(
             ErrorLabel.FEATHR_USER_ERROR,
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
index 908f4be2d..89d397cfa 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
@@ -81,7 +81,9 @@ private[offline] class PathPartitionedTimeSeriesSourceAccessor(
       throw new FeathrInputDataException(
         ErrorLabel.FEATHR_USER_ERROR,
         s"Trying to create TimeSeriesSource but no data " +
-          s"is found to create source data. Source path: ${source.path}, source type: ${source.sourceType}")
+          s"is found to create source data. Source path: ${source.path}, source type: ${source.sourceType}. " +
+          s"Tried to get a dataframe for interval ${timeIntervalOpt}, " +
+          s"but the source dataset has interval(s) ${datePartitions.map(_.dateInterval.toString).mkString(",")}")
     }
     selectedPartitions
   }
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
index 500e570c5..bf188bd09 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
@@ -110,7 +110,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
       } else {
         // Throwing exception to avoid dataLoaderHandler hook exception from being suppressed.
         throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" +
-          s" and retry time of ${retryWaitTime}ms.")
+          s" and retry time of ${retryWaitTime}ms. Error message: ${e.getMessage}")
       }
     }
   }
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala
index 4946abe1f..91a6d36cc 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala
@@ -13,6 +13,7 @@ import com.linkedin.feathr.offline.transformation.FeatureColumnFormat
 import com.linkedin.feathr.offline.transformation.FeatureColumnFormat.FeatureColumnFormat
 import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils
 import com.linkedin.feathr.offline.util.datetime.{DateTimeInterval, OfflineDateTimeUtils}
+import com.linkedin.feathr.swj.aggregate.AggregationType.AggregationType
 import com.linkedin.feathr.swj.{FactData, GroupBySpec, LateralViewParams, SlidingWindowFeature, WindowSpec}
 import com.linkedin.feathr.swj.aggregate.{AggregationType, AvgAggregate, AvgPoolingAggregate, CountAggregate, CountDistinctAggregate, DummyAggregate, LatestAggregate, MaxAggregate, MaxPoolingAggregate, MinAggregate, MinPoolingAggregate, SumAggregate}
 import org.apache.logging.log4j.LogManager
@@ -20,7 +21,6 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
 import org.apache.spark.util.sketch.BloomFilter
-
 import java.text.SimpleDateFormat
 import java.time._
 
@@ -41,6 +41,14 @@ private[offline] object SlidingWindowFeatureUtils {
   val DEFAULT_TIME_DELAY = "Default-time-delay"
   val TIMESTAMP_PARTITION_COLUMN = "__feathr_timestamp_column_from_partition"
 
+  /**
+   * Check whether an aggregation function is a bucketed aggregation.
+   * @param aggregateFunction aggregation function type
+   */
+  def isBucketedFunction(aggregateFunction: AggregationType): Boolean = {
+    aggregateFunction.toString.startsWith("BUCKETED")
+  }
+
   /**
    * Check if an anchor contains window aggregate features.
    * Note: if an anchor contains window aggregate features, it will not contain other non-aggregate features.
@@ -187,6 +195,7 @@ private[offline] object SlidingWindowFeatureUtils {
       case AggregationType.MIN_POOLING => new MinPoolingAggregate(featureDef)
       case AggregationType.AVG_POOLING => new AvgPoolingAggregate(featureDef)
       case AggregationType.BUCKETED_COUNT_DISTINCT => new DummyAggregate(featureDef)
+      case AggregationType.BUCKETED_SUM => new DummyAggregate(featureDef)
     }
     swj.SlidingWindowFeature(featureName, aggregationSpec, windowSpec, filter, groupBySpec, lateralViewParams)
   }
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/MultiLevelAggregationTransform.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/MultiLevelAggregationTransform.scala
index eedaa6450..cfe0920a0 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/MultiLevelAggregationTransform.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/MultiLevelAggregationTransform.scala
@@ -1,9 +1,10 @@
 package com.linkedin.feathr.offline.transformation
 
-import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils.constructTimeStampExpr
+import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils.{constructTimeStampExpr, isBucketedFunction}
 import com.linkedin.feathr.offline.util.AnchorUtils.removeNonAlphaNumChars
 import com.linkedin.feathr.swj.aggregate.AggregationType
-import org.apache.spark.sql.expressions.Window
+import com.linkedin.feathr.swj.aggregate.AggregationType.AggregationType
+import org.apache.spark.sql.expressions.{Window, WindowSpec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
@@ -101,13 +102,36 @@ class MultiLevelAggregationTransform(ss: SparkSession,
         Window.currentRow - 1
       )
 
-    val distinctCount = if (useHyperLogLog) {
-      withKeyColumnsDf.withColumn(outputFeatureColumnName,
-        approx_count_distinct(expr(featureDefFieldField)).over(windowSpec))
-    } else {
-      withKeyColumnsDf.withColumn(aggregatedItemsSoFarInCurrentBasicBucket,
-        collect_set(expr(featureDefFieldField)).over(windowSpec))
-        .withColumn(outputFeatureColumnName, size(col(aggregatedItemsSoFarInCurrentBasicBucket)))
+    val withFeatureDf: DataFrame = withOutputFeatures(featureDefFieldField, outputFeatureColumnName, withKeyColumnsDf, keyColumns, windowSpec, aggregateFunction)
+
+    val intermediateCols = Seq(UtcTimestampColumnName, utc_ts_string, oneHourBucketTimestampColumn, utc_min
+      , roundedMin, basicBucketColumnNameReadable, roundedBasicBucketTimestampColumn, aggregatedItemsSoFarInCurrentBasicBucket,
+      aggregatedItemsEveryFullBasicBucket, featureValueAtBasicLevelFullBucket, outputFeatureColumnName) ++ newKeyColumnExprs.map(_._2)
+    (withFeatureDf, intermediateCols)
+  }
+
+  /**
+   * Produce the output feature column using the given aggregation function.
+   */
+  private def withOutputFeatures(featureDefFieldField: String,
+                                 outputFeatureColumnName: String,
+                                 withKeyColumnsDf: DataFrame,
+                                 keyColumns: Seq[String],
+                                 windowSpec: WindowSpec,
+                                 aggregateFunction: AggregationType): DataFrame = {
+    val withOutputFeatures = aggregateFunction match {
+      case AggregationType.BUCKETED_COUNT_DISTINCT =>
+        if (useHyperLogLog) {
+          withKeyColumnsDf.withColumn(outputFeatureColumnName,
+            approx_count_distinct(expr(featureDefFieldField)).over(windowSpec))
+        } else {
+          withKeyColumnsDf.withColumn(aggregatedItemsSoFarInCurrentBasicBucket,
+            collect_set(expr(featureDefFieldField)).over(windowSpec))
+            .withColumn(outputFeatureColumnName, coalesce(size(col(aggregatedItemsSoFarInCurrentBasicBucket)), lit(0)))
+        }
+      case AggregationType.BUCKETED_SUM =>
+        withKeyColumnsDf.withColumn(outputFeatureColumnName,
+          coalesce(sum(expr(featureDefFieldField)).over(windowSpec), lit(0)))
     }
     val fullBasicUnitWindowSpec = Window.partitionBy(keyColumns.head, keyColumns.tail: _*)
       .orderBy(col(roundedBasicBucketTimestampColumn))
@@ -115,21 +139,24 @@ class MultiLevelAggregationTransform(ss: SparkSession,
         Window.currentRow,
         Window.currentRow
       )
-    val withFeatureDf = distinctCount.withColumn(aggregatedItemsEveryFullBasicBucket,
-      collect_set(expr(featureDefFieldField)).over(fullBasicUnitWindowSpec)
-    ).withColumn(featureValueAtBasicLevelFullBucket, size(col(aggregatedItemsEveryFullBasicBucket)))
-    val intermediateCols = Seq(UtcTimestampColumnName, utc_ts_string, oneHourBucketTimestampColumn, utc_min
-      , roundedMin, basicBucketColumnNameReadable, roundedBasicBucketTimestampColumn, aggregatedItemsSoFarInCurrentBasicBucket,
-      aggregatedItemsEveryFullBasicBucket, featureValueAtBasicLevelFullBucket, outputFeatureColumnName) ++ newKeyColumnExprs.map(_._2)
-    (withFeatureDf, intermediateCols)
+    val withFeatureDf = aggregateFunction match {
+      case AggregationType.BUCKETED_COUNT_DISTINCT =>
+        withOutputFeatures.withColumn(aggregatedItemsEveryFullBasicBucket,
+          collect_set(expr(featureDefFieldField)).over(fullBasicUnitWindowSpec)
+        ).withColumn(featureValueAtBasicLevelFullBucket, size(col(aggregatedItemsEveryFullBasicBucket)))
+      case AggregationType.BUCKETED_SUM =>
+        withOutputFeatures.withColumn(featureValueAtBasicLevelFullBucket,
+          sum(expr(featureDefFieldField)).over(fullBasicUnitWindowSpec))
+    }
+    withFeatureDf
   }
 
   def multiLevelRollUpAggregate(rollUpLevels: Seq[RollUpLevelVal],
-                               df: DataFrame,
-                               keyColumns: Seq[String],
-                               featureDefAggField: String,
-                               basicLevelAggregationFeatureColumn: String,
-                               aggregationFunction: AggregationType.Value): DataFrame = {
+                                df: DataFrame,
+                                keyColumns: Seq[String],
+                                featureDefAggField: String,
+                                basicLevelAggregationFeatureColumn: String,
+                                aggregationFunction: AggregationType.Value): DataFrame = {
     val baseFeatureName = "agg_" + featureDefAggField
     val basicRollupOutputData = if (debug) {
       RollUpOutputData(df,
@@ -189,8 +216,8 @@ class MultiLevelAggregationTransform(ss: SparkSession,
     if (foundLevels.isEmpty) {
       throw new RuntimeException(s"Unsupported window ${window}")
    }
-    if (aggregateFunction != AggregationType.BUCKETED_COUNT_DISTINCT) {
-      throw new RuntimeException(s"Unsupported aggregation function ${aggregateFunc}")
+    if (!isBucketedFunction(aggregateFunction)) {
+      throw new RuntimeException(s"Unsupported aggregation function ${aggregateFunc}. Only bucketed aggregation functions are supported.")
     }
 
     val highLevel = foundLevels.head
@@ -200,7 +227,7 @@ class MultiLevelAggregationTransform(ss: SparkSession,
       highLevel
     }
 
-    val basicAggFeatureName = "feathr_agg_" + removeNonAlphaNumChars(featureDefAggField) + "_" + basicLevel.name
+    val basicAggFeatureName = "feathr_agg_" + removeNonAlphaNumChars(featureDefAggField) + "_" + aggregateFunction + "_" + basicLevel.name
     val (withBasicLevelAggDf, intermediateBasicCols) = applyAggregationAtBasicLevel(df,
       keyColumnExprAndAlias,
       featureDefAggField,
@@ -336,19 +363,11 @@ class MultiLevelAggregationTransform(ss: SparkSession,
        */
       .withColumn(
         itemsInEveryFullBucketAtLowLevelOneHot,
-        if (debug) {
-          when(
-            // isTheFirstRecordInRounded5Minutes
-            isnull(col(previousRoundedBasicBucketInMin)) || col(roundedLowLevelBucketTimestampColumn) =!= col(previousRoundedBasicBucketInMin),
-            col(itemsForEveryFullBucketAtLowLevel)
-          ).otherwise(expr("array()"))
-        } else {
           when(
             // isTheFirstRecordInRounded5Minutes
            isnull(col(previousRoundedBasicBucketInMin)) || col(roundedLowLevelBucketTimestampColumn) =!= col(previousRoundedBasicBucketInMin),
            coalesce(col(featureValueForEveryFullBucketAtLowLevel), lit(0))
          ).otherwise(lit(0))
-        }
       )
     val windowSpec1Hour = Window.partitionBy(keyColumns.head, keyColumns.tail: _*)
       // TODO should sort by utcTimestampColumnName?
@@ -365,37 +384,11 @@
       apply sum on itemsInEveryFullBucketAtLowLevelOneHot.
      */
     val withAllPreviousLowLevelBucketsDf = withPreviousAggedDataDf.withColumn(aggregatedDataFromAllPreviousLowLevelBuckets,
-      if (debug) {
-        collect_list(itemsInEveryFullBucketAtLowLevelOneHot).over(windowSpec1Hour)
-      } else {
         coalesce(sum(itemsInEveryFullBucketAtLowLevelOneHot).over(windowSpec1Hour), lit(0))
-      }
     )
-    // TODO fix the datatype and remove try
-    val withOutputFeatureDf =
-      if (debug) {
-        try (
-          withAllPreviousLowLevelBucketsDf
-            .withColumn(aggregateDataAtHighLevel,
-              array(array(col(itemsAtCurrentBucketAtLowLevelSoFar)), col(aggregatedDataFromAllPreviousLowLevelBuckets)))
-        ) catch {
-          case e =>
-            try (
-              withAllPreviousLowLevelBucketsDf
-                .withColumn(aggregateDataAtHighLevel,
-                  array(array(array(array(col(itemsAtCurrentBucketAtLowLevelSoFar)))), (col(aggregatedDataFromAllPreviousLowLevelBuckets))))
-            ) catch {
-              case e =>
-                withAllPreviousLowLevelBucketsDf
-                  .withColumn(aggregateDataAtHighLevel,
-                    array(array(array(array(array(col(itemsAtCurrentBucketAtLowLevelSoFar))))), (col(aggregatedDataFromAllPreviousLowLevelBuckets))))
-            }
-        }
-      } else {
-        withAllPreviousLowLevelBucketsDf
+    val withOutputFeatureDf = withAllPreviousLowLevelBucketsDf
       .withColumn(aggregateDataAtHighLevel,
         col(featureValueAtCurrentBucketAtLowLevelSoFar) + col(aggregatedDataFromAllPreviousLowLevelBuckets))
-      }
 
     // Create these output columns for next level roll up
     val itemsForEveryFullBucketAtHighLevel = "itemsForEveryFullBucketAtHighLevel_" + highLevelName
@@ -417,14 +410,9 @@
       )
     val withHighLevelOutputColumnDf = withRoundedHighLevelBucketTimestampColumn.withColumn(itemsForEveryFullBucketAtHighLevel,
-      if(debug) {
-        collect_set(itemsInEveryFullBucketAtLowLevelOneHot).over(windowSpecHighLevel)
-      } else {
         sum(itemsInEveryFullBucketAtLowLevelOneHot).over(windowSpecHighLevel)
-      }
     )
 
-    val highLevelBucketIntervalInMinutes: Int = highLevelName.durationInSecond / 60
 
     val windowSpecAtHighLevelSoFar = Window.partitionBy(roundedHighLevelBucketTimestampColumn, keyColumns: _*)
       .orderBy(col(roundedLowLevelBucketTimestampColumn))
       .rangeBetween(
@@ -434,23 +422,12 @@
     // e.g. if 5m -> 1h, this means all the full 5m within the current hour, it might be 0 - 11.
     val aggregatedDataFromAllPreviousLowLevelFullBuckets = "aggregatedDataFromAllPreviousLowLevelFullBuckets" + nameSuffix
     val withFullBucketLowLevelDf = withHighLevelOutputColumnDf.withColumn(aggregatedDataFromAllPreviousLowLevelFullBuckets,
-      if (debug) {
-        collect_set(itemsInEveryFullBucketAtLowLevelOneHot).over(windowSpecAtHighLevelSoFar)
-      } else {
         coalesce(sum(itemsInEveryFullBucketAtLowLevelOneHot).over(windowSpecAtHighLevelSoFar), lit(0))
-      }
     )
-    val withCurrentHighLevelBucketDf = if (debug) {
-      withFullBucketLowLevelDf.withColumn(itemsAtCurrentBucketAtHighSoFar,
-        concat(col(aggregatedDataFromAllPreviousLowLevelFullBuckets).cast("string"),
-          col(itemsAtCurrentBucketAtLowLevelSoFar).cast("string"))
-      )
-    } else {
+    val withCurrentHighLevelBucketDf =
       withFullBucketLowLevelDf.withColumn(itemsAtCurrentBucketAtHighSoFar,
         col(aggregatedDataFromAllPreviousLowLevelFullBuckets) +
-        col(featureValueAtCurrentBucketAtLowLevelSoFar)
-      )
-    }
+          col(featureValueAtCurrentBucketAtLowLevelSoFar))
 
     val intermediateOutputCols = Seq(previousRoundedBasicBucketInMin, itemsInEveryFullBucketAtLowLevelOneHot,
       aggregatedDataFromAllPreviousLowLevelBuckets, timeColumnName, roundedHighLevelBucketTimestampColumn,
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala
index d5812b982..95c6a5fe3 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala
@@ -53,7 +53,8 @@ private[feathr] object FeathrUtils {
   val SANITY_CHECK_MODE_ROW_COUNT = "sanity.check.row.count"
   val FILTER_NULLS = "filter.nulls"
   val STRING_PARAMETER_DELIMITER = ","
-
+  val EXPAND_DAYS_IN_FEATURE_GENERATION_CUTOFF_TIME = "expand.days.in.feature.generation.cutoff.time"
+  val DROP_DUPLICATE_ROWS_FOR_KEYS_IN_FEATURE_GENERATION = "drop.duplicate.rows.in.feature.generation"
   // Used to check if the current dataframe has satisfied the checkpoint frequency
   val checkPointSequenceNumber = new AtomicLong(0)
 
@@ -88,7 +89,9 @@ private[feathr] object FeathrUtils {
     SPARK_JOIN_MIN_PARALLELISM -> (SQLConf.buildConf(getFullConfigKeyName(SPARK_JOIN_MIN_PARALLELISM )).stringConf.createOptional, "10"),
     ENABLE_SANITY_CHECK_MODE -> (SQLConf.buildConf(getFullConfigKeyName(ENABLE_SANITY_CHECK_MODE )).stringConf.createOptional, "false"),
     SANITY_CHECK_MODE_ROW_COUNT -> (SQLConf.buildConf(getFullConfigKeyName(SANITY_CHECK_MODE_ROW_COUNT )).stringConf.createOptional, "10"),
-    FILTER_NULLS -> (SQLConf.buildConf(getFullConfigKeyName(FILTER_NULLS )).stringConf.createOptional, "false")
+    FILTER_NULLS -> (SQLConf.buildConf(getFullConfigKeyName(FILTER_NULLS )).stringConf.createOptional, "false"),
+    EXPAND_DAYS_IN_FEATURE_GENERATION_CUTOFF_TIME -> (SQLConf.buildConf(getFullConfigKeyName(EXPAND_DAYS_IN_FEATURE_GENERATION_CUTOFF_TIME )).stringConf.createOptional, "0"),
+    DROP_DUPLICATE_ROWS_FOR_KEYS_IN_FEATURE_GENERATION -> (SQLConf.buildConf(getFullConfigKeyName(DROP_DUPLICATE_ROWS_FOR_KEYS_IN_FEATURE_GENERATION )).stringConf.createOptional, "true")
   )
 
   /**
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/swj/aggregate/AggregationType.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/swj/aggregate/AggregationType.scala
index 8e682e75e..16483015b 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/swj/aggregate/AggregationType.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/swj/aggregate/AggregationType.scala
@@ -2,5 +2,6 @@ package com.linkedin.feathr.swj.aggregate
 
 object AggregationType extends Enumeration {
   type AggregationType = Value
-  val SUM, COUNT, COUNT_DISTINCT, AVG, MAX, TIMESINCE, LATEST, DUMMY, MIN, MAX_POOLING, MIN_POOLING, AVG_POOLING, SUM_POOLING, BUCKETED_COUNT_DISTINCT = Value
+  val SUM, COUNT, COUNT_DISTINCT, AVG, MAX, TIMESINCE, LATEST, DUMMY, MIN, MAX_POOLING, MIN_POOLING, AVG_POOLING, SUM_POOLING,
+  BUCKETED_COUNT_DISTINCT, BUCKETED_SUM = Value
 }
diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala
index a0574f313..2002903f1 100644
--- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala
+++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala
@@ -1593,14 +1593,19 @@ class FeatureGenIntegTest extends FeathrIntegTest {
         |        g: {
         |          def: count  // the column that contains the raw view count
         |          aggregation: BUCKETED_COUNT_DISTINCT
-        |          window: "1d"
+        |          window: "1y"
+        |        }
+        |        h: {
+        |          def: count  // the column that contains the raw view count
+        |          aggregation: BUCKETED_SUM
+        |          window: "1y"
         |        }
         |      }
         |    }
         |}
       """.stripMargin
-    val features = Seq("g")
+    val features = Seq("g", "h")
     val keyField = "key0"
     val featureGenConfigStr =
       s"""
diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala
index f06e5ca06..0667b340d 100644
--- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala
+++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala
@@ -1,13 +1,9 @@
 package com.linkedin.feathr.offline
 
 import com.linkedin.feathr.offline.AssertFeatureUtils.{rowApproxEquals, validateRows}
-import com.linkedin.feathr.offline.client.FeathrClient
-import com.linkedin.feathr.offline.config.FeatureJoinConfig
-import com.linkedin.feathr.offline.job.FeatureJoinJob
-import com.linkedin.feathr.offline.job.LocalFeatureJoinJob.loadObservationAsFDS
 import com.linkedin.feathr.offline.transformation.MultiLevelAggregationTransform
+import com.linkedin.feathr.offline.util.FeathrUtils
 import com.linkedin.feathr.offline.util.FeathrUtils.{FILTER_NULLS, SKIP_MISSING_FEATURE, setFeathrJobParam}
-import com.linkedin.feathr.offline.util.{FeathrUtils, SuppressedExceptionHandlerUtils}
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{LongType, StructField, StructType}
@@ -88,10 +84,16 @@
   }
 
   @Test
-  def testSWABucketedDistinctCount: Unit = {
+  def testSWABucketedAggregations: Unit = {
     val df = getDf()
-    val aggFunction = "BUCKETED_COUNT_DISTINCT"
-    val featureDefAggField = "value"
+    val aggFunctionAndField = Seq(("BUCKETED_COUNT_DISTINCT", "value"), ("BUCKETED_SUM", "length(value)"))
+    aggFunctionAndField.map { case (func, field) => {
+        testSWABucketedHelper(df, func, field)
+      }
+    }
+  }
+
+  def testSWABucketedHelper(df: DataFrame, aggFunction: String, featureDefAggField: String): Unit = {
     bucketedDistinctCount(df, "5m", featureDefAggField, aggFunction)
     bucketedDistinctCount(df, "1h", featureDefAggField, aggFunction)
     bucketedDistinctCount(df, "1d", featureDefAggField, aggFunction)
diff --git a/gradle.properties b/gradle.properties
index bd4e6fb38..936110679 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,3 +1,3 @@
-version=1.0.4-rc3
+version=1.0.4-rc4
 SONATYPE_AUTOMATIC_RELEASE=true
 POM_ARTIFACT_ID=feathr_2.12