Skip to content

Commit

Permalink
Address comments
Browse files — browse the repository at this point in the history
  • Loading branch information
Rakesh Kashyap Hanasoge Padmanabha committed May 16, 2023
1 parent c860208 commit 559fbca
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
def joinFeaturesWithSuppressedExceptions(joinConfig: FeatureJoinConfig, obsData: SparkFeaturizedDataset,
jobContext: JoinJobContext = JoinJobContext()): (SparkFeaturizedDataset, Map[String, String]) = {
(joinFeatures(joinConfig, obsData, jobContext), Map(SuppressedExceptionHandlerUtils.MISSING_DATA_EXCEPTION
-> SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs))
-> SuppressedExceptionHandlerUtils.missingFeatures.mkString))
}

/**
Expand Down Expand Up @@ -231,7 +231,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:

val (joinedDF, header) = doJoinObsAndFeatures(joinConfig, jobContext, obsData)
(joinedDF, header, Map(SuppressedExceptionHandlerUtils.MISSING_DATA_EXCEPTION
-> SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs))
-> SuppressedExceptionHandlerUtils.missingFeatures.mkString))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.linkedin.feathr.offline.join.algorithms.{SequentialJoinConditionBuild
import com.linkedin.feathr.offline.logical.FeatureGroups
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils
import com.linkedin.feathr.offline.util.{FeaturizedDatasetUtils, SuppressedExceptionHandlerUtils}
import com.linkedin.feathr.offline.{ErasedEntityTaggedFeature, FeatureDataFrame}
import com.linkedin.feathr.sparkcommon.FeatureDerivationFunctionSpark
import com.linkedin.feathr.{common, offline}
Expand All @@ -37,6 +37,9 @@ private[offline] class DerivedFeatureEvaluator(derivationStrategies: DerivationS
def evaluate(keyTag: Seq[Int], keyTagList: Seq[String], contextDF: DataFrame, derivedFeature: DerivedFeature): FeatureDataFrame = {
val tags = Some(keyTag.map(keyTagList).toList)
val producedFeatureColName = DataFrameColName.genFeatureColumnName(derivedFeature.producedFeatureNames.head, tags)
if (derivedFeature.consumedFeatureNames.exists(x => SuppressedExceptionHandlerUtils.missingFeatures.contains(x.getFeatureName))) {
SuppressedExceptionHandlerUtils.missingFeatures.add(derivedFeature.producedFeatureNames.head)
}

derivedFeature.derivation match {
case g: SeqJoinDerivationFunction =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession,
col(genFeatureColumnName(FEATURE_NAME_PREFIX + expansionFeatureName)))
val missingFeature = derivedFeature.producedFeatureNames.head
log.warn(s"Missing data for features ${missingFeature}. Default values will be populated for this column.")
SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += missingFeature
SuppressedExceptionHandlerUtils.missingFeatures += missingFeature
return seqJoinFeatureResultWithRenamed
}
val aggregationFunction = seqJoinDerivationFunction.aggregation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ private[offline] class AnchoredFeatureJoinStep(
!containsFeature.contains(true)
})
log.warn(s"Missing data for features ${missingFeatures.mkString}. Default values will be populated for this column.")
SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += missingFeatures.mkString
SuppressedExceptionHandlerUtils.missingFeatures ++= missingFeatures
val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1))
substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private[offline] class SlidingWindowAggregationJoiner(
res.map(emptyFeatures.add)
val exceptionMsg = emptyFeatures.mkString
log.warn(s"Missing data for features ${emptyFeatures}. Default values will be populated for this column.")
SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += exceptionMsg
SuppressedExceptionHandlerUtils.missingFeatures ++= emptyFeatures
anchors.map(anchor => (anchor, originalSourceDf))
} else {
val sourceDF: DataFrame = preprocessedDf match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
*/
object SuppressedExceptionHandlerUtils {
val MISSING_DATA_EXCEPTION = "missing_data_exception"
var missingDataSuppressedExceptionMsgs = ""

// Set of features that may be missing because of missing data.
var missingFeatures = scala.collection.mutable.Set.empty[String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.linkedin.feathr.offline.config.location.SimplePath
import com.linkedin.feathr.offline.generation.SparkIOUtils
import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager
import com.linkedin.feathr.offline.source.dataloader.{AvroJsonDataLoader, CsvDataLoader}
import com.linkedin.feathr.offline.util.FeathrTestUtils
import com.linkedin.feathr.offline.util.{FeathrTestUtils, SuppressedExceptionHandlerUtils}
import com.linkedin.feathr.offline.util.FeathrUtils.{ADD_DEFAULT_COL_FOR_MISSING_DATA, SKIP_MISSING_FEATURE, setFeathrJobParam}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
Expand Down Expand Up @@ -560,6 +560,9 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
assertEquals(featureList(0).getAs[Row]("featureWithNullSql"), 1.0f)
assertEquals(featureList(0).getAs[Row]("seqJoin_featureWithNull"),
Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f))))
assertEquals(SuppressedExceptionHandlerUtils.missingFeatures,
Set("featureWithNull", "featureWithNull3", "featureWithNull5", "featureWithNull4", "featureWithNull7",
"aEmbedding", "featureWithNull6", "derived_featureWithNull", "seqJoin_featureWithNull"))
setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false")
}

Expand Down

0 comments on commit 559fbca

Please sign in to comment.