Add default column for missing features (#1158)
* Add default column for missing features

* Fix failing test

* Fix SWA SparkSession issue

* address comments

* Add comment

* bump version

---------

Co-authored-by: Rakesh Kashyap Hanasoge Padmanabha <rkashyap@rkashyap-mn3.linkedin.biz>
Co-authored-by: Jinghui Mo <jmo@linkedin.com>
3 people committed May 5, 2023
1 parent f0cb5d2 commit c239d22
Showing 7 changed files with 244 additions and 37 deletions.
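At a high level, this commit introduces the ADD_DEFAULT_COL_FOR_MISSING_DATA job parameter: when it is enabled and an anchored feature's source data is missing or empty, the feature join no longer fails with a FeathrInputDataException. Instead, a column is appended for each affected feature, filled with the anchor's configured default value, or with nulls when no default is configured. The sketch below shows one way to toggle the flag using the setFeathrJobParam helper that the integration test in this commit relies on; how the flag is wired into a production job configuration is not shown in this diff and is only an assumption here.

import com.linkedin.feathr.offline.util.FeathrUtils.{ADD_DEFAULT_COL_FOR_MISSING_DATA, setFeathrJobParam}

// Enable the new behavior: missing anchored feature data produces default/null columns
// instead of failing the join.
setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "true")
// ... run the feature join ...
// Reset afterwards; the parameter is global job state (the test below does the same at the end).
setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false")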
FeathrClient.scala
@@ -1,7 +1,7 @@
package com.linkedin.feathr.offline.client

import com.linkedin.feathr.common.exception._
import com.linkedin.feathr.common.{FeatureInfo, Header, InternalApi, JoiningFeatureParams, RichConfig, TaggedFeatureName}
import com.linkedin.feathr.common.{FeatureInfo, FeatureTypeConfig, Header, InternalApi, JoiningFeatureParams, RichConfig, TaggedFeatureName}
import com.linkedin.feathr.offline.config.sources.FeatureGroupsUpdater
import com.linkedin.feathr.offline.config.{FeathrConfig, FeathrConfigLoader, FeatureGroupsGenerator, FeatureJoinConfig}
import com.linkedin.feathr.offline.generation.{DataFrameFeatureGenerator, FeatureGenKeyTagAnalyzer, StreamingFeatureGenerator}
@@ -12,8 +12,10 @@ import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
import com.linkedin.feathr.offline.source.DataSource
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
import com.linkedin.feathr.offline.swa.SWAHandler
import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults
import com.linkedin.feathr.offline.util._
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{DataFrame, SparkSession}

@@ -310,6 +312,8 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:

var logicalPlan = logicalPlanner.getLogicalPlan(updatedFeatureGroups, keyTaggedFeatures)
val shouldSkipFeature = FeathrUtils.getFeathrJobParam(sparkSession.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean
val shouldAddDefault = FeathrUtils.getFeathrJobParam(sparkSession.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean
var leftRenamed = left
val featureToPathsMap = (for {
requiredFeature <- logicalPlan.allRequiredFeatures
featureAnchorWithSource <- allAnchoredFeatures.get(requiredFeature.getFeatureName)
@@ -323,6 +327,8 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
val featureGroupsWithoutInvalidFeatures = FeatureGroupsUpdater()
.getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap, updatedFeatureGroups, featurePathsTest._2)
logicalPlanner.getLogicalPlan(featureGroupsWithoutInvalidFeatures, keyTaggedFeatures)
} else if (shouldAddDefault) {
// Don't throw an error if this flag is set; the missing data will be handled at a later step.
} else {
throw new FeathrInputDataException(
ErrorLabel.FEATHR_USER_ERROR,
@@ -358,7 +364,6 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
val renameFeatures = conflictsAutoCorrectionSetting.get.renameFeatureList
val suffix = conflictsAutoCorrectionSetting.get.suffix
log.warn(s"Found conflicted field names: ${conflictFeatureNames}. Will auto correct them by applying suffix: ${suffix}")
var leftRenamed = left
conflictFeatureNames.foreach(name => {
leftRenamed = leftRenamed.withColumnRenamed(name, name+'_'+suffix)
})
@@ -369,7 +374,8 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
s"Failed to apply auto correction to solve conflicts. Still got conflicts after applying provided suffix ${suffix} for fields: " +
s"${conflictFeatureNames}. Please provide another suffix or solve conflicts manually.")
}
val (df, header) = joiner.joinFeaturesAsDF(sparkSession, joinConfig, updatedFeatureGroups, keyTaggedFeatures, leftRenamed, rowBloomFilterThreshold)
val (df, header) = joiner.joinFeaturesAsDF(sparkSession, joinConfig, updatedFeatureGroups, keyTaggedFeatures, leftRenamed,
rowBloomFilterThreshold)
if(renameFeatures) {
log.warn(s"Suffix :${suffix} is applied into feature names: ${conflictFeatureNames} to avoid conflicts in outputs")
renameFeatureNames(df, header, conflictFeatureNames, suffix)
AnchoredFeatureJoinStep.scala
@@ -1,7 +1,7 @@
package com.linkedin.feathr.offline.join.workflow

import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrFeatureJoinException}
import com.linkedin.feathr.common.{ErasedEntityTaggedFeature, FeatureTypeConfig}
import com.linkedin.feathr.common.{ErasedEntityTaggedFeature, FeatureTypeConfig, FeatureTypes}
import com.linkedin.feathr.offline
import com.linkedin.feathr.offline.FeatureDataFrame
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
@@ -12,15 +12,16 @@ import com.linkedin.feathr.offline.job.KeyedTransformedResult
import com.linkedin.feathr.offline.join._
import com.linkedin.feathr.offline.join.algorithms._
import com.linkedin.feathr.offline.join.util.FrequentItemEstimatorFactory
import com.linkedin.feathr.offline.logical.{LogicalPlan, MultiStageJoinPlan}
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor
import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults
import com.linkedin.feathr.offline.transformation.DataFrameExt._
import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils}
import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils, FeaturizedDatasetUtils}
import com.linkedin.feathr.offline.util.FeathrUtils.shouldCheckPoint
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.{col, lit}

/**
* An abstract class provides default implementation of anchored feature join step
@@ -39,8 +40,64 @@ private[offline] class AnchoredFeatureJoinStep(
extends FeatureJoinStep[AnchorJoinStepInput, DataFrameJoinStepOutput] {
@transient lazy val log = LogManager.getLogger(getClass.getName)

/**
* When the add.default.col.for.missing.data flag is turned on, some features could be skipped because of missing data.
* For such anchored features, we will add a feature column with the configured default value (if present in the feature anchor) or
* a null-valued column.
* @param sparkSession spark session
* @param dataframe the original observation dataframe
* @param logicalPlan logical plan generated using the join config
* @param missingFeatures Map of missing feature names to the corresponding featureAnchorWithSource object.
* @return Dataframe with the missing feature columns added
*/
def substituteDefaultsForDataMissingFeatures(sparkSession: SparkSession, dataframe: DataFrame, logicalPlan: MultiStageJoinPlan,
missingFeatures: Map[String, FeatureAnchorWithSource]): DataFrame = {
// Create a map of feature names to their configured defaults. If a feature does not have a default value, it will be missing
// from this map and we will add a null column for it instead.
val defaults = missingFeatures.flatMap(s => s._2.featureAnchor.defaults)

// Create a map of features to their feature types, if configured.
val featureTypes = missingFeatures
.map(x => Some(x._2.featureAnchor.featureTypeConfigs))
.foldLeft(Map.empty[String, FeatureTypeConfig])((a, b) => a ++ b.getOrElse(Map.empty[String, FeatureTypeConfig]))

// We try to infer the column data type from the configured feature type. If the feature type is not present, we fall back to
// the default Feathr behavior of returning a map column of string to float.
val obsDfWithDefaultNullColumn = missingFeatures.keys.foldLeft(dataframe) { (observationDF, featureName) =>
val featureColumnType = if (featureTypes.contains(featureName)) {
featureTypes(featureName).getFeatureType match {
case FeatureTypes.NUMERIC => "float"
case FeatureTypes.BOOLEAN => "boolean"
case FeatureTypes.DENSE_VECTOR => "array<float>"
case FeatureTypes.CATEGORICAL => "string"
case FeatureTypes.CATEGORICAL_SET => "array<string>"
case FeatureTypes.TERM_VECTOR => "map<string,float>"
case FeatureTypes.UNSPECIFIED => "map<string,float>"
case _ => "map<string,float>"
}
} else { // feature type is not configured
"map<string,float>"
}
observationDF.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName), lit(null).cast(featureColumnType))
}

val dataframeWithDefaults = substituteDefaults(obsDfWithDefaultNullColumn, missingFeatures.keys.toSeq, defaults, featureTypes,
sparkSession, (s: String) => s"${FEATURE_NAME_PREFIX}$s")

// Duplicate this column under the key-tagged Feathr feature column name, which is required for further processing.
// For example, if the feature name is abc and the corresponding key is x, the column name would be __feathr_feature_abc_x.
// This column will be dropped after all the joins are done.
missingFeatures.keys.foldLeft(dataframeWithDefaults) { (dataframeWithDefaults, featureName) =>
val keyTags = logicalPlan.joinStages.filter(kv => kv._2.contains(featureName)).head._1
val keyStr = keyTags.map(logicalPlan.keyTagIntsToStrings).toList
dataframeWithDefaults.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName, Some(keyStr)),
col(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName)))
}
}
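// Illustration (not part of this commit): for a missing NUMERIC feature "f" whose anchor configures
// default = -1, the first two steps of the method above roughly amount to
//   observationDF
//     .withColumn("__feathr_feature_f", lit(null).cast("float"))                         // typed null column
//     .withColumn("__feathr_feature_f", coalesce(col("__feathr_feature_f"), lit(-1.0f))) // default substitution
// where the real column name comes from DataFrameColName.genFeatureColumnName and the substitution is
// performed by DataFrameDefaultValueSubstituter.substituteDefaults rather than a plain coalesce.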

/**
* Join anchored features to the observation passed as part of the input context.
*
* @param features Non-window aggregation, basic anchored features.
* @param input input context for this step.
* @param ctx environment variable that contains join job execution context.
@@ -49,10 +106,22 @@
override def joinFeatures(features: Seq[ErasedEntityTaggedFeature], input: AnchorJoinStepInput)(
implicit ctx: JoinExecutionContext): FeatureDataFrameOutput = {
val AnchorJoinStepInput(observationDF, anchorDFMap) = input
val shouldAddDefault = FeathrUtils.getFeathrJobParam(ctx.sparkSession.sparkContext.getConf,
FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean
val withMissingFeaturesSubstituted = if (shouldAddDefault) {
val missingFeatures = features.map(x => x.getFeatureName).filter(x => {
val containsFeature: Seq[Boolean] = anchorDFMap.map(y => y._1.selectedFeatures.contains(x)).toSeq
containsFeature.contains(false)
})
val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1))
substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan,
missingAnchoredFeatures)
} else observationDF

val allAnchoredFeatures: Map[String, FeatureAnchorWithSource] = ctx.featureGroups.allAnchoredFeatures
val joinStages = ctx.logicalPlan.joinStages
val joinOutput = joinStages
.foldLeft(FeatureDataFrame(observationDF, Map.empty[String, FeatureTypeConfig]))((accFeatureDataFrame, joinStage) => {
.foldLeft(FeatureDataFrame(withMissingFeaturesSubstituted, Map.empty[String, FeatureTypeConfig]))((accFeatureDataFrame, joinStage) => {
val (keyTags: Seq[Int], featureNames: Seq[String]) = joinStage
val FeatureDataFrame(contextDF, inferredFeatureTypeMap) = accFeatureDataFrame
// map feature name to its transformed dataframe and the join key of the dataframe
AnchorToDataSourceMapper.scala
@@ -37,6 +37,8 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH
requiredFeatureAnchors: Seq[FeatureAnchorWithSource],
failOnMissingPartition: Boolean): Map[FeatureAnchorWithSource, Option[DataSourceAccessor]] = {
val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean
val shouldAddDefaultCol = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean

// get a Map from each source to a list of all anchors based on this source
val sourceToAnchor = requiredFeatureAnchors
.map(anchor => (anchor.source, anchor))
@@ -74,15 +76,15 @@

// If it is a non-time-based source, the dataframe will be loaded at runtime, which is too late to decide whether the
// feature should be skipped. So, we try to take the first row here and see if it succeeds.
if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && shouldSkipFeature) {
if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && (shouldSkipFeature || shouldAddDefaultCol)) {
if (dataSource.get().take(1).isEmpty) None else {
Some(dataSource)
}
} else {
Some(dataSource)
}
} catch {
case e: Exception => if (shouldSkipFeature) None else throw e
case e: Exception => if (shouldSkipFeature || shouldAddDefaultCol) None else throw e
}

anchorsWithDate.map(anchor => (anchor, timeSeriesSource))
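// Illustration (not part of this commit): the probe above is essentially
//   dataSource.get().take(1).isEmpty
// i.e. eagerly load the non-time-based source and check that it contains at least one row.
// With SKIP_MISSING_FEATURE or ADD_DEFAULT_COL_FOR_MISSING_DATA enabled, an empty or unreadable
// source maps its anchors to None here instead of failing the job; the affected features are then
// either dropped from the plan or backfilled with default columns in AnchoredFeatureJoinStep.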
AnchoredFeaturesIntegTest.scala
@@ -7,11 +7,11 @@ import com.linkedin.feathr.offline.generation.SparkIOUtils
import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager
import com.linkedin.feathr.offline.source.dataloader.{AvroJsonDataLoader, CsvDataLoader}
import com.linkedin.feathr.offline.util.FeathrTestUtils
import com.linkedin.feathr.offline.util.FeathrUtils.{SKIP_MISSING_FEATURE, setFeathrJobParam}
import com.linkedin.feathr.offline.util.FeathrUtils.{ADD_DEFAULT_COL_FOR_MISSING_DATA, SKIP_MISSING_FEATURE, setFeathrJobParam}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.testng.Assert.assertTrue
import org.testng.Assert.{assertEquals, assertTrue}
import org.testng.annotations.{BeforeClass, Test}

import scala.collection.mutable
@@ -386,6 +386,153 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
setFeathrJobParam(SKIP_MISSING_FEATURE, "false")
}

/*
* Test adding default columns for a combination of missing anchored, derived and SWA features. Also, test it with different default value types.
*/
@Test
def testAddDefaultForMissingAnchoredFeatures: Unit = {
setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "true")
val df = runLocalFeatureJoinForTest(
joinConfigAsString =
"""
|settings: {
| joinTimeSettings: {
| timestampColumn: {
| def: "timestamp"
| format: "yyyy-MM-dd"
| }
| simulateTimeDelay: 1d
| }
|}
|
| features: {
| key: a_id
| featureList: ["featureWithNull", "derived_featureWithNull", "featureWithNull2", "featureWithNull3", "featureWithNull4",
| "featureWithNull5", "derived_featureWithNull2", "featureWithNull6", "featureWithNull7", "derived_featureWithNull7"
| "aEmbedding", "memberEmbeddingAutoTZ"]
| }
""".stripMargin,
featureDefAsString =
"""
| sources: {
| swaSource: {
| location: { path: "generaion/daily" }
| timePartitionPattern: "yyyy/MM/dd"
| timeWindowParameters: {
| timestampColumn: "timestamp"
| timestampColumnFormat: "yyyy-MM-dd"
| }
| }
| swaSource1: {
| location: { path: "generation/daily" }
| timePartitionPattern: "yyyy/MM/dd"
| timeWindowParameters: {
| timestampColumn: "timestamp"
| timestampColumnFormat: "yyyy-MM-dd"
| }
| }
|}
|
| anchors: {
| anchor1: {
| source: "anchorAndDerivations/nullVaueSource.avro.json"
| key: "toUpperCaseExt(mId)"
| features: {
| featureWithNull: {
| def: "isPresent(value) ? toNumeric(value) : 0"
| type: NUMERIC
| default: -1
| }
| featureWithNull3: {
| def: "isPresent(value) ? toNumeric(value) : 0"
| type: CATEGORICAL
| default: "null"
| }
| featureWithNull7: {
| def: "isPresent(value) ? toNumeric(value) : 0"
| }
| featureWithNull4: {
| def: "isPresent(value) ? toNumeric(value) : 0"
| type: TERM_VECTOR
| default: {}
| }
| featureWithNull6: {
| def: "isPresent(value) ? toNumeric(value) : 0"
| type: DENSE_VECTOR
| default: [1, 2, 3]
| }
| featureWithNull5: {
| def: "isPresent(value) ? toNumeric(value) : 0"
| default: 1
| }
| }
| }
|
| anchor2: {
| source: "anchorAndDerivations/nullValueSource.avro.json"
| key: "toUpperCaseExt(mId)"
| features: {
| featureWithNull2: "isPresent(value) ? toNumeric(value) : 0"
| }
| }
| swaAnchor: {
| source: "swaSource"
| key: "x"
| features: {
| aEmbedding: {
| def: "embedding"
| aggregation: LATEST
| window: 3d
| default: 2
| }
| }
| }
| swaAnchor1: {
| source: "swaSource1"
| key: "x"
| features: {
| memberEmbeddingAutoTZ: {
| def: "embedding"
| aggregation: LATEST
| window: 3d
| type: {
| type: TENSOR
| tensorCategory: SPARSE
| dimensionType: [INT]
| valType: FLOAT
| }
| }
| }
| }
|}
|derivations: {
|
| derived_featureWithNull: "featureWithNull * 2"
| derived_featureWithNull2: "featureWithNull2 * 2"
| derived_featureWithNull7: "featureWithNull7 * 2"
|}
""".stripMargin,
observationDataPath = "anchorAndDerivations/testMVELLoopExpFeature-observations.csv")

df.data.show()
val featureList = df.data.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("a_id") else "null")
assertEquals(featureList(0).getAs[Row]("aEmbedding"),
Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f))))
assertEquals(featureList(0).getAs[Row]("featureWithNull3"), "null")
assertEquals(featureList(0).getAs[Row]("featureWithNull5"), mutable.Map("" -> 1.0f))
assertEquals(featureList(0).getAs[Row]("featureWithNull7"), null)
assertEquals(featureList(0).getAs[Row]("featureWithNull"),-1.0f)
assertEquals(featureList(0).getAs[Row]("featureWithNull4"),Map())
assertEquals(featureList(0).getAs[Row]("featureWithNull2"),1.0f)
assertEquals(featureList(0).getAs[Row]("derived_featureWithNull"),
Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(-2.0f))))
assertEquals(featureList(0).getAs[Row]("derived_featureWithNull7"),
Row(mutable.WrappedArray.make(Array()), mutable.WrappedArray.empty))
assertEquals(featureList(0).getAs[Row]("derived_featureWithNull2"),
Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f))))
setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false")
}

/*
* Test features with fdsExtract.
*/
(Diffs for the remaining changed files are not shown.)
